dxichen commented on a change in pull request #14883:
URL: https://github.com/apache/beam/pull/14883#discussion_r639304482
##########
File path:
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
##########
@@ -368,6 +405,80 @@ public void translatePortable(
return Collections.emptyMap();
}
+ @SuppressWarnings("unchecked")
+ private static final ViewFn<Iterable<WindowedValue<?>>, ?> VIEW_FN =
+ (ViewFn)
+ new PCollectionViews.MultimapViewFn<>(
+
(PCollectionViews.TypeDescriptorSupplier<Iterable<WindowedValue<Void>>>)
+ () -> TypeDescriptors.iterables(new
TypeDescriptor<WindowedValue<Void>>() {}),
+ (PCollectionViews.TypeDescriptorSupplier<Void>)
TypeDescriptors::voids);
+
+ // This method follows the same way in Flink to create a runner-side Java
+ // PCollectionView to represent a portable side input.
+ private static PCollectionView<?> createPCollectionView(
+ SideInputId sideInputId,
+ WindowedValue.WindowedValueCoder<?> coder,
+ WindowingStrategy<?, BoundedWindow> windowingStrategy) {
+
+ return new RunnerPCollectionView<>(
+ null,
+ new TupleTag<>(sideInputId.getLocalName()),
+ VIEW_FN,
+ // TODO: support custom mapping fn
+ windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+ windowingStrategy,
+ coder.getValueCoder());
+ }
+
+ // Group the side input globally with a null key and then broadcast it
+ // to all tasks.
+ private static <SideInputT>
+ MessageStream<OpMessage<Iterable<SideInputT>>>
groupAndBroadcastSideInput(
+ SideInputId sideInputId,
+ String sideInputCollectionId,
+ RunnerApi.PCollection sideInputPCollection,
+ WindowingStrategy<SideInputT, BoundedWindow> windowingStrategy,
+ WindowedValue.WindowedValueCoder<SideInputT> coder,
+ PortableTranslationContext ctx) {
+ final MessageStream<OpMessage<SideInputT>> sideInput =
+ ctx.getMessageStreamById(sideInputCollectionId);
+ final MessageStream<OpMessage<KV<Void, SideInputT>>> keyedSideInput =
+ sideInput.map(
+ opMessage -> {
+ WindowedValue<SideInputT> wv = opMessage.getElement();
+ return OpMessage.ofElement(wv.withValue(KV.of(null,
wv.getValue())));
Review comment:
Why not use a placeholder string or byte as the key for side inputs instead? Is
a null key the suggested way in Beam to build a KV where only the value is relevant?
##########
File path:
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
##########
@@ -274,10 +310,11 @@ public void translatePortable(
windowedInputCoder.getValueCoder(), // input coder not in use
windowedInputCoder,
Collections.emptyMap(), // output coders not in use
- Collections.emptyList(), // sideInputs not in use until side input
support
+ new ArrayList<>(
+ sideInputMapping.values()), // sideInputs not in use until
side input support
new ArrayList<>(idToTupleTagMap.values()), // used by java runner
only
- SamzaPipelineTranslatorUtils.getPortableWindowStrategy(transform,
pipeline),
- Collections.emptyMap(), // idToViewMap not in use until side input
support
+ ctx.getPortableWindowStrategy(inputId,
stagePayload.getComponents()),
+ idToViewMapping, // idToViewMap not in use until side input support
Review comment:
Let's remove the comment here and on L314.
##########
File path:
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
##########
@@ -223,15 +236,39 @@ public void translatePortable(
String inputId = stagePayload.getInput();
final MessageStream<OpMessage<InT>> inputStream =
ctx.getMessageStreamById(inputId);
- // TODO: support side input
- if (!stagePayload.getSideInputsList().isEmpty()) {
- throw new UnsupportedOperationException(
- "Side inputs in portable pipelines are not supported in samza");
+ // Analyze side inputs
+ final List<MessageStream<OpMessage<Iterable<?>>>> sideInputStreams = new
ArrayList<>();
+ final Map<SideInputId, PCollectionView<?>> sideInputMapping = new
HashMap<>();
+ final Map<String, PCollectionView<?>> idToViewMapping = new HashMap<>();
+ final RunnerApi.Components components = stagePayload.getComponents();
+ for (SideInputId sideInputId : stagePayload.getSideInputsList()) {
+ final String sideInputCollectionId =
+ components
+ .getTransformsOrThrow(sideInputId.getTransformId())
+ .getInputsOrThrow(sideInputId.getLocalName());
+ final WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ ctx.getPortableWindowStrategy(sideInputCollectionId, components);
+ final WindowedValue.WindowedValueCoder<?> coder =
+ (WindowedValue.WindowedValueCoder)
instantiateCoder(sideInputCollectionId, components);
+
+ // Create a runner-side view
+ final PCollectionView<?> view = createPCollectionView(sideInputId,
coder, windowingStrategy);
+
+ // Use GBK to aggregate the side inputs and then broadcast it out
+ final MessageStream<OpMessage<Iterable<?>>> broadcastSideInput =
+ groupAndBroadcastSideInput(
+ sideInputId,
+ sideInputCollectionId,
+ components.getPcollectionsOrThrow(sideInputCollectionId),
+ (WindowingStrategy) windowingStrategy,
+ coder,
+ ctx);
+
+ sideInputStreams.add(broadcastSideInput);
+ sideInputMapping.put(sideInputId, view);
Review comment:
Is this mapping still required later?
Also, we should rename `idToViewMapping` -> `sideInputIdToView` to be more
specific.
##########
File path: runners/samza/job-server/build.gradle
##########
@@ -102,6 +100,7 @@ createPortableValidatesRunnerTask(
excludeCategories
'org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs'
},
testFilter: {
+ excludeTestsMatching
"org.apache.beam.sdk.transforms.FlattenTest.testEmptyFlattenAsSideInput"
Review comment:
What are we missing for this test?
##########
File path:
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
##########
@@ -274,10 +310,11 @@ public void translatePortable(
windowedInputCoder.getValueCoder(), // input coder not in use
windowedInputCoder,
Collections.emptyMap(), // output coders not in use
- Collections.emptyList(), // sideInputs not in use until side input
support
+ new ArrayList<>(
+ sideInputMapping.values()), // sideInputs not in use until
side input support
Review comment:
We can use `idToViewMapping.values()` instead, since the contents are the same.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]