[https://issues.apache.org/jira/browse/BEAM-3971?focusedWorklogId=117698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117698]
ASF GitHub Bot logged work on BEAM-3971:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jun/18 00:20
Start Date: 30/Jun/18 00:20
Worklog Time Spent: 10m
Work Description: bsidhom commented on a change in pull request #5833:
[BEAM-3971, BEAM-4284] Remove fromProto for Pipeline and PTransform translation.
URL: https://github.com/apache/beam/pull/5833#discussion_r199305248
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
##########
@@ -912,30 +922,81 @@ public void translateNode(
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
- TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
- WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
- new WindowDoFnOperator<>(
- reduceFn,
- fullName,
- (Coder) windowedWorkItemCoder,
- mainTag,
- Collections.emptyList(),
- new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
- windowingStrategy,
- new HashMap<>(), /* side-input mapping */
- Collections.emptyList(), /* side inputs */
- context.getPipelineOptions(),
- inputKvCoder.getKeyCoder(),
- keySelector);
+ List<PCollectionView<?>> sideInputs = ((Combine.PerKey) transform).getSideInputs();
- // our operator expects WindowedValue<KeyedWorkItem> while our input stream
- // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
- @SuppressWarnings("unchecked")
- SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
- keyedWorkItemStream
- .transform(fullName, outputTypeInfo, (OneInputStreamOperator) doFnOperator)
- .uid(fullName);
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ if (sideInputs.isEmpty()) {
+ TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ fullName,
+ (Coder) windowedWorkItemCoder,
+ mainTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
+ windowingStrategy,
+ new HashMap<>(), /* side-input mapping */
+ Collections.emptyList(), /* side inputs */
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder(),
+ keySelector);
+
+ // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+ // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+ @SuppressWarnings("unchecked")
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+ keyedWorkItemStream
+ .transform(fullName, outputTypeInfo, (OneInputStreamOperator) doFnOperator)
+ .uid(fullName);
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ } else {
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ fullName,
+ (Coder) windowedWorkItemCoder,
+ mainTag,
+ Collections.emptyList(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
+ windowingStrategy,
+ transformSideInputs.f0,
+ sideInputs,
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder(),
+ keySelector);
+
+ // we have to manually construct the two-input transform because we're not
+ // allowed to have only one input keyed, normally.
+
+ TwoInputTransformation<
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>, RawUnionValue,
+ WindowedValue<KV<K, OutputT>>>
+ rawFlinkTransform =
+ new TwoInputTransformation<>(
+ keyedWorkItemStream.getTransformation(),
+ transformSideInputs.f1.broadcast().getTransformation(),
+ transform.getName(),
+ (TwoInputStreamOperator) doFnOperator,
+ outputTypeInfo,
+ keyedWorkItemStream.getParallelism());
+
+ rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+ rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+ new SingleOutputStreamOperator(
+ keyedWorkItemStream.getExecutionEnvironment(),
+ rawFlinkTransform) {}; // we have to cheat around the ctor being protected
Review comment:
Wow.
Side note: GitHub is painfully naive about generating diffs for these types of refactor.
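The line under review relies on a Java access rule. Flink's `SingleOutputStreamOperator` constructor is protected, and the translator is in another package (`org.apache.beam.runners.flink`), so a plain `new SingleOutputStreamOperator(...)` would not compile there. An anonymous class instance creation, `new C(...) {}`, is allowed, because the empty anonymous subclass is what invokes the protected superclass constructor.

The sketch below reproduces that shape. `Operator` and `create` are hypothetical stand-ins, not Flink APIs. Because everything sits in one file, a plain `new` would also compile here. The example therefore shows the syntax and its side effect, not the cross-package restriction itself.

```java
// Sketch of the "anonymous subclass to reach a protected constructor" idiom.
// Operator is a hypothetical stand-in for a library type such as Flink's
// SingleOutputStreamOperator; it is not the real Flink API.
class ProtectedCtorDemo {

    static class Operator {
        private final String name;

        // Protected: from outside Operator's package, `new Operator(...)` is
        // rejected, but a subclass (including an anonymous one) may call it.
        protected Operator(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // The trailing `{}` declares an empty anonymous subclass of Operator whose
    // implicit constructor delegates to the protected super constructor.
    static Operator create(String name) {
        return new Operator(name) {};
    }

    public static void main(String[] args) {
        Operator op = create("combine-with-side-inputs");
        System.out.println(op.getName());
        // The runtime class is the anonymous subclass, not Operator itself.
        System.out.println(op.getClass() == Operator.class);
        System.out.println(op instanceof Operator);
    }
}
```

One side effect of the idiom is that `getClass()` reports the anonymous subclass, not the base type. This matters only to code that compares classes exactly instead of using `instanceof`.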
Issue Time Tracking
-------------------
Worklog Id: (was: 117698)
Time Spent: 5.5h (was: 5h 20m)
> Pipeline translation utilities should not use SDK construction classes
> ----------------------------------------------------------------------
>
> Key: BEAM-3971
> URL: https://issues.apache.org/jira/browse/BEAM-3971
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Ben Sidhom
> Assignee: Ben Sidhom
> Priority: Major
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> In general, portable runners will require access to pipeline information not
> available in rehydrated pipelines while constructing physical plans.
> Translation utilities should operate directly on protos or on thin,
> information-preserving wrappers.
> The pipeline fusion utilities already operate on protos directly and can be
> used as an example of how this could be done.
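The issue asks translation utilities to operate on protos or on thin, information-preserving wrappers, not on rehydrated SDK objects. Below is a minimal sketch of such a wrapper. It uses a hypothetical `TransformProto` class in place of a generated protobuf message, so it runs without Beam or protobuf on the classpath. The wrapper pairs the message with its id in the pipeline's component map and holds a reference to the message itself, so fields the convenience accessors do not model stay reachable. The class names, the `example_annotation` field, and the URN string are illustrative assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of a thin, information-preserving wrapper over a pipeline proto.
// TransformProto is a hypothetical stand-in for a generated protobuf message;
// it is not Beam's actual Runner API class.
class ThinWrapperDemo {

    static final class TransformProto {
        final String urn;
        // Arbitrary extra fields, including ones SDK classes may not model.
        final Map<String, String> fields;

        TransformProto(String urn, Map<String, String> fields) {
            this.urn = urn;
            this.fields = Collections.unmodifiableMap(new HashMap<>(fields));
        }
    }

    // Pairs a proto with its component id; copies nothing and drops nothing.
    static final class TransformNode {
        private final String id;
        private final TransformProto proto;

        TransformNode(String id, TransformProto proto) {
            this.id = id;
            this.proto = proto;
        }

        String getId() {
            return id;
        }

        // Convenience accessor derived from the wrapped message.
        String getUrn() {
            return proto.urn;
        }

        // The original message stays reachable for anything not exposed above.
        TransformProto getProto() {
            return proto;
        }
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("example_annotation", "kept"); // hypothetical field
        TransformProto proto =
            new TransformProto("beam:transform:combine_per_key:v1", fields);

        TransformNode node = new TransformNode("combine-1", proto);
        System.out.println(node.getId() + " " + node.getUrn());
        System.out.println(node.getProto().fields.get("example_annotation"));
    }
}
```

The wrapper only adds accessors, so a runner can always fall back to `getProto()` for information the convenience methods do not expose. A rehydrated pipeline does not guarantee that property.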
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)