This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c11e087 Interactive Beam - enable caching in GCS
c11e087 is described below
commit c11e08745d72a3bda132b2e4ff3f41ca27b5f878
Author: Sindy Li <[email protected]>
AuthorDate: Tue Aug 7 14:54:10 2018 -0700
Interactive Beam - enable caching in GCS
Changes made:
* Added last_updated() API to beam.io.filesystem.
related files in /apache_beam/io
* Generalized LocalFileCacheManager to FileBasedCacheManager. Now
PCollection caches can be kept on any storage backend whose file
system implements the beam.io.filesystem interface.
related files in /apache_beam/runners
---
sdks/python/apache_beam/io/filesystem.py | 14 ++++++++
sdks/python/apache_beam/io/filesystem_test.py | 3 ++
sdks/python/apache_beam/io/filesystems.py | 15 +++++++++
sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 13 ++++++++
sdks/python/apache_beam/io/gcp/gcsio.py | 17 ++++++++++
sdks/python/apache_beam/io/gcp/gcsio_test.py | 37 +++++++++++++++-------
sdks/python/apache_beam/io/hadoopfilesystem.py | 3 ++
sdks/python/apache_beam/io/localfilesystem.py | 15 +++++++++
.../runners/interactive/cache_manager.py | 25 ++++++++-------
.../runners/interactive/cache_manager_test.py | 14 ++++----
.../runners/interactive/interactive_runner.py | 7 ++--
11 files changed, 130 insertions(+), 33 deletions(-)
diff --git a/sdks/python/apache_beam/io/filesystem.py
b/sdks/python/apache_beam/io/filesystem.py
index e59b1f9..d83e8ff 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -690,6 +690,20 @@ class FileSystem(with_metaclass(abc.ABCMeta, BeamPlugin)):
"""
raise NotImplementedError
+ @abc.abstractmethod
+ def last_updated(self, path):
+ """Get UNIX Epoch time in seconds on the FileSystem.
+
+ Args:
+ path: string path of file.
+
+ Returns: float UNIX Epoch time
+
+ Raises:
+ ``BeamIOError`` if path doesn't exist.
+ """
+ raise NotImplementedError
+
def checksum(self, path):
"""Fetch checksum metadata of a file on the
:class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/filesystem_test.py
b/sdks/python/apache_beam/io/filesystem_test.py
index 8c9ffde..9185cf8 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -92,6 +92,9 @@ class TestingFileSystem(FileSystem):
def size(self, path):
raise NotImplementedError
+ def last_updated(self, path):
+ raise NotImplementedError
+
def checksum(self, path):
raise NotImplementedError
diff --git a/sdks/python/apache_beam/io/filesystems.py
b/sdks/python/apache_beam/io/filesystems.py
index 91786b8..5d9c1fe 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -248,6 +248,21 @@ class FileSystems(object):
return filesystem.exists(path)
@staticmethod
+ def last_updated(path):
+ """Get UNIX Epoch time in seconds on the FileSystem.
+
+ Args:
+ path: string path of file.
+
+ Returns: float UNIX Epoch time
+
+ Raises:
+ ``BeamIOError`` if path doesn't exist.
+ """
+ filesystem = FileSystems.get_filesystem(path)
+ return filesystem.last_updated(path)
+
+ @staticmethod
def checksum(path):
"""Fetch checksum metadata of a file on the
:class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index a52df0e..a6081c8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -266,6 +266,19 @@ class GCSFileSystem(FileSystem):
"""
return gcsio.GcsIO().size(path)
+ def last_updated(self, path):
+ """Get UNIX Epoch time in seconds on the FileSystem.
+
+ Args:
+ path: string path of file.
+
+ Returns: float UNIX Epoch time
+
+ Raises:
+ ``BeamIOError`` if path doesn't exist.
+ """
+ return gcsio.GcsIO().last_updated(path)
+
def checksum(self, path):
"""Fetch checksum metadata of a file on the
:class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py
b/sdks/python/apache_beam/io/gcp/gcsio.py
index 9a64723..7e8a9f0 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -412,6 +412,23 @@ class GcsIO(object):
@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+ def last_updated(self, path):
+ """Returns the last updated epoch time of a single GCS object.
+
+ This method does not perform glob expansion. Hence the given path must be
+ for a single GCS object.
+
+ Returns: last updated time of the GCS object in second.
+ """
+ bucket, object_path = parse_gcs_path(path)
+ request = storage.StorageObjectsGetRequest(
+ bucket=bucket, object=object_path)
+ datetime = self.client.objects.Get(request).updated
+ return (time.mktime(datetime.timetuple()) - time.timezone
+ + datetime.microsecond / 1000000.0)
+
+ @retry.with_exponential_backoff(
+ retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def list_prefix(self, path):
"""Lists files matching the prefix.
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py
b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 0a0b16d..1e217a6 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -18,6 +18,7 @@
from __future__ import absolute_import
from __future__ import division
+import datetime
import errno
import logging
import os
@@ -52,20 +53,28 @@ class FakeGcsClient(object):
class FakeFile(object):
- def __init__(self, bucket, obj, contents, generation, crc32c=None):
+ def __init__(self, bucket, obj, contents, generation, crc32c=None,
+ last_updated=None):
self.bucket = bucket
self.object = obj
self.contents = contents
self.generation = generation
self.crc32c = crc32c
+ self.last_updated = last_updated
def get_metadata(self):
+ last_updated_datetime = None
+ if self.last_updated:
+ last_updated_datetime = datetime.datetime.utcfromtimestamp(
+ self.last_updated)
+
return storage.Object(
bucket=self.bucket,
name=self.object,
generation=self.generation,
size=len(self.contents),
- crc32c=self.crc32c)
+ crc32c=self.crc32c,
+ updated=last_updated_datetime)
class FakeGcsObjects(object):
@@ -228,9 +237,11 @@ class TestGCSPathParser(unittest.TestCase):
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestGCSIO(unittest.TestCase):
- def _insert_random_file(self, client, path, size, generation=1, crc32c=None):
+ def _insert_random_file(self, client, path, size, generation=1, crc32c=None,
+ last_updated=None):
bucket, name = gcsio.parse_gcs_path(path)
- f = FakeFile(bucket, name, os.urandom(size), generation, crc32c=crc32c)
+ f = FakeFile(bucket, name, os.urandom(size), generation, crc32c=crc32c,
+ last_updated=last_updated)
client.objects.add_file(f)
return f
@@ -245,14 +256,6 @@ class TestGCSIO(unittest.TestCase):
self.assertFalse(self.gcs.exists(file_name + 'xyz'))
self.assertTrue(self.gcs.exists(file_name))
- def test_checksum(self):
- file_name = 'gs://gcsio-test/dummy_file'
- file_size = 1234
- checksum = 'deadbeef'
- self._insert_random_file(self.client, file_name, file_size,
crc32c=checksum)
- self.assertTrue(self.gcs.exists(file_name))
- self.assertEqual(checksum, self.gcs.checksum(file_name))
-
@mock.patch.object(FakeGcsObjects, 'Get')
def test_exists_failure(self, mock_get):
# Raising an error other than 404. Raising 404 is a valid failure for
@@ -281,6 +284,16 @@ class TestGCSIO(unittest.TestCase):
self.assertTrue(self.gcs.exists(file_name))
self.assertEqual(1234, self.gcs.size(file_name))
+ def test_last_updated(self):
+ file_name = 'gs://gcsio-test/dummy_file'
+ file_size = 1234
+ last_updated = 123456.78
+
+ self._insert_random_file(self.client, file_name, file_size,
+ last_updated=last_updated)
+ self.assertTrue(self.gcs.exists(file_name))
+ self.assertEqual(last_updated, self.gcs.last_updated(file_name))
+
def test_file_mode(self):
file_name = 'gs://gcsio-test/dummy_mode_file'
with self.gcs.open(file_name, 'wb') as f:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py
b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 61c16dc..71d74e8 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -340,6 +340,9 @@ class HadoopFileSystem(FileSystem):
raise BeamIOError('File not found: %s' % url)
return status[_FILE_STATUS_LENGTH]
+ def last_updated(self, url):
+ raise NotImplementedError
+
def checksum(self, url):
"""Fetches a checksum description for a URL.
diff --git a/sdks/python/apache_beam/io/localfilesystem.py
b/sdks/python/apache_beam/io/localfilesystem.py
index e373ad0..73d9a4d 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -268,6 +268,21 @@ class LocalFileSystem(FileSystem):
except Exception as e: # pylint: disable=broad-except
raise BeamIOError("Size operation failed", {path: e})
+ def last_updated(self, path):
+ """Get UNIX Epoch time in seconds on the FileSystem.
+
+ Args:
+ path: string path of file.
+
+ Returns: float UNIX Epoch time
+
+ Raises:
+ ``BeamIOError`` if path doesn't exist.
+ """
+ if not self.exists(path):
+ raise BeamIOError('Path does not exist: %s' % path)
+ return os.path.getmtime(path)
+
def checksum(self, path):
"""Fetch checksum metadata of a file on the
:class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py
b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index fcbb04a..f7913f2 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -20,9 +20,7 @@ from __future__ import division
from __future__ import print_function
import collections
-import glob
import os
-import shutil
import tempfile
import urllib
@@ -79,7 +77,7 @@ class CacheManager(object):
raise NotImplementedError
-class LocalFileCacheManager(CacheManager):
+class FileBasedCacheManager(CacheManager):
"""Maps PCollections to local temp files for materialization."""
def __init__(self, temp_dir=None):
@@ -88,14 +86,12 @@ class LocalFileCacheManager(CacheManager):
self._versions = collections.defaultdict(lambda: self._CacheVersion())
def exists(self, *labels):
- return bool(
- filesystems.FileSystems.match([self._glob_path(*labels)],
- limits=[1])[0].metadata_list)
+ return bool(self._match(*labels))
def _latest_version(self, *labels):
timestamp = 0
- for path in glob.glob(self._glob_path(*labels)):
- timestamp = max(timestamp, os.path.getmtime(path))
+ for path in self._match(*labels):
+ timestamp = max(timestamp, filesystems.FileSystems.last_updated(path))
result = self._versions["-".join(labels)].get_version(timestamp)
return result
@@ -105,8 +101,8 @@ class LocalFileCacheManager(CacheManager):
def _read_helper():
coder = SafeFastPrimitivesCoder()
- for path in glob.glob(self._glob_path(*labels)):
- for line in open(path):
+ for path in self._match(*labels):
+ for line in filesystems.FileSystems.open(path):
yield coder.decode(line.strip())
result, version = list(_read_helper()), self._latest_version(*labels)
return result, version
@@ -120,8 +116,8 @@ class LocalFileCacheManager(CacheManager):
coder=SafeFastPrimitivesCoder())._sink
def cleanup(self):
- if os.path.exists(self._temp_dir):
- shutil.rmtree(self._temp_dir)
+ if filesystems.FileSystems.exists(self._temp_dir):
+ filesystems.FileSystems.delete([self._temp_dir])
def _glob_path(self, *labels):
return self._path(*labels) + '-*-of-*'
@@ -129,6 +125,11 @@ class LocalFileCacheManager(CacheManager):
def _path(self, *labels):
return filesystems.FileSystems.join(self._temp_dir, *labels)
+ def _match(self, *labels):
+ match = filesystems.FileSystems.match([self._glob_path(*labels)])
+ assert len(match) == 1
+ return [metadata.path for metadata in match[0].metadata_list]
+
class _CacheVersion(object):
"""This class keeps track of the timestamp and the corresponding
version."""
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index c8fdc9c..3377825 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -29,18 +29,20 @@ from apache_beam.io import filesystems
from apache_beam.runners.interactive import cache_manager as cache
-class LocalFileCacheManagerTest(unittest.TestCase):
- """Unit test for LocalFileCacheManager.
+class FileBasedCacheManagerTest(unittest.TestCase):
+ """Unit test for FileBasedCacheManager.
Note that this set of tests focuses only the the methods that interacts with
- the local operating system. Those tests that involve interactions with Beam
- (i.e. source(), sink(), ReadCache, and WriteCache) will be tested with
- InteractiveRunner as a part of integration tests instead.
+ the LOCAL file system. The idea is that once FileBasedCacheManager works well
+ with the local file system, it should work with any file system with
+ `apache_beam.io.filesystem` interface. Those tests that involve interactions
+ with Beam pipeline (i.e. source(), sink(), ReadCache, and WriteCache) will be
+ tested with InteractiveRunner as a part of integration tests instead.
"""
def setUp(self):
self.test_dir = tempfile.mkdtemp()
- self.cache_manager = cache.LocalFileCacheManager(self.test_dir)
+ self.cache_manager = cache.FileBasedCacheManager(self.test_dir)
def tearDown(self):
# The test_dir might have already been removed by cache_manager.cleanup().
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index bdc909b..b0bb918 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -45,11 +45,12 @@ class InteractiveRunner(runners.PipelineRunner):
Allows interactively building and running Beam Python pipelines.
"""
- def __init__(self,
underlying_runner=direct_runner.BundleBasedDirectRunner()):
+ def __init__(self, underlying_runner=None, cache_dir=None):
# TODO(qinyeli, BEAM-4755) remove explicitly overriding underlying runner
# once interactive_runner works with FnAPI mode
- self._underlying_runner = underlying_runner
- self._cache_manager = cache.LocalFileCacheManager()
+ self._underlying_runner = (underlying_runner
+ or direct_runner.BundleBasedDirectRunner())
+ self._cache_manager = cache.FileBasedCacheManager(cache_dir)
self._in_session = False
def start_session(self):