Repository: beam
Updated Branches: refs/heads/master 395d14e33 -> 9575694ca
[BEAM-1283] Finish bundle should only emit windowed values Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf88605 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf88605 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf88605 Branch: refs/heads/master Commit: 3bf886056680ca50e11d36dc0da402bb0196a7c7 Parents: 395d14e Author: Sourabh Bajaj <[email protected]> Authored: Mon May 1 15:00:24 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue May 9 09:48:03 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets_test.py | 10 ++++++++-- sdks/python/apache_beam/io/iobase.py | 4 +++- sdks/python/apache_beam/runners/common.py | 13 ++----------- sdks/python/apache_beam/transforms/ptransform_test.py | 8 +++++--- 4 files changed, 18 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 0148096..da0a962 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -33,6 +33,7 @@ from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.examples.snippets import snippets +from apache_beam.utils.windowed_value import WindowedValue # pylint: disable=expression-not-assigned from apache_beam.test_pipeline import TestPipeline @@ -366,6 +367,7 @@ class TypeHintsTest(unittest.TestCase): class SnippetsTest(unittest.TestCase): # Replacing text 
read/write transforms with dummy transforms for testing. + class DummyReadTransform(beam.PTransform): """A transform that will replace iobase.ReadFromText. @@ -387,16 +389,20 @@ class SnippetsTest(unittest.TestCase): pass def finish_bundle(self): + from apache_beam.transforms import window + assert self.file_to_read for file_name in glob.glob(self.file_to_read): if self.compression_type is None: with open(file_name) as file: for record in file: - yield self.coder.decode(record.rstrip('\n')) + value = self.coder.decode(record.rstrip('\n')) + yield WindowedValue(value, -1, [window.GlobalWindow()]) else: with gzip.open(file_name, 'r') as file: for record in file: - yield self.coder.decode(record.rstrip('\n')) + value = self.coder.decode(record.rstrip('\n')) + yield WindowedValue(value, -1, [window.GlobalWindow()]) def expand(self, pcoll): return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo( http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 312542a..d47ef5b 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -44,6 +44,7 @@ from apache_beam.transforms import ptransform from apache_beam.transforms import window from apache_beam.transforms.display import HasDisplayData from apache_beam.transforms.display import DisplayDataItem +from apache_beam.utils.windowed_value import WindowedValue # Encapsulates information about a bundle of a source generated when method @@ -931,7 +932,8 @@ class _WriteBundleDoFn(core.DoFn): def finish_bundle(self): if self.writer is not None: - yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP) + yield WindowedValue(self.writer.close(), window.MAX_TIMESTAMP, + [window.GlobalWindow()]) class _WriteKeyedBundleDoFn(core.DoFn): 
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 74c61ab..ec1f5dc 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -487,18 +487,9 @@ class OutputProcessor(object): if isinstance(result, WindowedValue): windowed_value = result - elif isinstance(result, TimestampedValue): - value = result.value - timestamp = result.timestamp - assign_context = NoContext(value, timestamp) - windowed_value = WindowedValue( - value, timestamp, self.window_fn.assign(assign_context)) else: - value = result - timestamp = -1 - assign_context = NoContext(value) - windowed_value = WindowedValue( - value, timestamp, self.window_fn.assign(assign_context)) + raise RuntimeError('Finish Bundle should only output WindowedValue ' +\ + 'type but got %s' % type(result)) if tag is None: self.main_receivers.receive(windowed_value) http://git-wip-us.apache.org/repos/asf/beam/blob/3bf88605/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index e712661..5948460 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -30,8 +30,10 @@ import apache_beam as beam from apache_beam.metrics import Metrics from apache_beam.metrics.metric import MetricsFilter from apache_beam.io.iobase import Read -from apache_beam.test_pipeline import TestPipeline +from apache_beam.options.pipeline_options import TypeOptions import apache_beam.pvalue as pvalue +from apache_beam.test_pipeline import TestPipeline +from apache_beam.transforms import window import 
apache_beam.transforms.combiners as combine from apache_beam.transforms.display import DisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform @@ -40,7 +42,7 @@ import apache_beam.typehints as typehints from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types from apache_beam.typehints.typehints_test import TypeHintTestCase -from apache_beam.options.pipeline_options import TypeOptions +from apache_beam.utils.windowed_value import WindowedValue # Disable frequent lint warning due to pipe operator for chaining transforms. @@ -280,7 +282,7 @@ class PTransformTest(unittest.TestCase): pass def finish_bundle(self): - yield 'finish' + yield WindowedValue('finish', -1, [window.GlobalWindow()]) pipeline = TestPipeline() pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3])
