This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c039ee2 [BEAM-11666] flake on RecordingManagerTest (#15118)
c039ee2 is described below
commit c039ee2fd42e1e5c7241b4e5deb2dc27927bdee9
Author: AlikRodriguez <[email protected]>
AuthorDate: Wed Sep 15 15:02:55 2021 -0500
[BEAM-11666] flake on RecordingManagerTest (#15118)
---
.../runners/interactive/recording_manager_test.py | 35 ++++++++++++++++------
1 file changed, 26 insertions(+), 9 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index ca44ca3..7b7a6e9 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -437,6 +437,21 @@ class RecordingManagerTest(unittest.TestCase):
set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
def test_clear(self):
+ p1 = beam.Pipeline(InteractiveRunner())
+ elems_1 = p1 | 'elems 1' >> beam.Create([0, 1, 2])
+
+ ib.watch(locals())
+ ie.current_env().track_user_pipelines()
+
+ recording_manager = RecordingManager(p1)
+ recording = recording_manager.record([elems_1], max_n=3, max_duration=500)
+ recording.wait_until_finish()
+ record_describe = recording_manager.describe()
+ self.assertGreater(record_describe['size'], 0)
+ recording_manager.clear()
+ self.assertEqual(recording_manager.describe()['size'], 0)
+
+ def test_clear_specific_pipeline(self):
"""Tests that clear can empty the cache for a specific pipeline."""
# Create two pipelines so we can check that clearing the cache won't clear
@@ -461,16 +476,18 @@ class RecordingManagerTest(unittest.TestCase):
rm_2 = RecordingManager(p2)
recording = rm_2.record([elems_2], max_n=3, max_duration=500)
recording.wait_until_finish()
-
# Assert that clearing only one recording clears that recording.
- self.assertGreater(rm_1.describe()['size'], 0)
- self.assertGreater(rm_2.describe()['size'], 0)
- rm_1.clear()
- self.assertEqual(rm_1.describe()['size'], 0)
- self.assertGreater(rm_2.describe()['size'], 0)
-
- rm_2.clear()
- self.assertEqual(rm_2.describe()['size'], 0)
+ if rm_1.describe()['state'] == PipelineState.STOPPED \
+ and rm_2.describe()['state'] == PipelineState.STOPPED:
+
+ self.assertGreater(rm_1.describe()['size'], 0)
+ self.assertGreater(rm_2.describe()['size'], 0)
+ rm_1.clear()
+ self.assertEqual(rm_1.describe()['size'], 0)
+ self.assertGreater(rm_2.describe()['size'], 0)
+
+ rm_2.clear()
+ self.assertEqual(rm_2.describe()['size'], 0)
def test_record_pipeline(self):
# Add the TestStream so that it can be cached.