This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bbe6ff  Fixed empty labels treated as wildcard when matching cache files (#16440)
3bbe6ff is described below

commit 3bbe6fffea5058f0db1d2b7260f70c26cbf04e1e
Author: Ning Kang <[email protected]>
AuthorDate: Thu Jan 6 08:38:06 2022 -0800

    Fixed empty labels treated as wildcard when matching cache files (#16440)
    
    1. Fixed a test that watched nested dictionaries by watching the
       relevant PCollections directly;
    2. Fixed the cache managers' "exists" implementations to avoid treating
       empty labels as a wildcard when matching cache files. This bug
       is not a security issue, but it could cause unexpected behavior when
       getting materialized values of PCollections that are not cached.
---
 .../python/apache_beam/runners/interactive/cache_manager.py |  4 +++-
 .../apache_beam/runners/interactive/cache_manager_test.py   | 12 ++++++++++++
 .../runners/interactive/caching/streaming_cache.py          |  6 ++++--
 .../runners/interactive/caching/streaming_cache_test.py     |  3 +++
 .../apache_beam/runners/interactive/interactive_runner.py   |  2 +-
 .../runners/interactive/pipeline_fragment_test.py           | 13 +++++++------
 6 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index a697494..2a15e2e 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -194,7 +194,9 @@ class FileBasedCacheManager(CacheManager):
     return 0
 
   def exists(self, *labels):
-    return bool(self._match(*labels))
+    if labels and any(labels[1:]):
+      return bool(self._match(*labels))
+    return False
 
   def _latest_version(self, *labels):
     timestamp = 0
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index d6319b6..5bcf1df 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -68,6 +68,18 @@ class FileBasedCacheManagerTest(object):
     self.mock_write_cache(cache_version_one, prefix, cache_label)
     self.assertTrue(self.cache_manager.exists(prefix, cache_label))
 
+  def test_empty_label_not_exist(self):
+    prefix = 'full'
+    cache_label = 'some-cache-label'
+    cache_version_one = ['cache', 'version', 'one']
+
+    self.assertFalse(self.cache_manager.exists(prefix, cache_label))
+    self.mock_write_cache(cache_version_one, prefix, cache_label)
+    self.assertTrue(self.cache_manager.exists(prefix, cache_label))
+
+    # '' shouldn't be treated as a wildcard to match everything.
+    self.assertFalse(self.cache_manager.exists(prefix, ''))
+
   def test_size(self):
     """Test getting the size of some cache label."""
 
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index fc8a8aa..8934e52 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -289,8 +289,10 @@ class StreamingCache(CacheManager):
     return self._capture_keys
 
   def exists(self, *labels):
-    path = os.path.join(self._cache_dir, *labels)
-    return os.path.exists(path)
+    if labels and any(labels):
+      path = os.path.join(self._cache_dir, *labels)
+      return os.path.exists(path)
+    return False
 
   # TODO(srohde): Modify this to return the correct version.
   def read(self, *labels, **args):
diff --git a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
index 6b81102..c6b2983 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache_test.py
@@ -51,6 +51,9 @@ class StreamingCacheTest(unittest.TestCase):
     cache.write([TestStreamFileRecord()], 'my_label')
     self.assertTrue(cache.exists('my_label'))
 
+    # '' shouldn't be treated as a wildcard to match everything.
+    self.assertFalse(cache.exists(''))
+
   def test_empty(self):
     CACHED_PCOLLECTION_KEY = repr(CacheKey('arbitrary_key', '', '', ''))
 
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index e19b85b..1324016 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -250,7 +250,7 @@ class PipelineResult(beam.runners.runner.PipelineResult):
     key = self._pipeline_instrument.cache_key(pcoll)
     cache_manager = ie.current_env().get_cache_manager(
         self._pipeline_instrument.user_pipeline)
-    if cache_manager.exists('full', key):
+    if key and cache_manager.exists('full', key):
       coder = cache_manager.load_pcoder('full', key)
       reader, _ = cache_manager.read('full', key)
       return to_element_list(reader, coder, include_window_info)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
index f1f423f..3e7207f 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_fragment_test.py
@@ -36,6 +36,7 @@ from apache_beam.testing.test_stream import TestStream
     '[interactive] dependency is not installed.')
 class PipelineFragmentTest(unittest.TestCase):
   def setUp(self):
+    ie.new_env()
     # Assume a notebook frontend is connected to the mocked ipython kernel.
     ie.current_env()._is_in_ipython = True
     ie.current_env()._is_in_notebook = True
@@ -147,22 +148,22 @@ class PipelineFragmentTest(unittest.TestCase):
 
       @beam.ptransform_fn
       def Bar(pcoll):
-        return pcoll | beam.Map(lambda n: n)
+        return pcoll | beam.Map(lambda n: 2 * n)
 
       @beam.ptransform_fn
       def Foo(pcoll):
-        p1 = pcoll | beam.Map(lambda n: n)
+        p1 = pcoll | beam.Map(lambda n: 3 * n)
         p2 = pcoll | beam.Map(str)
-        bar = pcoll | Bar()
+        bar = p1 | Bar()
         return {'pc1': p1, 'pc2': p2, 'bar': bar}
 
       res = init | Foo()
+      ib.watch(res)
 
-    ib.watch(locals())
-    pc = res['pc1']
+    pc = res['bar']
 
     result = pf.PipelineFragment([pc]).run()
-    self.assertEqual([0, 1, 2, 3, 4], list(result.get(pc)))
+    self.assertEqual([0, 6, 12, 18, 24], list(result.get(pc)))
 
 
 if __name__ == '__main__':

Reply via email to