This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ae3a1f2 [BEAM-6356] Add the option to use TFRecord to store cache
results using PCollection's PCoder (#8687)
ae3a1f2 is described below
commit ae3a1f23e2b59c0cad743de416aa3809516f9554
Author: Alexey Strokach <[email protected]>
AuthorDate: Thu Jun 6 12:16:20 2019 -0700
[BEAM-6356] Add the option to use TFRecord to store cache results using
PCollection's PCoder (#8687)
* Use TFRecord to store intermediate cache results using PCollection's
PCoder.
* Add optional support for TFRecord as a cache serialization format
* Rename _Reader and _Writer to _reader_class and _writer_class
* Clarify the return type of the CacheManager.read method
---
.../runners/interactive/cache_manager.py | 88 ++++++++++++++++++----
.../runners/interactive/cache_manager_test.py | 36 +++++++--
.../runners/interactive/interactive_runner.py | 9 ++-
3 files changed, 113 insertions(+), 20 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py
b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index e8816fe..20d84e3 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -28,6 +28,8 @@ import urllib
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import filesystems
+from apache_beam.io import textio
+from apache_beam.io import tfrecordio
from apache_beam.transforms import combiners
try: # Python 3
@@ -62,9 +64,12 @@ class CacheManager(object):
def read(self, *labels):
"""Return the PCollection as a list as well as the version number.
+ Args:
+ *labels: List of labels for PCollection instance.
+
Returns:
- (List[PCollection])
- (int) the version number
+ Tuple[List[Any], int]: A tuple containing a list of items in the
+ PCollection and the version number.
It is possible that the version numbers from read() and_latest_version()
are different. This usually means that the cache's been evicted (thus
@@ -81,6 +86,25 @@ class CacheManager(object):
"""Returns a beam.io.Sink that writes the PCollection cache."""
raise NotImplementedError
+ def save_pcoder(self, pcoder, *labels):
+ """Saves pcoder for given PCollection.
+
+ Correctly reading a PCollection from the cache requires its PCoder to be
+ known. This method saves the desired PCoder for a PCollection; it is then
+ used by the sink(...), source(...), and, most importantly, read(...)
+ methods. The latter must be able to read a PCollection written by Beam
+ using non-Beam IO.
+
+ Args:
+ pcoder: A PCoder to be used for reading and writing a PCollection.
+ *labels: List of labels for PCollection instance.
+ """
+ raise NotImplementedError
+
+ def load_pcoder(self, *labels):
+ """Returns previously saved PCoder for reading and writing PCollection."""
+ raise NotImplementedError
+
def cleanup(self):
"""Cleans up all the PCollection caches."""
raise NotImplementedError
@@ -89,7 +113,12 @@ class CacheManager(object):
class FileBasedCacheManager(CacheManager):
"""Maps PCollections to local temp files for materialization."""
- def __init__(self, cache_dir=None):
+ _available_formats = {
+ 'text': (textio.ReadFromText, textio.WriteToText),
+ 'tfrecord': (tfrecordio.ReadFromTFRecord, tfrecordio.WriteToTFRecord)
+ }
+
+ def __init__(self, cache_dir=None, cache_format='text'):
if cache_dir:
self._cache_dir = filesystems.FileSystems.join(
cache_dir,
@@ -99,6 +128,25 @@ class FileBasedCacheManager(CacheManager):
prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
self._versions = collections.defaultdict(lambda: self._CacheVersion())
+ if cache_format not in self._available_formats:
+ raise ValueError("Unsupported cache format: '%s'." % cache_format)
+ self._reader_class, self._writer_class = self._available_formats[
+ cache_format]
+ self._default_pcoder = (
+ SafeFastPrimitivesCoder() if cache_format == 'text' else None)
+
+ # Dictionary of saved pcoders keyed by PCollection path. It is OK to keep
+ # this dictionary in memory because once a FileBasedCacheManager object is
+ # destroyed/re-created, it loses access to previously written cache
+ # objects anyway, even if cache_dir already exists. In other words,
+ # it is not possible to resume execution of a Beam pipeline from the
+ # saved cache if the FileBasedCacheManager has been reset.
+ #
+ # However, if better cache persistence is ever implemented, care must be
+ # taken to keep each cached PCollection consistent with the PCoder it
+ # was written with.
+ self._saved_pcoders = {}
+
def exists(self, *labels):
return bool(self._match(*labels))
@@ -109,29 +157,35 @@ class FileBasedCacheManager(CacheManager):
result = self._versions["-".join(labels)].get_version(timestamp)
return result
+ def save_pcoder(self, pcoder, *labels):
+ self._saved_pcoders[self._path(*labels)] = pcoder
+
+ def load_pcoder(self, *labels):
+ return (self._default_pcoder if self._default_pcoder is not None else
+ self._saved_pcoders[self._path(*labels)])
+
def read(self, *labels):
if not self.exists(*labels):
return [], -1
- def _read_helper():
- coder = SafeFastPrimitivesCoder()
- for path in self._match(*labels):
- for line in filesystems.FileSystems.open(path):
- yield coder.decode(line.strip())
- result, version = list(_read_helper()), self._latest_version(*labels)
+ source = self.source(*labels)
+ range_tracker = source.get_range_tracker(None, None)
+ result = list(source.read(range_tracker))
+ version = self._latest_version(*labels)
return result, version
def source(self, *labels):
- return beam.io.ReadFromText(self._glob_path(*labels),
- coder=SafeFastPrimitivesCoder())._source
+ return self._reader_class(
+ self._glob_path(*labels), coder=self.load_pcoder(*labels))._source
def sink(self, *labels):
- return beam.io.WriteToText(self._path(*labels),
- coder=SafeFastPrimitivesCoder())._sink
+ return self._writer_class(
+ self._path(*labels), coder=self.load_pcoder(*labels))._sink
def cleanup(self):
if filesystems.FileSystems.exists(self._cache_dir):
filesystems.FileSystems.delete([self._cache_dir])
+ self._saved_pcoders = {}
def _glob_path(self, *labels):
return self._path(*labels) + '-*-of-*'
@@ -188,6 +242,14 @@ class WriteCache(beam.PTransform):
def expand(self, pcoll):
prefix = 'sample' if self._sample else 'full'
+
+ # We save the pcoder that is necessary for properly reading the
+ # cached PCollection. The _cache_manager.sink(...) call below
+ # should use this saved pcoder.
+ self._cache_manager.save_pcoder(
+ coders.registry.get_coder(pcoll.element_type),
+ prefix, self._label)
+
if self._sample:
pcoll |= 'Sample' >> (
combiners.Sample.FixedSizeGlobally(self._sample_size)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index 641643f..3ad81b8 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -25,14 +25,15 @@ import tempfile
import time
import unittest
+from apache_beam import coders
from apache_beam.io import filesystems
from apache_beam.runners.interactive import cache_manager as cache
-class FileBasedCacheManagerTest(unittest.TestCase):
+class FileBasedCacheManagerTest(object):
"""Unit test for FileBasedCacheManager.
- Note that this set of tests focuses only the the methods that interacts with
+ Note that this set of tests focuses only on the methods that interact with
the LOCAL file system. The idea is that once FileBasedCacheManager works well
with the local file system, it should work with any file system with
`apache_beam.io.filesystem` interface. Those tests that involve interactions
@@ -40,9 +41,12 @@ class FileBasedCacheManagerTest(unittest.TestCase):
tested with InteractiveRunner as a part of integration tests instead.
"""
+ cache_format = None
+
def setUp(self):
self.test_dir = tempfile.mkdtemp()
- self.cache_manager = cache.FileBasedCacheManager(self.test_dir)
+ self.cache_manager = cache.FileBasedCacheManager(
+ self.test_dir, cache_format=self.cache_format)
def tearDown(self):
# The test_dir might have already been removed by cache_manager.cleanup().
@@ -61,10 +65,16 @@ class FileBasedCacheManagerTest(unittest.TestCase):
time.sleep(0.1)
cache_file = cache_label + '-1-of-2'
+ labels = [prefix, cache_label]
+
+ # Usually, the pcoder will be inferred from `pcoll.element_type`
+ pcoder = coders.registry.get_coder(object)
+ self.cache_manager.save_pcoder(pcoder, *labels)
+ sink = self.cache_manager.sink(*labels)
+
with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
for line in pcoll_list:
- f.write(cache.SafeFastPrimitivesCoder().encode(line))
- f.write(b'\n')
+ sink.write_record(f, line)
def test_exists(self):
"""Test that CacheManager can correctly tell if the cache exists or not."""
@@ -163,5 +173,21 @@ class FileBasedCacheManagerTest(unittest.TestCase):
self.cache_manager.is_latest_version(version, prefix, cache_label))
+class TextFileBasedCacheManagerTest(
+ FileBasedCacheManagerTest,
+ unittest.TestCase,
+):
+
+ cache_format = 'text'
+
+
+class TFRecordBasedCacheManagerTest(
+ FileBasedCacheManagerTest,
+ unittest.TestCase,
+):
+
+ cache_format = 'tfrecord'
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 391f3f0..4bf125e 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -44,19 +44,24 @@ class InteractiveRunner(runners.PipelineRunner):
Allows interactively building and running Beam Python pipelines.
"""
- def __init__(self, underlying_runner=None, cache_dir=None,
+ def __init__(self,
+ underlying_runner=None,
+ cache_dir=None,
+ cache_format='text',
render_option=None):
"""Constructor of InteractiveRunner.
Args:
underlying_runner: (runner.PipelineRunner)
cache_dir: (str) the directory where PCollection caches are kept
+ cache_format: (str) the file format that should be used for saving
+ PCollection caches. Available options are 'text' and 'tfrecord'.
render_option: (str) this parameter decides how the pipeline graph is
rendered. See display.pipeline_graph_renderer for available options.
"""
self._underlying_runner = (underlying_runner
or direct_runner.DirectRunner())
- self._cache_manager = cache.FileBasedCacheManager(cache_dir)
+ self._cache_manager = cache.FileBasedCacheManager(cache_dir, cache_format)
self._renderer = pipeline_graph_renderer.get_renderer(render_option)
self._in_session = False