Rename SideOutputValue to OutputValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3418863f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3418863f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3418863f Branch: refs/heads/master Commit: 3418863fce834c458fb13fc82baa1c7c660030ef Parents: 22d84c9 Author: Thomas Groh <[email protected]> Authored: Mon Apr 17 17:59:10 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Tue Apr 18 13:22:25 2017 -0700 ---------------------------------------------------------------------- .../examples/cookbook/multiple_output_pardo.py | 47 ++++++++++---------- .../examples/snippets/snippets_test.py | 30 ++++++------- sdks/python/apache_beam/pvalue.py | 10 ++--- sdks/python/apache_beam/runners/common.pxd | 2 +- sdks/python/apache_beam/runners/common.py | 6 +-- .../runners/dataflow/dataflow_runner.py | 8 ++-- .../consumer_tracking_pipeline_visitor_test.py | 2 +- .../runners/direct/transform_evaluator.py | 12 ++--- sdks/python/apache_beam/runners/runner.py | 4 +- sdks/python/apache_beam/transforms/core.py | 8 ++-- .../apache_beam/transforms/ptransform_test.py | 18 ++++---- sdks/python/apache_beam/typehints/typecheck.py | 4 +- 12 files changed, 76 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index 978e4ed..b324ed1 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -18,9 +18,9 @@ """A workflow demonstrating a DoFn with multiple outputs. -DoFns may produce a main output and additional side outputs. These side outputs -are marked with a tag at output time and later the same tag will be used to get -the corresponding result (a PCollection) for that side output. +DoFns may produce multiple outputs. Outputs that are not the default ("main") +output are marked with a tag at output time and later the same tag will be used +to get the corresponding result (a PCollection) for that output. This is a slightly modified version of the basic wordcount example. In this example words are divided into 2 buckets as shorts words (3 characters in length @@ -68,43 +68,44 @@ class SplitLinesToWordsFn(beam.DoFn): This transform will have 3 outputs: - main output: all words that are longer than 3 characters. - - short words side output: all other words. - - character count side output: Number of characters in each processed line. + - short words output: all other words. + - character count output: Number of characters in each processed line. """ - # These tags will be used to tag the side outputs of this DoFn. - SIDE_OUTPUT_TAG_SHORT_WORDS = 'tag_short_words' - SIDE_OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count' + # These tags will be used to tag the outputs of this DoFn. + OUTPUT_TAG_SHORT_WORDS = 'tag_short_words' + OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count' def process(self, element): - """Receives a single element (a line) and produces words and side outputs. + """Receives a single element (a line) and produces words and character + counts. Important things to note here: - For a single element you may produce multiple main outputs: words of a single line. - - For that same input you may produce multiple side outputs, along with - multiple main outputs. - - Side outputs may have different types (count) or may share the same type + - For that same input you may produce multiple outputs, potentially + across multiple PCollections + - Outputs may have different types (count) or may share the same type (words) as with the main output. Args: element: processing element. Yields: - words as main output, short words as side output, line character count as - side output. + words as main output, short words as tagged output, line character count + as tagged output. """ - # yield a count (integer) to the SIDE_OUTPUT_TAG_CHARACTER_COUNT tagged + # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged # collection. - yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_CHARACTER_COUNT, - len(element)) + yield pvalue.OutputValue(self.OUTPUT_TAG_CHARACTER_COUNT, + len(element)) words = re.findall(r'[A-Za-z\']+', element) for word in words: if len(word) <= 3: - # yield word as a side output to the SIDE_OUTPUT_TAG_SHORT_WORDS tagged + # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged # collection. - yield pvalue.SideOutputValue(self.SIDE_OUTPUT_TAG_SHORT_WORDS, word) + yield pvalue.OutputValue(self.OUTPUT_TAG_SHORT_WORDS, word) else: # yield word to add it to the main collection. yield word @@ -144,18 +145,18 @@ def run(argv=None): lines = p | ReadFromText(known_args.input) - # with_outputs allows accessing the side outputs of a DoFn. + # with_outputs allows accessing the explicitly tagged outputs of a DoFn. split_lines_result = (lines | beam.ParDo(SplitLinesToWordsFn()).with_outputs( - SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS, - SplitLinesToWordsFn.SIDE_OUTPUT_TAG_CHARACTER_COUNT, + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS, + SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT, main='words')) # split_lines_result is an object of type DoOutputsTuple. It supports # accessing result in alternative ways. words, _, _ = split_lines_result short_words = split_lines_result[ - SplitLinesToWordsFn.SIDE_OUTPUT_TAG_SHORT_WORDS] + SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS] character_count = split_lines_result.tag_character_count # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index c3984bb..2aee350 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -176,8 +176,8 @@ class ParDoTest(unittest.TestCase): # [END model_pardo_side_input_dofn] self.assertEqual({'a', 'bb', 'ccc'}, set(small_words)) - def test_pardo_with_side_outputs(self): - # [START model_pardo_emitting_values_on_side_outputs] + def test_pardo_with_tagged_outputs(self): + # [START model_pardo_emitting_values_on_tagged_outputs] class ProcessWords(beam.DoFn): def process(self, element, cutoff_length, marker): @@ -185,48 +185,48 @@ class ParDoTest(unittest.TestCase): # Emit this short word to the main output. yield element else: - # Emit this word's long length to a side output. - yield pvalue.SideOutputValue( + # Emit this word's long length to the 'above_cutoff_lengths' output. + yield pvalue.OutputValue( 'above_cutoff_lengths', len(element)) if element.startswith(marker): - # Emit this word to a different side output. - yield pvalue.SideOutputValue('marked strings', element) - # [END model_pardo_emitting_values_on_side_outputs] + # Emit this word to a different output with the 'marked strings' tag. + yield pvalue.OutputValue('marked strings', element) + # [END model_pardo_emitting_values_on_tagged_outputs] words = ['a', 'an', 'the', 'music', 'xyz'] - # [START model_pardo_with_side_outputs] + # [START model_pardo_with_tagged_outputs] results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x') .with_outputs('above_cutoff_lengths', 'marked strings', main='below_cutoff_strings')) below = results.below_cutoff_strings above = results.above_cutoff_lengths marked = results['marked strings'] # indexing works as well - # [END model_pardo_with_side_outputs] + # [END model_pardo_with_tagged_outputs] self.assertEqual({'a', 'an'}, set(below)) self.assertEqual({3, 5}, set(above)) self.assertEqual({'xyz'}, set(marked)) - # [START model_pardo_with_side_outputs_iter] + # [START model_pardo_with_tagged_outputs_iter] below, above, marked = (words | beam.ParDo( ProcessWords(), cutoff_length=2, marker='x') .with_outputs('above_cutoff_lengths', 'marked strings', main='below_cutoff_strings')) - # [END model_pardo_with_side_outputs_iter] + # [END model_pardo_with_tagged_outputs_iter] self.assertEqual({'a', 'an'}, set(below)) self.assertEqual({3, 5}, set(above)) self.assertEqual({'xyz'}, set(marked)) - def test_pardo_with_undeclared_side_outputs(self): + def test_pardo_with_undeclared_outputs(self): numbers = [1, 2, 3, 4, 5, 10, 20] - # [START model_pardo_with_side_outputs_undeclared] + # [START model_pardo_with_undeclared_outputs] def even_odd(x): - yield pvalue.SideOutputValue('odd' if x % 2 else 'even', x) + yield pvalue.OutputValue('odd' if x % 2 else 'even', x) if x % 10 == 0: yield x @@ -235,7 +235,7 @@ class ParDoTest(unittest.TestCase): evens = results.even odds = results.odd tens = results[None] # the undeclared main output - # [END model_pardo_with_side_outputs_undeclared] + # [END model_pardo_with_undeclared_outputs] self.assertEqual({2, 4, 10, 20}, set(evens)) self.assertEqual({1, 3, 5}, set(odds)) http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 5709b38..d873669 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -206,14 +206,14 @@ class DoOutputsTuple(object): elif self._tags and tag not in self._tags: raise ValueError( "Tag '%s' is neither the main tag '%s' " - "nor any of the side tags %s" % ( + "nor any of the tags %s" % ( tag, self._main_tag, self._tags)) # Check if we accessed this tag before. if tag in self._pcolls: return self._pcolls[tag] if tag is not None: - self._transform.side_output_tags.add(tag) + self._transform.output_tags.add(tag) pcoll = PCollection(self._pipeline, tag=tag) # Transfer the producer from the DoOutputsTuple to the resulting # PCollection. @@ -230,19 +230,19 @@ class DoOutputsTuple(object): return pcoll -class SideOutputValue(object): +class OutputValue(object): """An object representing a tagged value. ParDo, Map, and FlatMap transforms can emit values on multiple outputs which are distinguished by string tags. The DoFn will return plain values - if it wants to emit on the main output and SideOutputValue objects + if it wants to emit on the main output and OutputValue objects if it wants to emit a value on a specific tagged output. """ def __init__(self, tag, value): if not isinstance(tag, basestring): raise TypeError( - 'Attempting to create a SideOutputValue with non-string tag %s' % tag) + 'Attempting to create a OutputValue with non-string tag %s' % tag) self.tag = tag self.value = value http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/common.pxd ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 781d96b..5952942 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -21,7 +21,7 @@ from apache_beam.utils.windowed_value cimport WindowedValue from apache_beam.metrics.execution cimport ScopedMetricsContainer -cdef type SideOutputValue, TimestampedValue +cdef type OutputValue, TimestampedValue cdef class Receiver(object): http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 8f86b75..64d6d00 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -23,7 +23,7 @@ import sys from apache_beam.internal import util from apache_beam.metrics.execution import ScopedMetricsContainer -from apache_beam.pvalue import SideOutputValue +from apache_beam.pvalue import OutputValue from apache_beam.transforms import core from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn @@ -283,14 +283,14 @@ class DoFnRunner(Receiver): def _process_outputs(self, windowed_input_element, results): """Dispatch the result of computation to the appropriate receivers. - A value wrapped in a SideOutputValue object will be unwrapped and + A value wrapped in a OutputValue object will be unwrapped and then dispatched to the appropriate indexed output. """ if results is None: return for result in results: tag = None - if isinstance(result, SideOutputValue): + if isinstance(result, OutputValue): tag = result.tag if not isinstance(tag, basestring): raise TypeError('In %s, tag %s is not a string' % (self, tag)) http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 2e9fc52..bdbd2dd 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -373,7 +373,7 @@ class DataflowRunner(PipelineRunner): transform_node.full_label + ( '/Do' if transform_node.side_inputs else ''), transform_node, - transform_node.transform.side_output_tags) + transform_node.transform.output_tags) fn_data = self._pardo_fn_data(transform_node, lookup_label) step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data)) step.add_property( @@ -384,7 +384,7 @@ class DataflowRunner(PipelineRunner): # Add side inputs if any. step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict) - # Generate description for main output and side outputs. The output names + # Generate description for the outputs. The output names # will be 'out' for main output and 'out_<tag>' for a tagged output. # Using 'out' as a tag will not clash with the name for main since it will # be transformed into 'out_out' internally. @@ -397,8 +397,8 @@ class DataflowRunner(PipelineRunner): '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), PropertyNames.ENCODING: step.encoding, PropertyNames.OUTPUT_NAME: PropertyNames.OUT}) - for side_tag in transform.side_output_tags: - # The assumption here is that side outputs will have the same typehint + for side_tag in transform.output_tags: + # The assumption here is that all outputs will have the same typehint # and coder as the main output. This is certainly the case right now # but conceivably it could change in the future. outputs.append( http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 154284b..3ed553e 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -76,7 +76,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): def process(self, element): if element < 0: - yield pvalue.SideOutputValue('tag_negative', element) + yield pvalue.OutputValue('tag_negative', element) else: yield element http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index f34513c..16b3131 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -248,13 +248,13 @@ class _TaggedReceivers(dict): return self._undeclared_in_memory_tag_values class NullReceiver(object): - """Ignores undeclared side outputs, default execution mode.""" + """Ignores undeclared outputs, default execution mode.""" def output(self, element): pass class InMemoryReceiver(object): - """Buffers undeclared side outputs to the given dictionary.""" + """Buffers undeclared outputs to the given dictionary.""" def __init__(self, target, tag): self._target = target @@ -282,12 +282,12 @@ class _ParDoEvaluator(_TransformEvaluator): transform = self._applied_ptransform.transform self._tagged_receivers = _TaggedReceivers(self._evaluation_context) - for side_output_tag in self._applied_ptransform.outputs: - output_pcollection = pvalue.PCollection(None, tag=side_output_tag) + for output_tag in self._applied_ptransform.outputs: + output_pcollection = pvalue.PCollection(None, tag=output_tag) output_pcollection.producer = self._applied_ptransform - self._tagged_receivers[side_output_tag] = ( + self._tagged_receivers[output_tag] = ( self._evaluation_context.create_bundle(output_pcollection)) - self._tagged_receivers[side_output_tag].tag = side_output_tag + self._tagged_receivers[output_tag].tag = output_tag self._counter_factory = counters.CounterFactory() http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/runners/runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 6c05951..4d33802 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -325,8 +325,8 @@ class PValueCache(object): except KeyError: if (pvalue.tag is not None and self.to_cache_key(pvalue.real_producer, None) in self._cache): - # This is an undeclared, empty side output of a DoFn executed - # in the local runner before this side output referenced. + # This is an undeclared, empty output of a DoFn executed + # in the local runner before this output was referenced. return [] else: raise http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3def9ef..bdfddbb 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -184,7 +184,7 @@ class DoFn(WithTypeHints, HasDisplayData): trivial_inference.infer_return_type(self.process, [input_type])) def _strip_output_annotations(self, type_hint): - annotations = (TimestampedValue, WindowedValue, pvalue.SideOutputValue) + annotations = (TimestampedValue, WindowedValue, pvalue.OutputValue) # TODO(robertwb): These should be parameterized types that the # type inferencer understands. if (type_hint in annotations @@ -614,7 +614,7 @@ class ParDo(PTransformWithSideInputs): 'fn_dd': self.fn} def expand(self, pcoll): - self.side_output_tags = set() + self.output_tags = set() # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. self.dofn = self.fn return pvalue.PCollection(pcoll.pipeline) @@ -1154,9 +1154,9 @@ class Partition(PTransformWithSideInputs): raise ValueError( 'PartitionFn specified out-of-bounds partition index: ' '%d not in [0, %d)' % (partition, n)) - # Each input is directed into the side output that corresponds to the + # Each input is directed into the output that corresponds to the # selected partition. - yield pvalue.SideOutputValue(str(partition), element) + yield pvalue.OutputValue(str(partition), element) def make_fn(self, fn): return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn) http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 4da5443..b92af83 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -159,7 +159,7 @@ class PTransformTest(unittest.TestCase): 'is discouraged.') self.assertStartswith(cm.exception.message, expected_error_prefix) - def test_do_with_side_outputs_maintains_unique_name(self): + def test_do_with_multiple_outputs_maintains_unique_name(self): pipeline = TestPipeline() pcoll = pipeline | 'Start' >> beam.Create([1, 2, 3]) r1 = pcoll | 'A' >> beam.FlatMap(lambda x: [x + 1]).with_outputs(main='m') @@ -176,9 +176,9 @@ class PTransformTest(unittest.TestCase): def process(self, element): yield element if element % 2 == 0: - yield pvalue.SideOutputValue('even', element) + yield pvalue.OutputValue('even', element) else: - yield pvalue.SideOutputValue('odd', element) + yield pvalue.OutputValue('odd', element) pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) @@ -193,8 +193,8 @@ class PTransformTest(unittest.TestCase): def test_par_do_with_multiple_outputs_and_using_return(self): def some_fn(v): if v % 2 == 0: - return [v, pvalue.SideOutputValue('even', v)] - return [v, pvalue.SideOutputValue('odd', v)] + return [v, pvalue.OutputValue('even', v)] + return [v, pvalue.OutputValue('odd', v)] pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) @@ -206,12 +206,12 @@ class PTransformTest(unittest.TestCase): pipeline.run() @attr('ValidatesRunner') - def test_undeclared_side_outputs(self): + def test_undeclared_outputs(self): pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) results = nums | 'ClassifyNumbers' >> beam.FlatMap( lambda x: [x, - pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)] + pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)] ).with_outputs() assert_that(results[None], equal_to([1, 2, 3, 4])) assert_that(results.odd, equal_to([1, 3]), label='assert:odd') @@ -219,12 +219,12 @@ class PTransformTest(unittest.TestCase): pipeline.run() @attr('ValidatesRunner') - def test_empty_side_outputs(self): + def test_multiple_empty_outputs(self): pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5]) results = nums | 'ClassifyNumbers' >> beam.FlatMap( lambda x: [x, - pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)] + pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)] ).with_outputs() assert_that(results[None], equal_to([1, 3, 5])) assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd') http://git-wip-us.apache.org/repos/asf/beam/blob/3418863f/sdks/python/apache_beam/typehints/typecheck.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index 2e7176e..e475d9d 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -22,7 +22,7 @@ import inspect import sys import types -from apache_beam.pvalue import SideOutputValue +from apache_beam.pvalue import OutputValue from apache_beam.transforms.core import DoFn from apache_beam.transforms.window import WindowedValue from apache_beam.typehints import check_constraint @@ -136,7 +136,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper): def type_check_output(o): # TODO(robertwb): Multi-output. - x = o.value if isinstance(o, (SideOutputValue, WindowedValue)) else o + x = o.value if isinstance(o, (OutputValue, WindowedValue)) else o self._type_check(self._output_type_hint, x, is_input=False) # If the return type is a generator, then we will need to interleave our
