Valentyn Tymofieiev created BEAM-11666:
------------------------------------------
Summary: apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution is flaky
Key: BEAM-11666
URL: https://issues.apache.org/jira/browse/BEAM-11666
Project: Beam
Issue Type: Bug
Components: test-failures
Reporter: Valentyn Tymofieiev
Assignee: Ning Kang
Happened in: https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/16819
{noformat}
self = <apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest testMethod=test_basic_execution>
    @unittest.skipIf(
        sys.version_info < (3, 6, 0),
        'This test requires at least Python 3.6 to work.')
    def test_basic_execution(self):
      """A basic pipeline to be used as a smoke test."""
      # Create the pipeline that will emit 0, 1, 2.
      p = beam.Pipeline(InteractiveRunner())
      numbers = p | 'numbers' >> beam.Create([0, 1, 2])
      letters = p | 'letters' >> beam.Create(['a', 'b', 'c'])
      # Watch the pipeline and PCollections. This is normally done in a notebook
      # environment automatically, but we have to do it manually here.
      ib.watch(locals())
      ie.current_env().track_user_pipelines()
      # Create the recording objects. By calling `record` a new PipelineFragment
      # is started to compute the given PCollections and cache to disk.
      rm = RecordingManager(p)
>     numbers_recording = rm.record([numbers], max_n=3, max_duration=500)
apache_beam/runners/interactive/recording_manager_test.py:331:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/interactive/recording_manager.py:435: in record
    self._clear(pipeline_instrument)
apache_beam/runners/interactive/recording_manager.py:319: in _clear
    self._clear_pcolls(cache_manager, set(to_clear))
apache_beam/runners/interactive/recording_manager.py:323: in _clear_pcolls
    cache_manager.clear('full', pc)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.interactive.testing.test_cache_manager.InMemoryCache object at 0x7fa3903ac208>
labels = ('full', 'ee5c35ce3d-140340882711664-140340882712560-140340476166608')
    def clear(self, *labels):
      # type (*str) -> Boolean
      """Clears the cache entry of the given labels and returns True on success.
      Args:
        value: An encodable (with corresponding PCoder) value
        *labels: List of labels for PCollection instance
      """
>     raise NotImplementedError
E   NotImplementedError
{noformat}
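For illustration only: the trace above ends in `clear` raising NotImplementedError when called with a ('full', <pcoll id>) label pair. A minimal sketch of a label-keyed in-memory cache whose `clear` removes the entry instead of raising might look like the following. The class name `InMemoryCacheSketch`, its `write` and `exists` methods, and the dict-based storage are assumptions made for this example, not Beam's actual test cache code.

```python
class InMemoryCacheSketch:
    """Hypothetical stand-in for a test-only in-memory cache (not Beam's class)."""

    def __init__(self):
        # Maps a tuple of labels, e.g. ('full', '<pcoll id>'), to cached values.
        self._cached = {}

    def write(self, values, *labels):
        # Append values under the given label tuple, creating the entry if needed.
        self._cached.setdefault(labels, []).extend(values)

    def exists(self, *labels):
        return labels in self._cached

    def clear(self, *labels):
        # Drop the entry if present; return True only if something was removed.
        return self._cached.pop(labels, None) is not None


cache = InMemoryCacheSketch()
cache.write([0, 1, 2], 'full', 'numbers')
cleared = cache.clear('full', 'numbers')
still_there = cache.exists('full', 'numbers')
```

In this sketch, clearing a label pair that was never written returns False rather than raising, so a caller that clears a set of PCollections does not need to know which ones were cached.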