xinyuiscool commented on a change in pull request #14883:
URL: https://github.com/apache/beam/pull/14883#discussion_r640158357



##########
File path: runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
##########
@@ -223,15 +236,39 @@ public void translatePortable(
     String inputId = stagePayload.getInput();
     final MessageStream<OpMessage<InT>> inputStream = ctx.getMessageStreamById(inputId);
 
-    // TODO: support side input
-    if (!stagePayload.getSideInputsList().isEmpty()) {
-      throw new UnsupportedOperationException(
-          "Side inputs in portable pipelines are not supported in samza");
+    // Analyze side inputs
+    final List<MessageStream<OpMessage<Iterable<?>>>> sideInputStreams = new ArrayList<>();
+    final Map<SideInputId, PCollectionView<?>> sideInputMapping = new HashMap<>();
+    final Map<String, PCollectionView<?>> idToViewMapping = new HashMap<>();
+    final RunnerApi.Components components = stagePayload.getComponents();
+    for (SideInputId sideInputId : stagePayload.getSideInputsList()) {
+      final String sideInputCollectionId =
+          components
+              .getTransformsOrThrow(sideInputId.getTransformId())
+              .getInputsOrThrow(sideInputId.getLocalName());
+      final WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          ctx.getPortableWindowStrategy(sideInputCollectionId, components);
+      final WindowedValue.WindowedValueCoder<?> coder =
+          (WindowedValue.WindowedValueCoder) instantiateCoder(sideInputCollectionId, components);
+
+      // Create a runner-side view
+      final PCollectionView<?> view = createPCollectionView(sideInputId, coder, windowingStrategy);
+
+      // Use GBK to aggregate the side inputs and then broadcast it out
+      final MessageStream<OpMessage<Iterable<?>>> broadcastSideInput =
+          groupAndBroadcastSideInput(
+              sideInputId,
+              sideInputCollectionId,
+              components.getPcollectionsOrThrow(sideInputCollectionId),
+              (WindowingStrategy) windowingStrategy,
+              coder,
+              ctx);
+
+      sideInputStreams.add(broadcastSideInput);
+      sideInputMapping.put(sideInputId, view);

Review comment:
       Currently we need two mappings. One maps the view ID to the view
(idToViewMapping); we use it to look up the view from a Samza OpMessage. The
other maps the side input ID to the view (sideInputMapping); it is used by the
sideinputHandler. Naming them is a bit hard, so I added some Java comments on
these variables.



