[
https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137860&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137860
]
ASF GitHub Bot logged work on BEAM-2930:
----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Aug/18 15:55
Start Date: 24/Aug/18 15:55
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6208:
[BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208#discussion_r212674697
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -520,6 +567,158 @@ private void translateImpulse(
}
}
/**
 * Builds a map from every side input declared in {@code stagePayload} to a
 * {@link PCollectionView} describing it. The result is a {@link LinkedHashMap} filled while
 * iterating {@code stagePayload.getSideInputsList()}, so its iteration order is the payload's
 * declaration order.
 *
 * <p>For each side input the underlying PCollection id is looked up through the consuming
 * transform's inputs (keyed by the side input's local name), that PCollection's windowing
 * strategy is hydrated from the proto components, and its coder's value coder is wrapped in
 * an {@link IterableCoder} to match materialization via GBK ({@code T -> Iterable<T>}).
 *
 * @param stagePayload executable stage whose side inputs are translated
 * @param components pipeline components used to resolve transforms, PCollections, windowing
 *     strategies and coders
 * @return side input id to view, in payload order
 * @throws IllegalStateException if a side input's windowing strategy proto cannot be hydrated
 */
+ private static LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId,
PCollectionView<?>>
+ getSideInputIdToPCollectionViewMap(
+ RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components
components) {
+
// Hydration context for the windowing strategy protos resolved in the loop below.
+ RehydratedComponents rehydratedComponents =
RehydratedComponents.forComponents(components);
+
+ LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId,
PCollectionView<?>> sideInputs =
+ new LinkedHashMap<>();
// One view per declared side input; LinkedHashMap keeps the payload's order.
+ for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+ stagePayload.getSideInputsList()) {
+
+ // TODO: local name is unique as long as only one transform with side
input can be within a stage
+ String sideInputTag = sideInputId.getLocalName();
+ // for PCollectionView compatibility, not used to transform
materialization
+ ViewFn<Iterable<WindowedValue<?>>, ?> viewFn =
+ (ViewFn) new
PCollectionViews.MultimapViewFn<Iterable<WindowedValue<Void>>, Void>();
+
// Resolve the side input's PCollection id via the consuming transform's input that is
// registered under the side input's local name.
+ String collectionId =
+ components
+ .getTransformsOrThrow(sideInputId.getTransformId())
+ .getInputsOrThrow(sideInputId.getLocalName());
+ RunnerApi.WindowingStrategy windowingStrategyProto =
+ components.getWindowingStrategiesOrThrow(
+
components.getPcollectionsOrThrow(collectionId).getWindowingStrategyId());
+
// A proto hydration failure is rethrown unchecked, with the offending proto in the message.
+ final WindowingStrategy<?, ?> windowingStrategy;
+ try {
+ windowingStrategy =
+ WindowingStrategyTranslation.fromProto(windowingStrategyProto,
rehydratedComponents);
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to hydrate side input windowing strategy %s.",
windowingStrategyProto),
+ e);
+ }
+
// NOTE(review): the cast below assumes instantiateCoder returns a WindowedValueCoder;
// any other coder type fails here with a ClassCastException.
+ Coder<WindowedValue<Object>> coder = instantiateCoder(collectionId,
components);
+ // side input materialization via GBK (T -> Iterable<T>)
+ WindowedValueCoder wvCoder = (WindowedValueCoder) coder;
+ coder =
wvCoder.withValueCoder(IterableCoder.of(wvCoder.getValueCoder()));
+
// The view is tagged with the side input's local name and uses the window fn's default
// window mapping. NOTE(review): the first constructor argument is null, presumably
// because no PCollection object exists at this point in translation; confirm.
+ sideInputs.put(
+ sideInputId,
+ new RunnerPCollectionView<>(
+ null,
+ new TupleTag<>(sideInputTag),
+ viewFn,
+ // TODO: support custom mapping fn
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ coder));
+ }
+ return sideInputs;
+ }
+
+ private Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
transformSideInputs(
+ RunnerApi.ExecutableStagePayload stagePayload,
+ RunnerApi.Components components,
+ StreamingTranslationContext context) {
+
+ LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId,
PCollectionView<?>> sideInputs =
+ getSideInputIdToPCollectionViewMap(stagePayload, components);
+
+ Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+ Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+ List<WindowedValueCoder<KV<Void, Object>>> kvCoders = new ArrayList<>();
+ List<Coder<?>> viewCoders = new ArrayList<>();
+
+ int count = 0;
+ for (Map.Entry<RunnerApi.ExecutableStagePayload.SideInputId,
PCollectionView<?>> sideInput :
+ sideInputs.entrySet()) {
+ TupleTag<?> tag = sideInput.getValue().getTagInternal();
+ intToViewMapping.put(count, sideInput.getValue());
+ tagToIntMapping.put(tag, count);
+ count++;
+ String collectionId =
+ components
+ .getTransformsOrThrow(sideInput.getKey().getTransformId())
+ .getInputsOrThrow(sideInput.getKey().getLocalName());
+ DataStream<Object> sideInputStream =
context.getDataStreamOrThrow(collectionId);
+ TypeInformation<Object> tpe = sideInputStream.getType();
+ if (!(tpe instanceof CoderTypeInformation)) {
+ throw new IllegalStateException("Input Stream TypeInformation is no
CoderTypeInformation.");
+ }
+
+ WindowedValueCoder<Object> coder =
+ (WindowedValueCoder) ((CoderTypeInformation) tpe).getCoder();
+ Coder<KV<Void, Object>> kvCoder = KvCoder.of(VoidCoder.of(),
coder.getValueCoder());
+ kvCoders.add(coder.withValueCoder(kvCoder));
+ // coder for materialized view matching GBK below
+ WindowedValueCoder<KV<Void, Iterable<Object>>> viewCoder =
+ coder.withValueCoder(KvCoder.of(VoidCoder.of(),
IterableCoder.of(coder.getValueCoder())));
+ viewCoders.add(viewCoder);
+ }
+
+ // second pass, now that we gathered the input coders
+ UnionCoder unionCoder = UnionCoder.of(viewCoders);
+
+ CoderTypeInformation<RawUnionValue> unionTypeInformation =
+ new CoderTypeInformation<>(unionCoder);
+
+ // transform each side input to RawUnionValue and union them
+ DataStream<RawUnionValue> sideInputUnion = null;
+
+ for (Map.Entry<RunnerApi.ExecutableStagePayload.SideInputId,
PCollectionView<?>> sideInput :
+ sideInputs.entrySet()) {
+ TupleTag<?> tag = sideInput.getValue().getTagInternal();
+ final int intTag = tagToIntMapping.get(tag);
+ String collectionId =
+ components
+ .getTransformsOrThrow(sideInput.getKey().getTransformId())
+ .getInputsOrThrow(sideInput.getKey().getLocalName());
+ DataStream<WindowedValue<?>> sideInputStream =
context.getDataStreamOrThrow(collectionId);
+
+ // insert GBK to materialize side input view
+ String viewName =
+ sideInput.getKey().getTransformId() + "-" +
sideInput.getKey().getLocalName();
+ WindowedValueCoder<KV<Void, Object>> kvCoder = kvCoders.get(intTag);
+ DataStream<WindowedValue<KV<Void, Object>>> keyedSideInputStream =
+ sideInputStream.map(new ToVoidKeyValue());
+
+ SingleOutputStreamOperator<WindowedValue<KV<Void, Iterable<Object>>>>
viewStream =
+ addGBK(
+ keyedSideInputStream,
+ sideInput.getValue().getWindowingStrategyInternal(),
+ kvCoder,
+ viewName,
+ context);
+
+ DataStream<RawUnionValue> unionValueStream =
+ viewStream
+ .map(new FlinkStreamingTransformTranslators.ToRawUnion<>(intTag))
+ .returns(unionTypeInformation);
+
+ if (sideInputUnion == null) {
+ sideInputUnion = unionValueStream;
+ } else {
+ sideInputUnion = sideInputUnion.union(unionValueStream);
+ }
+ }
+
+ return new Tuple2<>(intToViewMapping, sideInputUnion);
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 137860)
Time Spent: 5h 50m (was: 5h 40m)
> Flink support for portable side input
> -------------------------------------
>
> Key: BEAM-2930
> URL: https://issues.apache.org/jira/browse/BEAM-2930
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Henning Rohde
> Assignee: Thomas Weise
> Priority: Major
> Labels: portability
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)