This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cd6d904 [BEAM-10514] Restrict cache file path length
new 7e4e501 Merge pull request #12283 from KevinGG/fix_path_length
cd6d904 is described below
commit cd6d904a169957537297be2aa420ede1e5129df9
Author: Ning Kang <[email protected]>
AuthorDate: Thu Jul 16 13:57:22 2020 -0700
[BEAM-10514] Restrict cache file path length
1. Removed unnecessary path components from the cache file paths.
2. Obfuscated the variable name in the cache key, which is used as part of
the cache file path, so that it takes up only a constant length in the
file path.
Change-Id: I8d84351744b228e1c88154f46cd7578afc631608
---
.../python/apache_beam/runners/interactive/cache_manager.py | 13 +++++++------
.../runners/interactive/interactive_environment.py | 2 +-
.../apache_beam/runners/interactive/pipeline_instrument.py | 5 ++++-
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index f583662..9894535 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -22,7 +22,6 @@ from __future__ import division
from __future__ import print_function
import collections
-import datetime
import os
import sys
import tempfile
@@ -107,7 +106,11 @@ class CacheManager(object):
def sink(self, labels, is_capture=False):
# type (*str, bool) -> ptransform.PTransform
- """Returns a PTransform that writes the PCollection cache."""
+ """Returns a PTransform that writes the PCollection cache.
+
+ TODO(BEAM-10514): Make sure labels will not be converted into an
+ arbitrarily long file path: e.g., windows has a 260 path limit.
+ """
raise NotImplementedError
def save_pcoder(self, pcoder, *labels):
@@ -150,12 +153,10 @@ class FileBasedCacheManager(CacheManager):
def __init__(self, cache_dir=None, cache_format='text'):
if cache_dir:
- self._cache_dir = filesystems.FileSystems.join(
- cache_dir,
- datetime.datetime.now().strftime("cache-%y-%m-%d-%H_%M_%S"))
+ self._cache_dir = cache_dir
else:
self._cache_dir = tempfile.mkdtemp(
- prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
+ prefix='it-', dir=os.environ.get('TEST_TMPDIR', None))
self._versions = collections.defaultdict(lambda: self._CacheVersion())
if cache_format not in self._available_formats:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 94dbb57..1d28517 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -319,7 +319,7 @@ class InteractiveEnvironment(object):
if not cache_manager and create_if_absent:
cache_dir = tempfile.mkdtemp(
suffix=str(id(pipeline)),
- prefix='interactive-temp-',
+ prefix='it-',
dir=os.environ.get('TEST_TMPDIR', None))
cache_manager = cache.FileBasedCacheManager(cache_dir)
self._cache_managers[str(id(pipeline))] = cache_manager
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index 02fca9b..b30f79f 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -32,6 +32,7 @@ from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_fragment as pf
from apache_beam.runners.interactive import background_caching_job
+from apache_beam.runners.interactive.utils import obfuscate
from apache_beam.testing import test_stream
from apache_beam.transforms.window import WindowedValue
@@ -66,7 +67,9 @@ class Cacheable:
# TODO: turn this into a dataclass object when we finally get off of Python2.
class CacheKey:
def __init__(self, var, version, producer_version, pipeline_id):
- self.var = var
+ # Makes sure that the variable name is obfuscated and only first 10
+ # characters taken so that the CacheKey has a constant length.
+ self.var = obfuscate(var)[:10]
self.version = version
self.producer_version = producer_version
self.pipeline_id = pipeline_id