This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 28818e3 [BEAM-2857] Implementing WriteToFiles transform for fileio (Python) (#8394)
28818e3 is described below
commit 28818e32c078316951da5dbc346d1f918a153d1a
Author: Pablo <[email protected]>
AuthorDate: Tue Jun 4 23:40:48 2019 -0700
[BEAM-2857] Implementing WriteToFiles transform for fileio (Python) (#8394)
* [BEAM-2857] Implementing WriteToFiles transform for fileio (Python)
---
sdks/python/apache_beam/io/fileio.py | 558 +++++++++++++++++++++++-
sdks/python/apache_beam/io/fileio_test.py | 374 +++++++++++++++-
sdks/python/apache_beam/io/filesystem.py | 5 +-
sdks/python/apache_beam/testing/util.py | 19 +
sdks/python/apache_beam/transforms/core.py | 30 +-
sdks/python/apache_beam/transforms/trigger.py | 1 +
sdks/python/apache_beam/transforms/window.py | 16 +-
sdks/python/apache_beam/utils/windowed_value.py | 8 +-
8 files changed, 972 insertions(+), 39 deletions(-)
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 10890ca..a1a7a58 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -23,17 +23,88 @@ and its metadata; and ``ReadMatches``, which takes in a ``PCollection`` of file
metadata records, and produces a ``PCollection`` of ``ReadableFile`` objects.
These transforms currently do not support splitting by themselves.
+Writing to Files
+================
+
+The transforms in this file include ``WriteToFiles``, which allows you to write
+a ``beam.PCollection`` to files, and gives you many options to customize how to
+do this.
+
+The ``WriteToFiles`` transform supports bounded and unbounded PCollections
+(i.e. it can be used in both batch and streaming pipelines). For streaming
+pipelines, it currently does not support multiple trigger firings on the
+same window.
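+
+In its simplest form (a minimal sketch; ``my_pcollection`` and the output path
+are placeholders, and the elements are assumed to already be strings so that
+the default ``TextSink`` can write them), the transform can be used as
+follows::
+
+  my_pcollection | beam.io.fileio.WriteToFiles(path='/my/output/dir')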
+
+File Naming
+-----------
+One of the parameters received by ``WriteToFiles`` is a callable specifying how
+to name the files that are written. This callable takes in the following
+parameters:
+
+- window
+- pane
+- shard_index
+- total_shards
+- compression
+- destination
+
+It should return a file name that is unique for each combination of these
+parameters.
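+
+For example, a naming callable could look like the following minimal sketch
+(``my_naming`` is a hypothetical user-defined function, not part of this
+module)::
+
+  def my_naming(window, pane, shard_index, total_shards, compression,
+                destination):
+    return 'out-%s-of-%s.txt' % (shard_index, total_shards)
+
+  my_pcollection | beam.io.fileio.WriteToFiles(
+      path='/my/file/path',
+      file_naming=my_naming)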
+
+The default naming strategy is to name files
+in the format
+`$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix`,
+where:
+
+- `$prefix` is, by default, `"output"`.
+- `$start` and `$end` are the boundaries of the window for the data being
+ written. These are omitted if we're using the Global window.
+- `$pane` is the index of the trigger firing within the window.
+- `$shard` and `$numShards` are the current shard number, and the total number
+ of shards for this window firing.
+- `$suffix` is, by default, an empty string, but it can be set by the user via
+ ``default_file_naming``.
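+
+For example, with the default strategy, a file written for the window
+``[1970-01-01T00:00:00, 1970-01-01T00:00:10)`` as shard 1 out of 5 shards,
+with no compression and no suffix, would be named something like::
+
+  output-1970-01-01T00:00:00-1970-01-01T00:00:10--00001-00005
+
+(The double dash appears because the pane index is currently always empty.)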
+
+Dynamic Destinations
+--------------------
+If the elements in the input ``beam.PCollection`` can be partitioned into groups
+that should be treated differently (e.g. some events are to be stored as CSV,
+while some others are to be stored as Avro files), it is possible to do this
+by passing a `destination` parameter to ``WriteToFiles``. Something like the
+following::
+
+ my_pcollection | beam.io.fileio.WriteToFiles(
+ path='/my/file/path',
+      destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
+ sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
+ file_naming=beam.io.fileio.destination_prefix_naming())
+
+In this transform, each record will be written to a destination named `'avro'`
+or `'csv'`, depending on its type. The value returned by the `destination`
+callable is then passed to the `sink` callable, to determine what sort of sink
+will be used for each destination. The return type of the `destination`
+parameter can be anything, as long as elements can be grouped by it.
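+
+The ``sink`` callable must return an object implementing the ``FileSink``
+interface defined in this module (``open``, ``write`` and ``flush``). As a
+minimal sketch, a CSV sink similar to the one used in this module's tests
+could be written by subclassing ``TextSink`` (``CsvSink`` here is illustrative
+and not part of this module)::
+
+  class CsvSink(beam.io.fileio.TextSink):
+    def __init__(self, headers):
+      self.headers = headers
+
+    def write(self, record):
+      # Write one comma-separated line per dictionary record.
+      self._fh.write(','.join([record[h] for h in self.headers]).encode('utf8'))
+      self._fh.write(b'\n')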
+
No backward compatibility guarantees. Everything in this module is
experimental.
"""
from __future__ import absolute_import
+import collections
+import logging
+import random
+import uuid
+
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io.filesystem import BeamIOError
+from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import ValueProvider
+from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.annotations import experimental
__all__ = ['EmptyMatchTreatment',
@@ -148,10 +219,11 @@ class ReadableFile(object):
self.metadata = metadata
def open(self, mime_type='text/plain'):
- return filesystems.FileSystems.open(self.metadata.path)
+ return filesystems.FileSystems.open(self.metadata.path,
+ mime_type=mime_type)
def read(self):
- return self.open().read()
+ return self.open('application/octet-stream').read()
def read_utf8(self):
return self.open().read().decode('utf-8')
@@ -170,3 +242,485 @@ class ReadMatches(beam.PTransform):
def expand(self, pcoll):
return pcoll | beam.ParDo(_ReadMatchesFn(self._compression,
self._skip_directories))
+
+
+class FileSink(object):
+ """Specifies how to write elements to individual files in ``WriteToFiles``.
+
+ **NOTE: THIS CLASS IS EXPERIMENTAL.**
+
+ A Sink class must implement the following:
+
+  - The ``open`` method, which initializes writing to a file handler (it is not
+ responsible for opening the file handler itself).
+ - The ``write`` method, which writes an element to the file that was passed
+ in ``open``.
+ - The ``flush`` method, which flushes any buffered state. This is most often
+ called before closing a file (but not exclusively called in that
+ situation). The sink is not responsible for closing the file handler.
+ """
+
+ def open(self, fh):
+ raise NotImplementedError
+
+ def write(self, record):
+ raise NotImplementedError
+
+ def flush(self):
+ raise NotImplementedError
+
+
[email protected]_input_types(str)
+class TextSink(FileSink):
+ """A sink that encodes utf8 elements, and writes to file handlers.
+
+ **NOTE: THIS CLASS IS EXPERIMENTAL.**
+
+  This sink simply calls file_handler.write(record.encode('utf8') + '\n') on all
+ records that come into it.
+ """
+
+ def open(self, fh):
+ self._fh = fh
+
+ def write(self, record):
+ self._fh.write(record.encode('utf8'))
+ self._fh.write(b'\n')
+
+ def flush(self):
+ self._fh.flush()
+
+
+def prefix_naming(prefix):
+ return default_file_naming(prefix)
+
+
+_DEFAULT_FILE_NAME_TEMPLATE = (
+ '{prefix}-{start}-{end}-{pane}-'
+ '{shard:05d}-{total_shards:05d}'
+ '{suffix}{compression}')
+
+
+def destination_prefix_naming():
+
+  def _inner(window, pane, shard_index, total_shards, compression, destination):
+ kwargs = {'prefix': str(destination),
+ 'start': '',
+ 'end': '',
+ 'pane': '',
+ 'shard': 0,
+ 'total_shards': 0,
+ 'suffix': '',
+ 'compression': ''}
+ if total_shards is not None and shard_index is not None:
+ kwargs['shard'] = int(shard_index)
+ kwargs['total_shards'] = int(total_shards)
+
+ if window != GlobalWindow():
+ kwargs['start'] = window.start.to_utc_datetime().isoformat()
+ kwargs['end'] = window.end.to_utc_datetime().isoformat()
+
+ # TODO(BEAM-3759): Add support for PaneInfo
+ # If the PANE is the ONLY firing in the window, we don't add it.
+ #if pane and not (pane.is_first and pane.is_last):
+ # kwargs['pane'] = pane.index
+
+ if compression:
+ kwargs['compression'] = '.%s' % compression
+
+ return _DEFAULT_FILE_NAME_TEMPLATE.format(**kwargs)
+
+ return _inner
+
+
+def default_file_naming(prefix, suffix=None):
+
+  def _inner(window, pane, shard_index, total_shards, compression, destination):
+ kwargs = {'prefix': prefix,
+ 'start': '',
+ 'end': '',
+ 'pane': '',
+ 'shard': 0,
+ 'total_shards': 0,
+ 'suffix': '',
+ 'compression': ''}
+ if total_shards is not None and shard_index is not None:
+ kwargs['shard'] = int(shard_index)
+ kwargs['total_shards'] = int(total_shards)
+
+ if window != GlobalWindow():
+ kwargs['start'] = window.start.to_utc_datetime().isoformat()
+ kwargs['end'] = window.end.to_utc_datetime().isoformat()
+
+ # TODO(pabloem): Add support for PaneInfo
+ # If the PANE is the ONLY firing in the window, we don't add it.
+ #if pane and not (pane.is_first and pane.is_last):
+ # kwargs['pane'] = pane.index
+
+ if compression:
+ kwargs['compression'] = '.%s' % compression
+ if suffix:
+ kwargs['suffix'] = suffix
+
+ return _DEFAULT_FILE_NAME_TEMPLATE.format(**kwargs)
+
+ return _inner
+
+
+_FileResult = collections.namedtuple('FileResult',
+ ['file_name',
+ 'shard_index',
+ 'total_shards',
+ 'window',
+ 'pane',
+ 'destination'])
+
+
+# Adding a class to contain PyDoc.
+class FileResult(_FileResult):
+ """A descriptor of a file that has been written."""
+ pass
+
+
+@experimental()
+class WriteToFiles(beam.PTransform):
+ """Write the incoming PCollection to a set of output files.
+
+ The incoming ``PCollection`` may be bounded or unbounded.
+
+ **Note:** For unbounded ``PCollection``s, this transform does not support
+ multiple firings per Window (due to the fact that files are named only by
+ their destination, and window, at the moment).
+ """
+
+ # We allow up to 20 different destinations to be written in a single bundle.
+ # Too many files will add memory pressure to the worker, so we let it be 20.
+ MAX_NUM_WRITERS_PER_BUNDLE = 20
+
+ DEFAULT_SHARDING = 5
+
+ def __init__(self,
+ path,
+ file_naming=None,
+ destination=None,
+ temp_directory=None,
+ sink=None,
+ shards=None,
+ output_fn=None,
+ max_writers_per_bundle=MAX_NUM_WRITERS_PER_BUNDLE):
+ """Initializes a WriteToFiles transform.
+
+ Args:
+ path (str, ValueProvider): The directory to write files into.
+ file_naming (callable): A callable that takes in a window, pane,
+ shard_index, total_shards and compression; and returns a file name.
+ destination (callable): If this argument is provided, the sink parameter
+ must also be a callable.
+      temp_directory (str, ValueProvider): To ensure atomicity in the transform,
+ the output is written into temporary files, which are written to a
+ directory that is meant to be temporary as well. Once the whole output
+ has been written, the files are moved into their final destination, and
+ given their final names. By default, the temporary directory will be
+ within the temp_location of your pipeline.
+      sink (callable, FileSink): The sink to use to write into a file. It should
+ implement the methods of a ``FileSink``. If none is provided, a
+ ``TextSink`` is used.
+ shards (int): The number of shards per destination and trigger firing.
+ max_writers_per_bundle (int): The number of writers that can be open
+ concurrently in a single worker that's processing one bundle.
+ """
+ self.path = (
+        path if isinstance(path, ValueProvider) else StaticValueProvider(str, path))
+ self.file_naming_fn = file_naming or default_file_naming('output')
+ self.destination_fn = self._get_destination_fn(destination)
+ self._temp_directory = temp_directory
+ self.sink_fn = self._get_sink_fn(sink)
+ self.shards = shards or WriteToFiles.DEFAULT_SHARDING
+ self.output_fn = output_fn or (lambda x: x)
+
+ self._max_num_writers_per_bundle = max_writers_per_bundle
+
+ @staticmethod
+ def _get_sink_fn(input_sink):
+ if isinstance(input_sink, FileSink):
+ return lambda x: input_sink
+ elif callable(input_sink):
+ return input_sink
+ else:
+ return lambda x: TextSink()
+
+ @staticmethod
+ def _get_destination_fn(destination):
+ if isinstance(destination, ValueProvider):
+ return lambda elm: destination.get()
+ elif callable(destination):
+ return destination
+ else:
+ return lambda elm: destination
+
+ def expand(self, pcoll):
+ p = pcoll.pipeline
+
+ if not self._temp_directory:
+ temp_location = (
+ p.options.view_as(GoogleCloudOptions).temp_location
+ or self.path.get())
+ dir_uid = str(uuid.uuid4())
+ self._temp_directory = StaticValueProvider(
+ str,
+ filesystems.FileSystems.join(temp_location,
+ '.temp%s' % dir_uid))
+ logging.info('Added temporary directory %s', self._temp_directory.get())
+
+ output = (pcoll
+ | beam.ParDo(_WriteUnshardedRecordsFn(
+ base_path=self._temp_directory,
+ destination_fn=self.destination_fn,
+ sink_fn=self.sink_fn,
+ max_writers_per_bundle=self._max_num_writers_per_bundle))
+ .with_outputs(_WriteUnshardedRecordsFn.SPILLED_RECORDS,
+ _WriteUnshardedRecordsFn.WRITTEN_FILES))
+
+ written_files_pc = output[_WriteUnshardedRecordsFn.WRITTEN_FILES]
+ spilled_records_pc = output[_WriteUnshardedRecordsFn.SPILLED_RECORDS]
+
+ more_written_files_pc = (
+ spilled_records_pc
+ | beam.ParDo(_AppendShardedDestination(self.destination_fn,
+ self.shards))
+ | "GroupRecordsByDestinationAndShard" >> beam.GroupByKey()
+ | beam.ParDo(_WriteShardedRecordsFn(self._temp_directory,
+ self.sink_fn,
+ self.shards))
+ )
+
+ files_by_destination_pc = (
+ (written_files_pc, more_written_files_pc)
+ | beam.Flatten()
+ | beam.Map(lambda file_result: (file_result.destination, file_result))
+ | "GroupTempFilesByDestination" >> beam.GroupByKey())
+
+ # Now we should take the temporary files, and write them to the final
+ # destination, with their proper names.
+
+ file_results = (files_by_destination_pc
+ | beam.ParDo(
+ _MoveTempFilesIntoFinalDestinationFn(
+ self.path, self.file_naming_fn,
+ self._temp_directory)))
+
+ return file_results
+
+
+def _create_writer(base_path, writer_key):
+ try:
+ filesystems.FileSystems.mkdirs(base_path)
+ except IOError:
+ # Directory already exists.
+ pass
+
+ # The file name has a prefix determined by destination+window, along with
+ # a random string. This allows us to retrieve orphaned files later on.
+ file_name = '%s_%s' % (abs(hash(writer_key)), uuid.uuid4())
+ full_file_name = filesystems.FileSystems.join(base_path, file_name)
+ return full_file_name, filesystems.FileSystems.create(full_file_name)
+
+
+class _MoveTempFilesIntoFinalDestinationFn(beam.DoFn):
+
+ def __init__(self, path, file_naming_fn, temp_dir):
+ self.path = path
+ self.file_naming_fn = file_naming_fn
+ self.temporary_directory = temp_dir
+
+ def process(self,
+ element,
+ w=beam.DoFn.WindowParam):
+ destination = element[0]
+ file_results = list(element[1])
+
+ for i, r in enumerate(file_results):
+ # TODO(pabloem): Handle compression for files.
+ final_file_name = self.file_naming_fn(r.window,
+ r.pane,
+ i,
+ len(file_results),
+ '',
+ destination)
+
+ logging.info('Moving temporary file %s to dir: %s as %s. Res: %s',
+ r.file_name, self.path.get(), final_file_name, r)
+
+ final_full_path = filesystems.FileSystems.join(self.path.get(),
+ final_file_name)
+
+ # TODO(pabloem): Batch rename requests?
+ try:
+ filesystems.FileSystems.rename([r.file_name],
+ [final_full_path])
+ except BeamIOError:
+ # This error is not serious, because it may happen on a retry of the
+ # bundle. We simply log it.
+        logging.debug('File %s failed to be copied. This may be due to a bundle'
+ ' being retried.', r.file_name)
+
+ yield FileResult(final_file_name,
+ i,
+ len(file_results),
+ r.window,
+ r.pane,
+ destination)
+
+ logging.info('Cautiously removing temporary files for'
+ ' destination %s and window %s', destination, w)
+ writer_key = (destination, w)
+ self._remove_temporary_files(writer_key)
+
+ def _remove_temporary_files(self, writer_key):
+ try:
+ prefix = filesystems.FileSystems.join(
+ self.temporary_directory.get(), str(abs(hash(writer_key))))
+ match_result = filesystems.FileSystems.match(['%s*' % prefix])
+ orphaned_files = [m.path for m in match_result[0].metadata_list]
+
+ logging.debug('Deleting orphaned files: %s', orphaned_files)
+ filesystems.FileSystems.delete(orphaned_files)
+ except BeamIOError as e:
+ logging.debug('Exceptions when deleting files: %s', e)
+
+
+class _WriteShardedRecordsFn(beam.DoFn):
+
+ def __init__(self, base_path, sink_fn, shards):
+ self.base_path = base_path
+ self.sink_fn = sink_fn
+ self.shards = shards
+
+ def process(self,
+ element,
+ w=beam.DoFn.WindowParam,
+ pane=beam.DoFn.PaneInfoParam):
+ destination_and_shard = element[0]
+ destination = destination_and_shard[0]
+ shard = destination_and_shard[1]
+ records = element[1]
+
+ full_file_name, writer = _create_writer(base_path=self.base_path.get(),
+ writer_key=(destination, w))
+ sink = self.sink_fn(destination)
+ sink.open(writer)
+
+ for r in records:
+ sink.write(r)
+
+ sink.flush()
+ writer.close()
+
+ logging.info('Writing file %s for destination %s and shard %s',
+ full_file_name, destination, repr(shard))
+
+ yield FileResult(full_file_name,
+ shard_index=shard,
+ total_shards=self.shards,
+ window=w,
+ pane=pane,
+ destination=destination)
+
+
+class _AppendShardedDestination(beam.DoFn):
+
+ def __init__(self, destination, shards):
+ self.destination_fn = destination
+ self.shards = shards
+
+ # We start the shards for a single destination at an arbitrary point.
+ self._shard_counter = collections.defaultdict(
+ lambda: random.randrange(self.shards))
+
+ def _next_shard_for_destination(self, destination):
+ self._shard_counter[destination] = (
+ (self._shard_counter[destination] + 1) % self.shards)
+
+ return self._shard_counter[destination]
+
+ def process(self, record):
+ destination = self.destination_fn(record)
+ shard = self._next_shard_for_destination(destination)
+
+ yield ((destination, shard), record)
+
+
+class _WriteUnshardedRecordsFn(beam.DoFn):
+
+ SPILLED_RECORDS = 'spilled_records'
+ WRITTEN_FILES = 'written_files'
+
+ def __init__(self,
+ base_path,
+ destination_fn,
+ sink_fn,
+ max_writers_per_bundle=WriteToFiles.MAX_NUM_WRITERS_PER_BUNDLE):
+ self.base_path = base_path
+ self.destination_fn = destination_fn
+ self.sink_fn = sink_fn
+ self.max_num_writers_per_bundle = max_writers_per_bundle
+
+ def start_bundle(self):
+ self._writers_and_sinks = {}
+ self._file_names = {}
+
+ def process(self,
+ record,
+ w=beam.DoFn.WindowParam,
+ pane=beam.DoFn.PaneInfoParam):
+ destination = self.destination_fn(record)
+
+ writer, sink = self._get_or_create_writer_and_sink(destination, w)
+
+ if not writer:
+ return [beam.pvalue.TaggedOutput(self.SPILLED_RECORDS, record)]
+ else:
+ sink.write(record)
+
+ def _get_or_create_writer_and_sink(self, destination, window):
+ """Returns a tuple of writer, sink."""
+ writer_key = (destination, window)
+
+ if writer_key in self._writers_and_sinks:
+ return self._writers_and_sinks.get(writer_key)
+ elif len(self._writers_and_sinks) >= self.max_num_writers_per_bundle:
+ # The writer does not exist, and we have too many writers already.
+ return None, None
+ else:
+ # The writer does not exist, but we can still create a new one.
+ full_file_name, writer = _create_writer(base_path=self.base_path.get(),
+ writer_key=writer_key)
+ sink = self.sink_fn(destination)
+
+ sink.open(writer)
+
+ self._writers_and_sinks[writer_key] = (writer, sink)
+ self._file_names[writer_key] = full_file_name
+ return self._writers_and_sinks[writer_key]
+
+ def finish_bundle(self):
+ for key, (writer, sink) in self._writers_and_sinks.items():
+
+ sink.flush()
+ writer.close()
+
+ file_result = FileResult(self._file_names[key],
+ shard_index=-1,
+ total_shards=0,
+ window=key[1],
+ pane=None, # TODO(pabloem): get the pane info
+ destination=key[0])
+
+ yield beam.pvalue.TaggedOutput(
+ self.WRITTEN_FILES,
+ beam.transforms.window.WindowedValue(
+ file_result,
+ timestamp=key[1].start,
+ windows=[key[1]] # TODO(pabloem) HOW DO WE GET THE PANE
+ ))
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 096149b..c533ef8 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -21,20 +21,38 @@ from __future__ import absolute_import
import csv
import io
+import json
import logging
import os
import sys
import unittest
+import uuid
+from hamcrest.library.text import stringmatches
from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.test_utils import compute_hash
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import matches_all
+from apache_beam.transforms import trigger
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.transforms.window import GlobalWindow
+
+
+def _get_file_reader(readable_file):
+ if sys.version_info >= (3, 0):
+ return io.TextIOWrapper(readable_file.open())
+ else:
+ return readable_file.open()
class MatchTest(_TestCaseWithTempDirCleanUp):
@@ -48,7 +66,9 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
files.append(self._create_temp_file(dir=tempdir))
with TestPipeline() as p:
- files_pc = p | fileio.MatchFiles(tempdir) | beam.Map(lambda x: x.path)
+ files_pc = (p
+ | fileio.MatchFiles(FileSystems.join(tempdir, '*'))
+ | beam.Map(lambda x: x.path))
assert_that(files_pc, equal_to(files))
@@ -66,7 +86,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
with TestPipeline() as p:
files_pc = (p
- | beam.Create(directories)
+                  | beam.Create([FileSystems.join(d, '*') for d in directories])
| fileio.MatchAll()
| beam.Map(lambda x: x.path))
@@ -85,7 +105,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
with TestPipeline() as p:
files_pc = (
p
- | beam.Create(directories)
+ | beam.Create([FileSystems.join(d, '*') for d in directories])
| fileio.MatchAll(fileio.EmptyMatchTreatment.DISALLOW)
| beam.Map(lambda x: x.path))
@@ -103,7 +123,7 @@ class MatchTest(_TestCaseWithTempDirCleanUp):
with TestPipeline() as p:
files_pc = (
p
- | beam.Create(['%s*' % d for d in directories])
+ | beam.Create([FileSystems.join(d, '*') for d in directories])
| fileio.MatchAll(fileio.EmptyMatchTreatment.ALLOW_IF_WILDCARD)
| beam.Map(lambda x: x.path))
@@ -119,7 +139,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
with TestPipeline() as p:
content_pc = (p
- | beam.Create([dir])
+ | beam.Create([FileSystems.join(dir, '*')])
| fileio.MatchAll()
| fileio.ReadMatches()
| beam.FlatMap(
@@ -134,18 +154,12 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
dir = '%s%s' % (self._new_tempdir(), os.sep)
self._create_temp_file(dir=dir, content=content)
- def get_csv_reader(readable_file):
- if sys.version_info >= (3, 0):
- return csv.reader(io.TextIOWrapper(readable_file.open()))
- else:
- return csv.reader(readable_file.open())
-
with TestPipeline() as p:
content_pc = (p
- | beam.Create([dir])
+ | beam.Create([FileSystems.join(dir, '*')])
| fileio.MatchAll()
| fileio.ReadMatches()
- | beam.FlatMap(get_csv_reader))
+                    | beam.FlatMap(lambda rf: csv.reader(_get_file_reader(rf))))
assert_that(content_pc, equal_to(rows))
@@ -160,7 +174,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
with TestPipeline() as p:
contents_pc = (p
- | beam.Create(files + [tempdir])
+ | beam.Create(files + ['%s/' % tempdir])
| fileio.ReadMatches()
| beam.FlatMap(
lambda x: x.read().decode('utf-8').splitlines()))
@@ -179,7 +193,7 @@ class ReadTest(_TestCaseWithTempDirCleanUp):
with self.assertRaises(beam.io.filesystem.BeamIOError):
with TestPipeline() as p:
_ = (p
- | beam.Create(files + [tempdir])
+ | beam.Create(files + ['%s/' % tempdir])
| fileio.ReadMatches(skip_directories=False)
| beam.Map(lambda x: x.read_utf8()))
@@ -233,6 +247,336 @@ class MatchIntegrationTest(unittest.TestCase):
label='Assert Checksums')
+class WriteFilesTest(_TestCaseWithTempDirCleanUp):
+
+ SIMPLE_COLLECTION = [
+ {'project': 'beam', 'foundation': 'apache'},
+ {'project': 'prometheus', 'foundation': 'cncf'},
+ {'project': 'flink', 'foundation': 'apache'},
+ {'project': 'grpc', 'foundation': 'cncf'},
+ {'project': 'spark', 'foundation': 'apache'},
+ {'project': 'kubernetes', 'foundation': 'cncf'},
+ {'project': 'spark', 'foundation': 'apache'},
+ {'project': 'knative', 'foundation': 'cncf'},
+ {'project': 'linux', 'foundation': 'linux'},
+ ]
+
+ LARGER_COLLECTION = ['{:05d}'.format(i) for i in range(200)]
+
+ CSV_HEADERS = ['project', 'foundation']
+
+ SIMPLE_COLLECTION_VALIDATION_SET = set([
+ (elm['project'], elm['foundation']) for elm in SIMPLE_COLLECTION])
+
+ class CsvSink(fileio.TextSink):
+ def __init__(self, headers):
+ self.headers = headers
+
+ def write(self, record):
+      self._fh.write(','.join([record[h] for h in self.headers]).encode('utf8'))
+ self._fh.write('\n'.encode('utf8'))
+
+ class JsonSink(fileio.TextSink):
+
+ def write(self, record):
+ self._fh.write(json.dumps(record).encode('utf8'))
+ self._fh.write('\n'.encode('utf8'))
+
+ def test_write_to_single_file_batch(self):
+
+ dir = self._new_tempdir()
+
+ with TestPipeline() as p:
+ _ = (p
+ | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+ | "Serialize" >> beam.Map(json.dumps)
+ | beam.io.fileio.WriteToFiles(path=dir))
+
+ with TestPipeline() as p:
+ result = (p
+ | fileio.MatchFiles(FileSystems.join(dir, '*'))
+ | fileio.ReadMatches()
+ | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n')))
+
+ assert_that(result,
+                  equal_to([json.dumps(row) for row in self.SIMPLE_COLLECTION]))
+
+ def test_write_to_different_file_types_some_spilling(self):
+
+ dir = self._new_tempdir()
+
+ with TestPipeline() as p:
+ _ = (p
+ | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+ | beam.io.fileio.WriteToFiles(
+ path=dir,
+ destination=lambda record: record['foundation'],
+ sink=lambda dest: (
+ WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS)
+ if dest == 'apache' else WriteFilesTest.JsonSink()),
+ file_naming=fileio.destination_prefix_naming(),
+ max_writers_per_bundle=1))
+
+ with TestPipeline() as p:
+ cncf_res = (p
+ | fileio.MatchFiles(FileSystems.join(dir, 'cncf*'))
+ | fileio.ReadMatches()
+ | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n')))
+
+ apache_res = (p
+ | "MatchApache" >> fileio.MatchFiles(
+ FileSystems.join(dir, 'apache*'))
+ | "ReadApache" >> fileio.ReadMatches()
+ | "MapApache" >> beam.FlatMap(
+ lambda rf: csv.reader(_get_file_reader(rf))))
+
+ assert_that(cncf_res,
+ equal_to([json.dumps(row)
+ for row in self.SIMPLE_COLLECTION
+ if row['foundation'] == 'cncf']),
+ label='verifyCNCF')
+
+ assert_that(apache_res,
+ equal_to([[row['project'], row['foundation']]
+ for row in self.SIMPLE_COLLECTION
+ if row['foundation'] == 'apache']),
+ label='verifyApache')
+
+ def test_find_orphaned_files(self):
+ dir = self._new_tempdir()
+
+ write_transform = beam.io.fileio.WriteToFiles(path=dir)
+
+ def write_orphaned_file(temp_dir, writer_key):
+ temp_dir_path = FileSystems.join(dir, temp_dir)
+
+ file_prefix_dir = FileSystems.join(
+ temp_dir_path,
+ str(abs(hash(writer_key))))
+
+ file_name = '%s_%s' % (file_prefix_dir, uuid.uuid4())
+ with FileSystems.create(file_name) as f:
+ f.write(b'Hello y\'all')
+
+ return file_name
+
+ with TestPipeline() as p:
+ _ = (p
+ | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+ | "Serialize" >> beam.Map(json.dumps)
+ | write_transform)
+
+ # Pre-create the temp directory.
+ temp_dir_path = FileSystems.mkdirs(FileSystems.join(
+ dir, write_transform._temp_directory.get()))
+ write_orphaned_file(write_transform._temp_directory.get(),
+ (None, GlobalWindow()))
+ f2 = write_orphaned_file(write_transform._temp_directory.get(),
+ ('other-dest', GlobalWindow()))
+
+    temp_dir_path = FileSystems.join(dir, write_transform._temp_directory.get())
+ leftovers = FileSystems.match(['%s%s*' % (temp_dir_path, os.sep)])
+ found_files = [m.path for m in leftovers[0].metadata_list]
+ self.assertListEqual(found_files, [f2])
+
+ def test_write_to_different_file_types(self):
+
+ dir = self._new_tempdir()
+
+ with TestPipeline() as p:
+ _ = (p
+ | beam.Create(WriteFilesTest.SIMPLE_COLLECTION)
+ | beam.io.fileio.WriteToFiles(
+ path=dir,
+ destination=lambda record: record['foundation'],
+ sink=lambda dest: (
+ WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS)
+ if dest == 'apache' else WriteFilesTest.JsonSink()),
+ file_naming=fileio.destination_prefix_naming()))
+
+ with TestPipeline() as p:
+ cncf_res = (p
+ | fileio.MatchFiles(FileSystems.join(dir, 'cncf*'))
+ | fileio.ReadMatches()
+ | beam.FlatMap(lambda f: f.read_utf8().strip().split('\n')))
+
+ apache_res = (p
+ | "MatchApache" >> fileio.MatchFiles(
+ FileSystems.join(dir, 'apache*'))
+ | "ReadApache" >> fileio.ReadMatches()
+ | "MapApache" >> beam.FlatMap(
+ lambda rf: csv.reader(_get_file_reader(rf))))
+
+ assert_that(cncf_res,
+ equal_to([json.dumps(row)
+ for row in self.SIMPLE_COLLECTION
+ if row['foundation'] == 'cncf']),
+ label='verifyCNCF')
+
+ assert_that(apache_res,
+ equal_to([[row['project'], row['foundation']]
+ for row in self.SIMPLE_COLLECTION
+ if row['foundation'] == 'apache']),
+ label='verifyApache')
+
+ def record_dofn(self):
+ class RecordDoFn(beam.DoFn):
+ def process(self, element):
+ WriteFilesTest.all_records.append(element)
+
+ return RecordDoFn()
+
+ def test_streaming_complex_timing(self):
+ # Use state on the TestCase class, since other references would be pickled
+ # into a closure and not have the desired side effects.
+ #
+ # TODO(BEAM-5295): Use assert_that after it works for the cases here in
+ # streaming mode.
+ WriteFilesTest.all_records = []
+
+ dir = self._new_tempdir()
+
+ # Setting up the input (TestStream)
+ ts = TestStream().advance_watermark_to(0)
+ for elm in WriteFilesTest.LARGER_COLLECTION:
+ timestamp = int(elm)
+
+ ts.add_elements([('key', '%s' % elm)])
+ if timestamp % 5 == 0 and timestamp != 0:
+ # TODO(BEAM-3759): Add many firings per window after getting PaneInfo.
+ ts.advance_processing_time(5)
+ ts.advance_watermark_to(timestamp)
+
+ # The pipeline that we are testing
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ with TestPipeline(options=options) as p:
+ res = (p
+ | ts
+ | beam.WindowInto(
+ FixedWindows(10),
+ trigger=trigger.AfterWatermark(),
+ accumulation_mode=trigger.AccumulationMode.DISCARDING)
+ | beam.GroupByKey()
+ | beam.FlatMap(lambda x: x[1]))
+ # Triggering after 5 processing-time seconds, and on the watermark. Also
+ # discarding old elements.
+
+ _ = (res
+ | beam.io.fileio.WriteToFiles(path=dir,
+ max_writers_per_bundle=0)
+ | beam.Map(lambda fr: FileSystems.join(dir, fr.file_name))
+ | beam.ParDo(self.record_dofn()))
+
+ # Verification pipeline
+ with TestPipeline() as p:
+ files = (p | beam.io.fileio.MatchFiles(FileSystems.join(dir, '*')))
+
+ file_names = (files | beam.Map(lambda fm: fm.path))
+
+ file_contents = (
+ files
+ | beam.io.fileio.ReadMatches()
+ | beam.Map(lambda rf: (rf.metadata.path,
+ rf.read_utf8().strip().split('\n'))))
+
+ content = (file_contents
+ | beam.FlatMap(lambda fc: [ln.strip() for ln in fc[1]]))
+
+ assert_that(file_names, equal_to(WriteFilesTest.all_records),
+ label='AssertFilesMatch')
+ assert_that(content, matches_all(WriteFilesTest.LARGER_COLLECTION),
+ label='AssertContentsMatch')
+
+ def test_streaming_different_file_types(self):
+ dir = self._new_tempdir()
+ input = iter(WriteFilesTest.SIMPLE_COLLECTION)
+ ts = (TestStream()
+ .advance_watermark_to(0)
+ .add_elements([next(input), next(input)])
+ .advance_watermark_to(10)
+ .add_elements([next(input), next(input)])
+ .advance_watermark_to(20)
+ .add_elements([next(input), next(input)])
+ .advance_watermark_to(30)
+ .add_elements([next(input), next(input)])
+ .advance_watermark_to(40))
+
+ with TestPipeline() as p:
+ _ = (p
+ | ts
+ | beam.WindowInto(FixedWindows(10))
+ | beam.io.fileio.WriteToFiles(
+ path=dir,
+ destination=lambda record: record['foundation'],
+ sink=lambda dest: (
+ WriteFilesTest.CsvSink(WriteFilesTest.CSV_HEADERS)
+ if dest == 'apache' else WriteFilesTest.JsonSink()),
+ file_naming=fileio.destination_prefix_naming(),
+ max_writers_per_bundle=0,
+ ))
+
+ with TestPipeline() as p:
+ cncf_files = (p
+ | fileio.MatchFiles(FileSystems.join(dir, 'cncf*'))
+ | "CncfFileNames" >> beam.Map(lambda fm: fm.path))
+
+ apache_files = (p
+ | "MatchApache" >> fileio.MatchFiles(
+ FileSystems.join(dir, 'apache*'))
+ | "ApacheFileNames" >> beam.Map(lambda fm: fm.path))
+
+ assert_that(cncf_files,
+ matches_all([
+ stringmatches.matches_regexp(
+ FileSystems.join(
+ dir,
+                        'cncf-1970-01-01T00:00:00-1970-01-01T00:00:10--.*'
+ )
+ ),
+ stringmatches.matches_regexp(
+ FileSystems.join(
+ dir,
+                        'cncf-1970-01-01T00:00:10-1970-01-01T00:00:20--.*'
+ )
+ ),
+ stringmatches.matches_regexp(
+ FileSystems.join(
+ dir,
+                        'cncf-1970-01-01T00:00:20-1970-01-01T00:00:30--.*'
+ )
+ ),
+ stringmatches.matches_regexp(
+ FileSystems.join(
+ dir,
+                        'cncf-1970-01-01T00:00:30-1970-01-01T00:00:40--.*'
+ )
+ )
+ ]),
+ label='verifyCNCFFiles')
+
+ assert_that(apache_files,
+ matches_all([
+ stringmatches.matches_regexp(FileSystems.join(
+ dir,
+ 'apache-1970-01-01T00:00:00-1970-01-01T00:00:10--.*')
+ ),
+ stringmatches.matches_regexp(FileSystems.join(
+ dir,
+ 'apache-1970-01-01T00:00:10-1970-01-01T00:00:20--.*')
+ ),
+ stringmatches.matches_regexp(FileSystems.join(
+ dir,
+ 'apache-1970-01-01T00:00:20-1970-01-01T00:00:30--.*')
+ ),
+ stringmatches.matches_regexp(FileSystems.join(
+ dir,
+ 'apache-1970-01-01T00:00:30-1970-01-01T00:00:40--.*')
+ )
+ ]),
+ label='verifyApacheFiles')
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index cfdf472..efc745a 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -400,8 +400,7 @@ class CompressedFile(object):
class FileMetadata(object):
- """Metadata about a file path that is the output of FileSystem.match
- """
+ """Metadata about a file path that is the output of FileSystem.match."""
def __init__(self, path, size_in_bytes):
assert isinstance(path, (str, unicode)) and path, "Path should be a string"
assert isinstance(size_in_bytes, (int, long)) and size_in_bytes >= 0, \
@@ -430,7 +429,7 @@ class FileMetadata(object):
class MatchResult(object):
"""Result from the ``FileSystem`` match operation which contains the list
- of matched FileMetadata.
+ of matched ``FileMetadata``.
"""
def __init__(self, pattern, metadata_list):
self.metadata_list = metadata_list
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 3099b0f..2e38d37 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -25,6 +25,9 @@ import io
import tempfile
from builtins import object
+from hamcrest.core import assert_that as hamcrest_assert
+from hamcrest.library.collection import contains_inanyorder
+
from apache_beam import pvalue
from apache_beam.transforms import window
from apache_beam.transforms.core import Create
@@ -41,6 +44,7 @@ __all__ = [
'equal_to',
'is_empty',
'is_not_empty',
+ 'matches_all',
# open_shards is internal and has no backwards compatibility guarantees.
'open_shards',
'TestWindowedValue',
@@ -142,6 +146,21 @@ def equal_to(expected):
return _equal
+def matches_all(expected):
+ """Matcher used by assert_that to check a set of matchers.
+
+ Args:
+ expected: A list of elements or hamcrest matchers to be used to match
+ the elements of a single PCollection.
+ """
+ def _matches(actual):
+ expected_list = list(expected)
+
+ hamcrest_assert(actual, contains_inanyorder(*expected_list))
+
+ return _matches
+
+
def is_empty():
def _empty(actual):
actual = list(actual)
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ca1fb46..ead094b 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -420,11 +420,12 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
SideInputParam = _DoFnParam('SideInputParam')
TimestampParam = _DoFnParam('TimestampParam')
WindowParam = _DoFnParam('WindowParam')
+ PaneInfoParam = _DoFnParam('PaneInfoParam')
WatermarkReporterParam = _DoFnParam('WatermarkReporterParam')
BundleFinalizerParam = _BundleFinalizerParam
DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
- WindowParam, WatermarkReporterParam,
+ WindowParam, WatermarkReporterParam, PaneInfoParam,
BundleFinalizerParam]
# Parameters to access state and timers. Not restricted to use only in the
@@ -2065,11 +2066,15 @@ class WindowInto(ParDo):
new_windows = self.windowing.windowfn.assign(context)
yield WindowedValue(element, context.timestamp, new_windows)
- def __init__(self, windowfn, **kwargs):
+ def __init__(self,
+ windowfn,
+ trigger=None,
+ accumulation_mode=None,
+ timestamp_combiner=None):
"""Initializes a WindowInto transform.
Args:
- windowfn: Function to be used for windowing
+ windowfn (Windowing, WindowFn): Function to be used for windowing.
trigger: (optional) Trigger used for windowing, or None for default.
accumulation_mode: (optional) Accumulation mode used for windowing,
required for non-trivial triggers.
@@ -2080,19 +2085,14 @@ class WindowInto(ParDo):
# Overlay windowing with kwargs.
windowing = windowfn
windowfn = windowing.windowfn
- # Use windowing to fill in defaults for kwargs.
- kwargs = dict(dict(
- trigger=windowing.triggerfn,
- accumulation_mode=windowing.accumulation_mode,
- timestamp_combiner=windowing.timestamp_combiner), **kwargs)
- # Use kwargs to simulate keyword-only arguments.
- triggerfn = kwargs.pop('trigger', None)
- accumulation_mode = kwargs.pop('accumulation_mode', None)
- timestamp_combiner = kwargs.pop('timestamp_combiner', None)
- if kwargs:
- raise ValueError('Unexpected keyword arguments: %s' % list(kwargs))
+
+ # Use windowing to fill in defaults for the extra arguments.
+ trigger = trigger or windowing.triggerfn
+ accumulation_mode = accumulation_mode or windowing.accumulation_mode
+ timestamp_combiner = timestamp_combiner or windowing.timestamp_combiner
+
self.windowing = Windowing(
- windowfn, triggerfn, accumulation_mode, timestamp_combiner)
+ windowfn, trigger, accumulation_mode, timestamp_combiner)
super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
def get_windowing(self, unused_inputs):
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index b0d9a25..ddf4c24 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -297,6 +297,7 @@ class AfterProcessingTime(TriggerFn):
"""
def __init__(self, delay=0):
+ """Initialize a processing time trigger with a delay in seconds."""
self.delay = delay
def __repr__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 1990532..e477303 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -361,12 +361,22 @@ class FixedWindows(NonMergingWindowFn):
Attributes:
size: Size of the window as seconds.
- offset: Offset of this window as seconds since Unix epoch. Windows start at
- t=N * size + offset where t=0 is the epoch. The offset must be a value
- in range [0, size). If it is not it will be normalized to this range.
+ offset: Offset of this window as seconds. Windows start at
+ t=N * size + offset where t=0 is the UNIX epoch. The offset must be a
+ value in range [0, size). If it is not it will be normalized to this
+ range.
"""
def __init__(self, size, offset=0):
+ """Initialize a ``FixedWindows`` function for a given size and offset.
+
+ Args:
+ size (int): Size of the window in seconds.
+ offset(int): Offset of this window as seconds. Windows start at
+ t=N * size + offset where t=0 is the UNIX epoch. The offset must be a
+ value in range [0, size). If it is not it will be normalized to this
+ range.
+ """
if size <= 0:
raise ValueError('The size parameter must be strictly positive.')
self.size = Duration.of(size)
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index a0b6622..5570c45 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -46,7 +46,13 @@ class PaneInfoTiming(object):
class PaneInfo(object):
- """Describes the trigger firing information for a given WindowedValue."""
+ """Describes the trigger firing information for a given WindowedValue.
+
+ "Panes" represent individual firings on a single window. ``PaneInfo``s are
+ passed downstream after trigger firings. They contain information about
+ whether it's an early/on time/late firing, if it's the last or first firing
+ from a window, and the index of the firing.
+ """
def __init__(self, is_first, is_last, timing, index, nonspeculative_index):
self._is_first = is_first