[
https://issues.apache.org/jira/browse/BEAM-4582?focusedWorklogId=112943&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112943
]
ASF GitHub Bot logged work on BEAM-4582:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jun/18 23:10
Start Date: 18/Jun/18 23:10
Worklog Time Spent: 10m
Work Description: charlesccychen closed pull request #5675: [BEAM-4582]
Fix streaming Create transform on Dataflow
URL: https://github.com/apache/beam/pull/5675
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 2e5ed79efd4..9c8520250e7 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -312,7 +312,12 @@ def run_pipeline(self, pipeline):
if apiclient._use_fnapi(pipeline._options):
pipeline.visit(self.side_input_visitor())
- # Snapshot the pipeline in a portable proto before mutating it
+ # Performing configured PTransform overrides. Note that this is currently
+ # done before Runner API serialization, since the new proto needs to contain
+ # any added PTransforms.
+ pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
+
+ # Snapshot the pipeline in a portable proto.
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True)
@@ -324,9 +329,6 @@ def run_pipeline(self, pipeline):
self.proto_context.coders.populate_map(
self.proto_pipeline.components.coders)
- # Performing configured PTransform overrides.
- pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
-
# Add setup_options for all the BeamPlugin imports
setup_options = pipeline._options.view_as(SetupOptions)
plugins = BeamPlugin.get_all_plugin_paths()
----------------------------------------------------------------
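The reordering in the diff above can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyPipeline, OVERRIDES, and the helper functions are hypothetical stand-ins and are not Beam APIs, and the step names are illustrative. It shows why a snapshot taken before the overrides run cannot contain the transforms the overrides add, which appears to be the mismatch the issue describes between the JSON job description and the pipeline proto.

```python
# Toy model only: ToyPipeline and OVERRIDES are hypothetical, not Beam APIs.


class ToyPipeline:
    """A pipeline reduced to an ordered list of transform names."""

    def __init__(self, transforms):
        self.transforms = list(transforms)

    def replace_all(self, overrides):
        """Replace each transform that has an override with its expansion."""
        expanded = []
        for name in self.transforms:
            expanded.extend(overrides.get(name, [name]))
        self.transforms = expanded

    def snapshot(self):
        """Stand-in for serializing the pipeline to a portable proto."""
        return tuple(self.transforms)


# Illustrative override: one transform expands into two runner-specific steps.
OVERRIDES = {"Create": ["Impulse", "Decode Values"]}


def snapshot_then_override(pipeline):
    """Old ordering: the snapshot is taken before the overrides are applied."""
    proto = pipeline.snapshot()
    pipeline.replace_all(OVERRIDES)
    return proto, tuple(pipeline.transforms)


def override_then_snapshot(pipeline):
    """New ordering: the overrides are applied before the snapshot is taken."""
    pipeline.replace_all(OVERRIDES)
    proto = pipeline.snapshot()
    return proto, tuple(pipeline.transforms)


def missing_from_proto(proto, job_steps):
    """Job steps that have no counterpart in the snapshot."""
    return [step for step in job_steps if step not in proto]


old_proto, old_steps = snapshot_then_override(ToyPipeline(["Create", "Map"]))
new_proto, new_steps = override_then_snapshot(ToyPipeline(["Create", "Map"]))

print(missing_from_proto(old_proto, old_steps))
print(missing_from_proto(new_proto, new_steps))
```

With the old ordering the job contains steps that the snapshot never saw; with the new ordering the snapshot and the job steps agree.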
Issue Time Tracking
-------------------
Worklog Id: (was: 112943)
Time Spent: 1h 20m (was: 1h 10m)
> Incorrectly translates
> apache_beam.runners.dataflow.native_io.streaming_create.DecodeAndEmitDoFn
> when creating the Dataflow pipeline json description
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-4582
> URL: https://issues.apache.org/jira/browse/BEAM-4582
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Luke Cwik
> Assignee: Charles Chen
> Priority: Major
> Labels: portability
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> When executing against Dataflow, the JSON pipeline description contains the
> following JSON which doesn't appear in the pipeline proto:
>
> {code:java}
> {
> "kind": "ParallelDo",
> "name": "s2",
> "properties": {
> "display_data": [
> {
> "key": "fn",
> "label": "Transform Function",
> "namespace": "apache_beam.transforms.core.ParDo",
> "shortValue": "DecodeAndEmitDoFn",
> "type": "STRING",
> "value":
> "apache_beam.runners.dataflow.native_io.streaming_create.DecodeAndEmitDoFn"
> }
> ],
> "non_parallel_inputs": {},
> "output_info": [
> {
> "encoding": {
> "@type": "kind:windowed_value",
> "component_encodings": [
> {
> "@type":
> "FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
>
> "component_encodings": [
> {
> "@type":
> "FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
>
> "component_encodings": []
> },
> {
> "@type":
> "FastPrimitivesCoder$eNprYEpOLEhMzkiNT0pNzNVLzk9JLSqGUlxuicUlAUWZuZklmWWpxc4gQa5CBs3GQsbaQqZQ/vi0xJycpMTk7Hiw+kJmPEYFZCZn56RCjWABGsFaW8iWVJykBwDlGS3/",
>
> "component_encodings": []
> }
> ],
> "is_pair_like": true
> },
> {
> "@type": "kind:global_window"
> }
> ],
> "is_wrapper": true
> },
> "output_name": "out",
> "user_name": "Some Numbers/Decode Values.out"
> }
> ],
> "parallel_input": {
> "@type": "OutputReference",
> "output_name": "out",
> "step_name": "s1"
> },
> "serialized_fn": "ref_AppliedPTransform_AppliedPTransform_45",
> "user_name": "Some Numbers/Decode Values"
> }
> },
> {code}
> This causes the DataflowRunner to use a legacy code path and ask the Python
> SDK harness to execute a transform with a payload
> *ref_AppliedPTransform_AppliedPTransform_45* instead of sending the
> PTransform proto.
>