This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6d904  [BEAM-10514] Restrict cache file path length
     new 7e4e501  Merge pull request #12283 from KevinGG/fix_path_length
cd6d904 is described below

commit cd6d904a169957537297be2aa420ede1e5129df9
Author: Ning Kang <[email protected]>
AuthorDate: Thu Jul 16 13:57:22 2020 -0700

    [BEAM-10514] Restrict cache file path length
    
    1. Removed unnecessary path components from the cache file paths.
    2. Obfuscated the variable name in the cache key, which is used as part
       of the cache file path, so that it takes up only a constant length in
       the file path.
    
    Change-Id: I8d84351744b228e1c88154f46cd7578afc631608
---
 .../python/apache_beam/runners/interactive/cache_manager.py | 13 +++++++------
 .../runners/interactive/interactive_environment.py          |  2 +-
 .../apache_beam/runners/interactive/pipeline_instrument.py  |  5 ++++-
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index f583662..9894535 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -22,7 +22,6 @@ from __future__ import division
 from __future__ import print_function
 
 import collections
-import datetime
 import os
 import sys
 import tempfile
@@ -107,7 +106,11 @@ class CacheManager(object):
   def sink(self, labels, is_capture=False):
     # type (*str, bool) -> ptransform.PTransform
 
-    """Returns a PTransform that writes the PCollection cache."""
+    """Returns a PTransform that writes the PCollection cache.
+
+    TODO(BEAM-10514): Make sure labels will not be converted into an
+    arbitrarily long file path: e.g., windows has a 260 path limit.
+    """
     raise NotImplementedError
 
   def save_pcoder(self, pcoder, *labels):
@@ -150,12 +153,10 @@ class FileBasedCacheManager(CacheManager):
 
   def __init__(self, cache_dir=None, cache_format='text'):
     if cache_dir:
-      self._cache_dir = filesystems.FileSystems.join(
-          cache_dir,
-          datetime.datetime.now().strftime("cache-%y-%m-%d-%H_%M_%S"))
+      self._cache_dir = cache_dir
     else:
       self._cache_dir = tempfile.mkdtemp(
-          prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
+          prefix='it-', dir=os.environ.get('TEST_TMPDIR', None))
     self._versions = collections.defaultdict(lambda: self._CacheVersion())
 
     if cache_format not in self._available_formats:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 94dbb57..1d28517 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -319,7 +319,7 @@ class InteractiveEnvironment(object):
     if not cache_manager and create_if_absent:
       cache_dir = tempfile.mkdtemp(
           suffix=str(id(pipeline)),
-          prefix='interactive-temp-',
+          prefix='it-',
           dir=os.environ.get('TEST_TMPDIR', None))
       cache_manager = cache.FileBasedCacheManager(cache_dir)
       self._cache_managers[str(id(pipeline))] = cache_manager
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index 02fca9b..b30f79f 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -32,6 +32,7 @@ from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_fragment as pf
 from apache_beam.runners.interactive import background_caching_job
+from apache_beam.runners.interactive.utils import obfuscate
 from apache_beam.testing import test_stream
 from apache_beam.transforms.window import WindowedValue
 
@@ -66,7 +67,9 @@ class Cacheable:
 # TODO: turn this into a dataclass object when we finally get off of Python2.
 class CacheKey:
   def __init__(self, var, version, producer_version, pipeline_id):
-    self.var = var
+    # Makes sure that the variable name is obfuscated and only first 10
+    # characters taken so that the CacheKey has a constant length.
+    self.var = obfuscate(var)[:10]
     self.version = version
     self.producer_version = producer_version
     self.pipeline_id = pipeline_id

Reply via email to