[
https://issues.apache.org/jira/browse/BEAM-2717?focusedWorklogId=159059&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-159059
]
ASF GitHub Bot logged work on BEAM-2717:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Oct/18 09:33
Start Date: 26/Oct/18 09:33
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #6813: [BEAM-2717] Emit
coders in generated protos.
URL: https://github.com/apache/beam/pull/6813
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/pipeline.py
b/sdks/python/apache_beam/pipeline.py
index 68e313909a3..10f4a02c065 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -400,7 +400,9 @@ def run(self, test_runner_api=True):
# When possible, invoke a round trip through the runner API.
if test_runner_api and self._verify_runner_api_compatible():
return Pipeline.from_runner_api(
- self.to_runner_api(), self.runner, self._options).run(False)
+ self.to_runner_api(use_fake_coders=True),
+ self.runner,
+ self._options).run(False)
if self._options.view_as(TypeOptions).runtime_type_check:
from apache_beam.typehints import typecheck
@@ -603,12 +605,39 @@ def visit_value(self, value, _):
self.visit(Visitor())
return Visitor.ok
- def to_runner_api(self, return_context=False, context=None):
+ def to_runner_api(
+ self, return_context=False, context=None, use_fake_coders=False):
"""For internal use only; no backwards-compatibility guarantees."""
from apache_beam.runners import pipeline_context
from apache_beam.portability.api import beam_runner_api_pb2
if context is None:
- context = pipeline_context.PipelineContext()
+ context = pipeline_context.PipelineContext(
+ use_fake_coders=use_fake_coders)
+
+ # The RunnerAPI spec requires certain transforms to have KV inputs
+ # (and corresponding outputs).
+ # Currently we only upgrade to KV pairs. If there is a need for more
+ # general shapes, potential conflicts will have to be resolved.
+ # We also only handle single-input, and (for fixing the output) single
+ # output, which is sufficient.
+ class ForceKvInputTypes(PipelineVisitor):
+ def enter_composite_transform(self, transform_node):
+ self.visit_transform(transform_node)
+
+ def visit_transform(self, transform_node):
+ if (transform_node.transform
+ and transform_node.transform.runner_api_requires_keyed_input()):
+ pcoll = transform_node.inputs[0]
+ pcoll.element_type = typehints.coerce_to_kv_type(
+ pcoll.element_type, transform_node.full_label)
+ if len(transform_node.outputs) == 1:
+ # The runner often has expectations about the output types as well.
+ output, = transform_node.outputs.values()
+ output.element_type = transform_node.transform.infer_output_type(
+ pcoll.element_type)
+
+ self.visit(ForceKvInputTypes())
+
# Mutates context; placing inline would force dependence on
# argument evaluation order.
root_transform_id = context.transforms.get_id(self._root_transform())
diff --git a/sdks/python/apache_beam/pipeline_test.py
b/sdks/python/apache_beam/pipeline_test.py
index 41f749d2f79..410d3d1645a 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -549,7 +549,8 @@ def expand(self, p):
p = beam.Pipeline()
p | MyPTransform() # pylint: disable=expression-not-assigned
- p = Pipeline.from_runner_api(Pipeline.to_runner_api(p), None, None)
+ p = Pipeline.from_runner_api(
+ Pipeline.to_runner_api(p, use_fake_coders=True), None, None)
self.assertIsNotNone(p.transforms_stack[0].parts[0].parent)
self.assertEquals(p.transforms_stack[0].parts[0].parent,
p.transforms_stack[0])
diff --git a/sdks/python/apache_beam/pvalue.py
b/sdks/python/apache_beam/pvalue.py
index 62cefa12bc8..0e4ef106b52 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -142,7 +142,7 @@ def to_runner_api(self, context):
return beam_runner_api_pb2.PCollection(
unique_name='%d%s.%s' % (
len(self.producer.full_label), self.producer.full_label, self.tag),
- coder_id=pickler.dumps(self.element_type),
+ coder_id=context.coder_id_from_element_type(self.element_type),
is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED,
windowing_strategy_id=context.windowing_strategies.get_id(
self.windowing))
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ecaeda07c46..30f466b40ef 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -335,14 +335,6 @@ def run_pipeline(self, pipeline):
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True)
- # TODO(BEAM-2717): Remove once Coders are already in proto.
- for pcoll in self.proto_pipeline.components.pcollections.values():
- if pcoll.coder_id not in self.proto_context.coders:
- coder = coders.registry.get_coder(pickler.loads(pcoll.coder_id))
- pcoll.coder_id = self.proto_context.coders.get_id(coder)
- self.proto_context.coders.populate_map(
- self.proto_pipeline.components.coders)
-
# Add setup_options for all the BeamPlugin imports
setup_options = pipeline._options.view_as(SetupOptions)
plugins = BeamPlugin.get_all_plugin_paths()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index be6f8c22332..45bc246d4ac 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -109,13 +109,13 @@ def run_pipeline(self, pipeline):
# Invoke a round trip through the runner API. This makes sure the Pipeline
# proto is stable.
pipeline = beam.pipeline.Pipeline.from_runner_api(
- pipeline.to_runner_api(),
+ pipeline.to_runner_api(use_fake_coders=True),
pipeline.runner,
pipeline._options)
# Snapshot the pipeline in a portable proto before mutating it.
pipeline_proto, original_context = pipeline.to_runner_api(
- return_context=True)
+ return_context=True, use_fake_coders=True)
pcolls_to_pcoll_id = self._pcolls_to_pcoll_id(pipeline, original_context)
analyzer = pipeline_analyzer.PipelineAnalyzer(self._cache_manager,
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer.py
b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer.py
index 9f815a36ff4..cd9bedc5c27 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer.py
@@ -54,7 +54,8 @@ def __init__(self, cache_manager, pipeline_proto,
underlying_runner,
options=options)
# context returned from to_runner_api is more informative than that returned
# from from_runner_api.
- _, self._context = self._pipeline.to_runner_api(return_context=True)
+ _, self._context = self._pipeline.to_runner_api(
+ return_context=True, use_fake_coders=True)
self._pipeline_info = PipelineInfo(self._pipeline_proto.components)
# Result of the analysis that can be queried by the user.
diff --git
a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index caefbe04ba6..53a4a33c962 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -38,9 +38,9 @@ def to_stable_runner_api(p):
"""The extra round trip ensures a stable pipeline proto.
"""
return (beam.pipeline.Pipeline.from_runner_api(
- p.to_runner_api(),
+ p.to_runner_api(use_fake_coders=True),
p.runner,
- p._options).to_runner_api())
+ p._options).to_runner_api(use_fake_coders=True))
class PipelineAnalyzerTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py
b/sdks/python/apache_beam/runners/pipeline_context.py
index dbe2953ae8d..5f774f8db60 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -27,6 +27,7 @@
from apache_beam import coders
from apache_beam import pipeline
from apache_beam import pvalue
+from apache_beam.internal import pickler
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import core
@@ -109,7 +110,8 @@ class PipelineContext(object):
'environments': Environment,
}
- def __init__(self, proto=None, default_environment=None):
+ def __init__(
+ self, proto=None, default_environment=None, use_fake_coders=False):
if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
proto = beam_runner_api_pb2.Components(
coders=dict(proto.coders.items()),
@@ -124,6 +126,17 @@ def __init__(self, proto=None, default_environment=None):
Environment(default_environment))
else:
self._default_environment_id = None
+ self.use_fake_coders = use_fake_coders
+
+ # If fake coders are requested, return a pickled version of the element type
+ # rather than an actual coder. The element type is required for some runners,
+ # as well as performing a round-trip through protos.
+ # TODO(BEAM-2717): Remove once this is no longer needed.
+ def coder_id_from_element_type(self, element_type):
+ if self.use_fake_coders:
+ return pickler.dumps(element_type)
+ else:
+ return self.coders.get_id(coders.registry.get_coder(element_type))
@staticmethod
def from_runner_api(proto):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 393ba3ef68c..67414088c8e 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -34,11 +34,8 @@
import apache_beam as beam # pylint: disable=ungrouped-imports
from apache_beam import coders
from apache_beam import metrics
-from apache_beam.coders import WindowedValueCoder
-from apache_beam.coders import registry
from apache_beam.coders.coder_impl import create_InputStream
from apache_beam.coders.coder_impl import create_OutputStream
-from apache_beam.internal import pickler
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.execution import MetricsEnvironment
@@ -963,19 +960,15 @@ def process(stage):
pipeline_components = copy.deepcopy(pipeline_proto.components)
- # Reify coders.
- # TODO(BEAM-2717): Remove once Coders are already in proto.
- coders = pipeline_context.PipelineContext(pipeline_components).coders
+ # Some SDK workers require windowed coders for their PCollections.
+ # TODO(BEAM-4150): Consistently use unwindowed coders everywhere.
for pcoll in pipeline_components.pcollections.values():
- if pcoll.coder_id not in coders:
- window_coder = coders[
+ if (pipeline_components.coders[pcoll.coder_id].spec.spec.urn
+ != common_urns.coders.WINDOWED_VALUE.urn):
+ pcoll.coder_id = windowed_coder_id(
+ pcoll.coder_id,
pipeline_components.windowing_strategies[
- pcoll.windowing_strategy_id].window_coder_id]
- coder = WindowedValueCoder(
- registry.get_coder(pickler.loads(pcoll.coder_id)),
- window_coder=window_coder)
- pcoll.coder_id = coders.get_id(coder)
- coders.populate_map(pipeline_components.coders)
+ pcoll.windowing_strategy_id].window_coder_id)
known_composites = set(
[common_urns.primitives.GROUP_BY_KEY.urn,
diff --git a/sdks/python/apache_beam/transforms/core.py
b/sdks/python/apache_beam/transforms/core.py
index 63546f8a692..5505de55522 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -991,6 +991,9 @@ def from_runner_api_parameter(pardo_payload, context):
result.side_inputs = [si for _, si in sorted(indexed_side_inputs)]
return result
+ def runner_api_requires_keyed_input(self):
+ return userstate.is_stateful_dofn(self.fn)
+
class _MultiParDo(PTransform):
@@ -1375,6 +1378,9 @@ def from_runner_api_parameter(combine_payload, context):
return CombinePerKey(
CombineFn.from_runner_api(combine_payload.combine_fn, context))
+ def runner_api_requires_keyed_input(self):
+ return True
+
# TODO(robertwb): Rename to CombineGroupedValues?
class CombineValues(PTransformWithSideInputs):
@@ -1606,6 +1612,10 @@ def expand(self, pcoll):
| 'GroupByKey' >> _GroupByKeyOnly()
| 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
+ def infer_output_type(self, input_type):
+ key_type, value_type = trivial_inference.key_value_types(input_type)
+ return KV[key_type, Iterable[value_type]]
+
def to_runner_api_parameter(self, unused_context):
return common_urns.primitives.GROUP_BY_KEY.urn, None
@@ -1613,6 +1623,9 @@ def to_runner_api_parameter(self, unused_context):
def from_runner_api_parameter(unused_payload, unused_context):
return GroupByKey()
+ def runner_api_requires_keyed_input(self):
+ return True
+
@typehints.with_input_types(typehints.KV[K, V])
@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
diff --git a/sdks/python/apache_beam/transforms/ptransform.py
b/sdks/python/apache_beam/transforms/ptransform.py
index 4a89cbf8419..9939278d380 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -571,6 +571,9 @@ def to_runner_api_pickled(self, unused_context):
return (python_urns.PICKLED_TRANSFORM,
pickler.dumps(self))
+ def runner_api_requires_keyed_input(self):
+ return False
+
@PTransform.register_urn(python_urns.GENERIC_COMPOSITE_TRANSFORM, None)
def _create_transform(payload, unused_context):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 159059)
Time Spent: 1h 50m (was: 1h 40m)
> Infer coders in SDK prior to handing off pipeline to Runner
> -----------------------------------------------------------
>
> Key: BEAM-2717
> URL: https://issues.apache.org/jira/browse/BEAM-2717
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Robert Bradshaw
> Priority: Minor
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Currently all runners have to duplicate this work, and there's also a hack
> storing the element type rather than the coder in the Runner protos.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)