Check type of coder for step feeding into GroupByKey in Dataflow runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3b53028 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3b53028 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3b53028 Branch: refs/heads/python-sdk Commit: e3b53028457f3a1969bd3bd0a98d5def50de9335 Parents: b75c0c0 Author: Charles Chen <[email protected]> Authored: Mon Jul 18 17:54:34 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue Jul 19 18:58:03 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/iobase.py | 6 ++++++ .../apache_beam/runners/dataflow_runner.py | 18 +++++++++-------- sdks/python/apache_beam/runners/runner_test.py | 21 ++++++++++++++++++++ .../python/apache_beam/transforms/ptransform.py | 21 ++++++++++++++++++++ 4 files changed, 58 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 71ef46b..de5e9d4 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -938,6 +938,12 @@ class Read(ptransform.PTransform): def get_windowing(self, unused_inputs): return core.Windowing(window.GlobalWindows()) + def _infer_output_coder(self, input_type=None, input_coder=None): + if isinstance(self.source, BoundedSource): + return self.source.default_output_coder() + else: + return self.source.coder + class Write(ptransform.PTransform): """A ``PTransform`` that writes to a sink. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 4fafc9f..f794c8b 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -30,7 +30,6 @@ import time from apache_beam import coders from apache_beam import pvalue from apache_beam.internal import pickler -from apache_beam.io import iobase from apache_beam.pvalue import PCollectionView from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner @@ -326,7 +325,15 @@ class DataflowPipelineRunner(PipelineRunner): PropertyNames.OUTPUT_NAME: PropertyNames.OUT}]) def apply_GroupByKey(self, transform, pcoll): - coder = self._get_coder(pcoll.element_type or typehints.Any, None) + # Infer coder of parent. + # + # TODO(ccy): make Coder inference and checking less specialized and more + # comprehensive. + parent = pcoll.producer + if parent: + coder = parent.transform._infer_output_coder() # pylint: disable=protected-access + if not coder: + coder = self._get_coder(pcoll.element_type or typehints.Any, None) if not coder.is_kv_coder(): raise ValueError(('Coder for the GroupByKey operation "%s" is not a ' 'key-value coder: %s.') % (transform.label, @@ -525,16 +532,11 @@ class DataflowPipelineRunner(PipelineRunner): else: step.add_property(PropertyNames.FORMAT, transform.source.format) - if isinstance(transform.source, iobase.BoundedSource): - coder = transform.source.default_output_coder() - else: - coder = transform.source.coder - # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a # step should be the type of value outputted by each step. Read steps # automatically wrap output values in a WindowedValue wrapper, if necessary. # This is also necessary for proper encoding for size estimation. - coder = coders.WindowedValueCoder(coder) + coder = coders.WindowedValueCoder(transform._infer_output_coder()) # pylint: disable=protected-access step.encoding = self._get_cloud_encoding(coder) step.add_property( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index d2e70d7..2863756 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -24,6 +24,8 @@ caching and clearing values that are not tested elsewhere. import unittest +import apache_beam as beam + from apache_beam.internal import apiclient from apache_beam.pipeline import Pipeline from apache_beam.runners import create_runner @@ -64,6 +66,25 @@ class RunnerTest(unittest.TestCase): remote_runner.job = apiclient.Job(p.options) super(DataflowPipelineRunner, remote_runner).run(p) + def test_no_group_by_key_directly_after_bigquery(self): + remote_runner = DataflowPipelineRunner() + p = Pipeline(remote_runner, + options=PipelineOptions([ + '--dataflow_endpoint=ignored', + '--job_name=test-job', + '--project=test-project', + '--staging_location=ignored', + '--temp_location=/dev/null', + '--no_auth=True' + ])) + rows = p | beam.io.Read('read', + beam.io.BigQuerySource('dataset.faketable')) + with self.assertRaises(ValueError, + msg=('Coder for the GroupByKey operation' + '"GroupByKey" is not a key-value coder: ' + 'RowAsDictJsonCoder')): + unused_invalid = rows | beam.GroupByKey() + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3b53028/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index bde05b5..da8b671 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -330,6 +330,27 @@ class PTransform(WithTypeHints): input_or_output.title(), self.label, at_context, hint, pvalue_.element_type)) + def _infer_output_coder(self, input_type=None, input_coder=None): + """Returns the output coder to use for output of this transform. + + Note: this API is experimental and is subject to change; please do not rely + on behavior induced by this method. + + The Coder returned here should not be wrapped in a WindowedValueCoder + wrapper. + + Args: + input_type: An instance of an allowed built-in type, a custom class, or a + typehints.TypeConstraint for the input type, or None if not available. + input_coder: Coder object for encoding input to this PTransform, or None + if not available. + + Returns: + Coder object for encoding output of this PTransform or None if unknown. + """ + # TODO(ccy): further refine this API. + return None + def clone(self, new_label): """Clones the current transform instance under a new label.""" transform = copy.copy(self)
