This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3bbe6ff Fixed empty labels treated as wildcard when matching cache
files (#16440)
3bbe6ff is described below
commit 3bbe6fffea5058f0db1d2b7260f70c26cbf04e1e
Author: Ning Kang <[email protected]>
AuthorDate: Thu Jan 6 08:38:06 2022 -0800
Fixed empty labels treated as wildcard when matching cache files (#16440)
1. Fixed a test that watched nested dictionaries by watching the
concerned PCollections directly;
2. Fixed the cache managers' "exists" implementations to avoid treating
empty labels as wildcards when matching cache files. This bug
is not a security issue, but it could cause unexpected behavior when
getting materialized values of PCollections that are not cached.
---
.../python/apache_beam/runners/interactive/cache_manager.py | 4 +++-
.../apache_beam/runners/interactive/cache_manager_test.py | 12 ++++++++++++
.../runners/interactive/caching/streaming_cache.py | 6 ++++--
.../runners/interactive/caching/streaming_cache_test.py | 3 +++
.../apache_beam/runners/interactive/interactive_runner.py | 2 +-
.../runners/interactive/pipeline_fragment_test.py | 13 +++++++------
6 files changed, 30 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py
b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index a697494..2a15e2e 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -194,7 +194,9 @@ class FileBasedCacheManager(CacheManager):
return 0
def exists(self, *labels):
- return bool(self._match(*labels))
+ if labels and any(labels[1:]):
+ return bool(self._match(*labels))
+ return False
def _latest_version(self, *labels):
timestamp = 0
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index d6319b6..5bcf1df 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -68,6 +68,18 @@ class FileBasedCacheManagerTest(object):
self.mock_write_cache(cache_version_one, prefix, cache_label)
self.assertTrue(self.cache_manager.exists(prefix, cache_label))
+ def test_empty_label_not_exist(self):
+ prefix = 'full'
+ cache_label = 'some-cache-label'
+ cache_version_one = ['cache', 'version', 'one']
+
+ self.assertFalse(self.cache_manager.exists(prefix, cache_label))
+ self.mock_write_cache(cache_version_one, prefix, cache_label)
+ self.assertTrue(self.cache_manager.exists(prefix, cache_label))
+
+ # '' shouldn't be treated as a wildcard to match everything.
+ self.assertFalse(self.cache_manager.exists(prefix, ''))
+
def test_size(self):
"""Test getting the size of some cache label."""
diff --git
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index fc8a8aa..8934e52 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -289,8 +289,10 @@ class StreamingCache(CacheManager):
return self._capture_keys
def exists(self, *labels):
- path = os.path.join(self._cache_dir, *labels)
- return os.path.exists(path)
+ if labels and any(labels):
+ path = os.path.join(self._cache_dir, *labels)
+ return os.path.exists(path)
+ return False
# TODO(srohde): Modify this to return the correct version.
def read(self, *labels, **args):
diff --git
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
index 6b81102..c6b2983 100644
---
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
+++
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
@@ -51,6 +51,9 @@ class StreamingCacheTest(unittest.TestCase):
cache.write([TestStreamFileRecord()], 'my_label')
self.assertTrue(cache.exists('my_label'))
+ # '' shouldn't be treated as a wildcard to match everything.
+ self.assertFalse(cache.exists(''))
+
def test_empty(self):
CACHED_PCOLLECTION_KEY = repr(CacheKey('arbitrary_key', '', '', ''))
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index e19b85b..1324016 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -250,7 +250,7 @@ class PipelineResult(beam.runners.runner.PipelineResult):
key = self._pipeline_instrument.cache_key(pcoll)
cache_manager = ie.current_env().get_cache_manager(
self._pipeline_instrument.user_pipeline)
- if cache_manager.exists('full', key):
+ if key and cache_manager.exists('full', key):
coder = cache_manager.load_pcoder('full', key)
reader, _ = cache_manager.read('full', key)
return to_element_list(reader, coder, include_window_info)
diff --git
a/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
b/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
index f1f423f..3e7207f 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
@@ -36,6 +36,7 @@ from apache_beam.testing.test_stream import TestStream
'[interactive] dependency is not installed.')
class PipelineFragmentTest(unittest.TestCase):
def setUp(self):
+ ie.new_env()
# Assume a notebook frontend is connected to the mocked ipython kernel.
ie.current_env()._is_in_ipython = True
ie.current_env()._is_in_notebook = True
@@ -147,22 +148,22 @@ class PipelineFragmentTest(unittest.TestCase):
@beam.ptransform_fn
def Bar(pcoll):
- return pcoll | beam.Map(lambda n: n)
+ return pcoll | beam.Map(lambda n: 2 * n)
@beam.ptransform_fn
def Foo(pcoll):
- p1 = pcoll | beam.Map(lambda n: n)
+ p1 = pcoll | beam.Map(lambda n: 3 * n)
p2 = pcoll | beam.Map(str)
- bar = pcoll | Bar()
+ bar = p1 | Bar()
return {'pc1': p1, 'pc2': p2, 'bar': bar}
res = init | Foo()
+ ib.watch(res)
- ib.watch(locals())
- pc = res['pc1']
+ pc = res['bar']
result = pf.PipelineFragment([pc]).run()
- self.assertEqual([0, 1, 2, 3, 4], list(result.get(pc)))
+ self.assertEqual([0, 6, 12, 18, 24], list(result.get(pc)))
if __name__ == '__main__':