This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c11e087  Interactive Beam - enable caching in GCS
c11e087 is described below

commit c11e08745d72a3bda132b2e4ff3f41ca27b5f878
Author: Sindy Li <[email protected]>
AuthorDate: Tue Aug 7 14:54:10 2018 -0700

    Interactive Beam - enable caching in GCS
    
    Changes made:
    * Added last_updated() API to beam.io.filesystem.
        related files in /apache_beam/io
    
    * Generalized LocalFileCacheManager to FileBasedCacheManager. Now
    PCollection caches can be kept anywhere as long as it implements
    beam.io.filesystem.
        related files in /apache_beam/runners
---
 sdks/python/apache_beam/io/filesystem.py           | 14 ++++++++
 sdks/python/apache_beam/io/filesystem_test.py      |  3 ++
 sdks/python/apache_beam/io/filesystems.py          | 15 +++++++++
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    | 13 ++++++++
 sdks/python/apache_beam/io/gcp/gcsio.py            | 17 ++++++++++
 sdks/python/apache_beam/io/gcp/gcsio_test.py       | 37 +++++++++++++++-------
 sdks/python/apache_beam/io/hadoopfilesystem.py     |  3 ++
 sdks/python/apache_beam/io/localfilesystem.py      | 15 +++++++++
 .../runners/interactive/cache_manager.py           | 25 ++++++++-------
 .../runners/interactive/cache_manager_test.py      | 14 ++++----
 .../runners/interactive/interactive_runner.py      |  7 ++--
 11 files changed, 130 insertions(+), 33 deletions(-)

diff --git a/sdks/python/apache_beam/io/filesystem.py 
b/sdks/python/apache_beam/io/filesystem.py
index e59b1f9..d83e8ff 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -690,6 +690,20 @@ class FileSystem(with_metaclass(abc.ABCMeta, BeamPlugin)):
     """
     raise NotImplementedError
 
+  @abc.abstractmethod
+  def last_updated(self, path):
+    """Get the UNIX Epoch time, in seconds, at which the file was last updated.
+
+    Args:
+      path: string path of file.
+
+    Returns: float UNIX Epoch time
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    raise NotImplementedError
+
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/filesystem_test.py 
b/sdks/python/apache_beam/io/filesystem_test.py
index 8c9ffde..9185cf8 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -92,6 +92,9 @@ class TestingFileSystem(FileSystem):
   def size(self, path):
     raise NotImplementedError
 
+  def last_updated(self, path):
+    raise NotImplementedError
+
   def checksum(self, path):
     raise NotImplementedError
 
diff --git a/sdks/python/apache_beam/io/filesystems.py 
b/sdks/python/apache_beam/io/filesystems.py
index 91786b8..5d9c1fe 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -248,6 +248,21 @@ class FileSystems(object):
     return filesystem.exists(path)
 
   @staticmethod
+  def last_updated(path):
+    """Get the UNIX Epoch time, in seconds, at which the file was last updated.
+
+    Args:
+      path: string path of file.
+
+    Returns: float UNIX Epoch time
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    filesystem = FileSystems.get_filesystem(path)
+    return filesystem.last_updated(path)
+
+  @staticmethod
   def checksum(path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py 
b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index a52df0e..a6081c8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -266,6 +266,19 @@ class GCSFileSystem(FileSystem):
     """
     return gcsio.GcsIO().size(path)
 
+  def last_updated(self, path):
+    """Get the UNIX Epoch time, in seconds, at which the file was last updated.
+
+    Args:
+      path: string path of file.
+
+    Returns: float UNIX Epoch time
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    return gcsio.GcsIO().last_updated(path)
+
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py 
b/sdks/python/apache_beam/io/gcp/gcsio.py
index 9a64723..7e8a9f0 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -412,6 +412,23 @@ class GcsIO(object):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def last_updated(self, path):
+    """Returns the last updated epoch time of a single GCS object.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: last updated time of the GCS object in seconds.
+    """
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(
+        bucket=bucket, object=object_path)
+    datetime = self.client.objects.Get(request).updated
+    return (time.mktime(datetime.timetuple()) - time.timezone
+            + datetime.microsecond / 1000000.0)
+
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def list_prefix(self, path):
     """Lists files matching the prefix.
 
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py 
b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 0a0b16d..1e217a6 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -18,6 +18,7 @@
 from __future__ import absolute_import
 from __future__ import division
 
+import datetime
 import errno
 import logging
 import os
@@ -52,20 +53,28 @@ class FakeGcsClient(object):
 
 class FakeFile(object):
 
-  def __init__(self, bucket, obj, contents, generation, crc32c=None):
+  def __init__(self, bucket, obj, contents, generation, crc32c=None,
+               last_updated=None):
     self.bucket = bucket
     self.object = obj
     self.contents = contents
     self.generation = generation
     self.crc32c = crc32c
+    self.last_updated = last_updated
 
   def get_metadata(self):
+    last_updated_datetime = None
+    if self.last_updated:
+      last_updated_datetime = datetime.datetime.utcfromtimestamp(
+          self.last_updated)
+
     return storage.Object(
         bucket=self.bucket,
         name=self.object,
         generation=self.generation,
         size=len(self.contents),
-        crc32c=self.crc32c)
+        crc32c=self.crc32c,
+        updated=last_updated_datetime)
 
 
 class FakeGcsObjects(object):
@@ -228,9 +237,11 @@ class TestGCSPathParser(unittest.TestCase):
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestGCSIO(unittest.TestCase):
 
-  def _insert_random_file(self, client, path, size, generation=1, crc32c=None):
+  def _insert_random_file(self, client, path, size, generation=1, crc32c=None,
+                          last_updated=None):
     bucket, name = gcsio.parse_gcs_path(path)
-    f = FakeFile(bucket, name, os.urandom(size), generation, crc32c=crc32c)
+    f = FakeFile(bucket, name, os.urandom(size), generation, crc32c=crc32c,
+                 last_updated=last_updated)
     client.objects.add_file(f)
     return f
 
@@ -245,14 +256,6 @@ class TestGCSIO(unittest.TestCase):
     self.assertFalse(self.gcs.exists(file_name + 'xyz'))
     self.assertTrue(self.gcs.exists(file_name))
 
-  def test_checksum(self):
-    file_name = 'gs://gcsio-test/dummy_file'
-    file_size = 1234
-    checksum = 'deadbeef'
-    self._insert_random_file(self.client, file_name, file_size, 
crc32c=checksum)
-    self.assertTrue(self.gcs.exists(file_name))
-    self.assertEqual(checksum, self.gcs.checksum(file_name))
-
   @mock.patch.object(FakeGcsObjects, 'Get')
   def test_exists_failure(self, mock_get):
     # Raising an error other than 404. Raising 404 is a valid failure for
@@ -281,6 +284,16 @@ class TestGCSIO(unittest.TestCase):
     self.assertTrue(self.gcs.exists(file_name))
     self.assertEqual(1234, self.gcs.size(file_name))
 
+  def test_last_updated(self):
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+    last_updated = 123456.78
+
+    self._insert_random_file(self.client, file_name, file_size,
+                             last_updated=last_updated)
+    self.assertTrue(self.gcs.exists(file_name))
+    self.assertEqual(last_updated, self.gcs.last_updated(file_name))
+
   def test_file_mode(self):
     file_name = 'gs://gcsio-test/dummy_mode_file'
     with self.gcs.open(file_name, 'wb') as f:
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py 
b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 61c16dc..71d74e8 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -340,6 +340,9 @@ class HadoopFileSystem(FileSystem):
       raise BeamIOError('File not found: %s' % url)
     return status[_FILE_STATUS_LENGTH]
 
+  def last_updated(self, url):
+    raise NotImplementedError
+
   def checksum(self, url):
     """Fetches a checksum description for a URL.
 
diff --git a/sdks/python/apache_beam/io/localfilesystem.py 
b/sdks/python/apache_beam/io/localfilesystem.py
index e373ad0..73d9a4d 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -268,6 +268,21 @@ class LocalFileSystem(FileSystem):
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Size operation failed", {path: e})
 
+  def last_updated(self, path):
+    """Get the UNIX Epoch time, in seconds, at which the file was last updated.
+
+    Args:
+      path: string path of file.
+
+    Returns: float UNIX Epoch time
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    if not self.exists(path):
+      raise BeamIOError('Path does not exist: %s' % path)
+    return os.path.getmtime(path)
+
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py 
b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index fcbb04a..f7913f2 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -20,9 +20,7 @@ from __future__ import division
 from __future__ import print_function
 
 import collections
-import glob
 import os
-import shutil
 import tempfile
 import urllib
 
@@ -79,7 +77,7 @@ class CacheManager(object):
     raise NotImplementedError
 
 
-class LocalFileCacheManager(CacheManager):
+class FileBasedCacheManager(CacheManager):
   """Maps PCollections to local temp files for materialization."""
 
   def __init__(self, temp_dir=None):
@@ -88,14 +86,12 @@ class LocalFileCacheManager(CacheManager):
     self._versions = collections.defaultdict(lambda: self._CacheVersion())
 
   def exists(self, *labels):
-    return bool(
-        filesystems.FileSystems.match([self._glob_path(*labels)],
-                                      limits=[1])[0].metadata_list)
+    return bool(self._match(*labels))
 
   def _latest_version(self, *labels):
     timestamp = 0
-    for path in glob.glob(self._glob_path(*labels)):
-      timestamp = max(timestamp, os.path.getmtime(path))
+    for path in self._match(*labels):
+      timestamp = max(timestamp, filesystems.FileSystems.last_updated(path))
     result = self._versions["-".join(labels)].get_version(timestamp)
     return result
 
@@ -105,8 +101,8 @@ class LocalFileCacheManager(CacheManager):
 
     def _read_helper():
       coder = SafeFastPrimitivesCoder()
-      for path in glob.glob(self._glob_path(*labels)):
-        for line in open(path):
+      for path in self._match(*labels):
+        for line in filesystems.FileSystems.open(path):
           yield coder.decode(line.strip())
     result, version = list(_read_helper()), self._latest_version(*labels)
     return result, version
@@ -120,8 +116,8 @@ class LocalFileCacheManager(CacheManager):
                                coder=SafeFastPrimitivesCoder())._sink
 
   def cleanup(self):
-    if os.path.exists(self._temp_dir):
-      shutil.rmtree(self._temp_dir)
+    if filesystems.FileSystems.exists(self._temp_dir):
+      filesystems.FileSystems.delete([self._temp_dir])
 
   def _glob_path(self, *labels):
     return self._path(*labels) + '-*-of-*'
@@ -129,6 +125,11 @@ class LocalFileCacheManager(CacheManager):
   def _path(self, *labels):
     return filesystems.FileSystems.join(self._temp_dir, *labels)
 
+  def _match(self, *labels):
+    match = filesystems.FileSystems.match([self._glob_path(*labels)])
+    assert len(match) == 1
+    return [metadata.path for metadata in match[0].metadata_list]
+
   class _CacheVersion(object):
     """This class keeps track of the timestamp and the corresponding 
version."""
 
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py 
b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index c8fdc9c..3377825 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -29,18 +29,20 @@ from apache_beam.io import filesystems
 from apache_beam.runners.interactive import cache_manager as cache
 
 
-class LocalFileCacheManagerTest(unittest.TestCase):
-  """Unit test for LocalFileCacheManager.
+class FileBasedCacheManagerTest(unittest.TestCase):
+  """Unit test for FileBasedCacheManager.
 
 Note that this set of tests focuses on only the methods that interact with
-  the local operating system. Those tests that involve interactions with Beam
-  (i.e. source(), sink(), ReadCache, and WriteCache) will be tested with
-  InteractiveRunner as a part of integration tests instead.
+  the LOCAL file system. The idea is that once FileBasedCacheManager works well
+  with the local file system, it should work with any file system with
+  `apache_beam.io.filesystem` interface. Those tests that involve interactions
+  with Beam pipeline (i.e. source(), sink(), ReadCache, and WriteCache) will be
+  tested with InteractiveRunner as a part of integration tests instead.
   """
 
   def setUp(self):
     self.test_dir = tempfile.mkdtemp()
-    self.cache_manager = cache.LocalFileCacheManager(self.test_dir)
+    self.cache_manager = cache.FileBasedCacheManager(self.test_dir)
 
   def tearDown(self):
     # The test_dir might have already been removed by cache_manager.cleanup().
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py 
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index bdc909b..b0bb918 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -45,11 +45,12 @@ class InteractiveRunner(runners.PipelineRunner):
   Allows interactively building and running Beam Python pipelines.
   """
 
-  def __init__(self, 
underlying_runner=direct_runner.BundleBasedDirectRunner()):
+  def __init__(self, underlying_runner=None, cache_dir=None):
     # TODO(qinyeli, BEAM-4755) remove explicitly overriding underlying runner
     # once interactive_runner works with FnAPI mode
-    self._underlying_runner = underlying_runner
-    self._cache_manager = cache.LocalFileCacheManager()
+    self._underlying_runner = (underlying_runner
+                               or direct_runner.BundleBasedDirectRunner())
+    self._cache_manager = cache.FileBasedCacheManager(cache_dir)
     self._in_session = False
 
   def start_session(self):

Reply via email to