This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc114e  [BEAM-12926] Translates Reshuffle with Samza native 
repartition operator  (#15545)
1cc114e is described below

commit 1cc114e01431bea0251063985bea37bc747db673
Author: Ke Wu <[email protected]>
AuthorDate: Mon Sep 27 14:19:56 2021 -0700

    [BEAM-12926] Translates Reshuffle with Samza native repartition operator  
(#15545)
---
 .../beam/runners/samza/SamzaPipelineRunner.java    |  7 +-
 .../samza/translation/ReshuffleTranslator.java     | 82 ++++++++++++++++++++++
 .../samza/translation/SamzaPipelineTranslator.java |  6 +-
 3 files changed, 87 insertions(+), 8 deletions(-)

diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
index bbdd1f5..6bfec1e 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.samza;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
@@ -41,16 +40,16 @@ public class SamzaPipelineRunner implements 
PortablePipelineRunner {
   private final SamzaPipelineOptions options;
 
   @Override
-  public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) {
+  public PortablePipelineResult run(final RunnerApi.Pipeline pipeline, JobInfo 
jobInfo) {
     // Expand any splittable DoFns within the graph to enable sizing and 
splitting of bundles.
-    Pipeline pipelineWithSdfExpanded =
+    RunnerApi.Pipeline pipelineWithSdfExpanded =
         ProtoOverrides.updateTransform(
             PTransformTranslation.PAR_DO_TRANSFORM_URN,
             pipeline,
             SplittableParDoExpander.createSizedReplacement());
 
     // Don't let the fuser fuse any subcomponents of native transforms.
-    Pipeline trimmedPipeline =
+    RunnerApi.Pipeline trimmedPipeline =
         TrivialNativeTransformExpander.forKnownUrns(
             pipelineWithSdfExpanded, 
SamzaPortablePipelineTranslator.knownUrns());
 
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java
new file mode 100644
index 0000000..f014040
--- /dev/null
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.runners.samza.util.SamzaCoders;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.KVSerde;
+
+/**
+ * Translates a Reshuffle transform into Samza's native {@code partitionBy} operator, which
+ * partitions each incoming element by its key into the Task corresponding to that key.
+ *
+ * <p>Behavior implemented by this class:
+ *
+ * <ul>
+ *   <li>The shuffle through {@code partitionBy} only happens when the pipeline options report
+ *       {@code getMaxSourceParallelism() > 1}; otherwise the input stream is forwarded without
+ *       repartitioning.
+ *   <li>In both cases only {@code OpMessage.Type.ELEMENT} messages are forwarded; every other
+ *       {@code OpMessage} type arriving on the input stream is filtered out here.
+ *   <li>The {@code partitionBy} operator id is {@code "rshfl-" + transformId}.
+ *   <li>The input coder is cast to {@link KvCoder}: its key coder serializes the partition key and
+ *       the windowed element coder serializes the value sent through the intermediate stream.
+ * </ul>
+ *
+ * <p>NOTE(review): dropping non-element messages (presumably including watermarks) appears to
+ * rely on Samza / downstream operators to re-establish them, as the GroupByKey translation may
+ * also do; confirm before relying on watermark behavior across this transform.
+ *
+ * @param <K> key type of the input and output KVs
+ * @param <InT> value type of the input KVs
+ * @param <OutT> value type of the output KVs as declared by the transform; note the stream
+ *     registered for the output carries the input {@code KV<K, InT>} elements unchanged
+ */
+public class ReshuffleTranslator<K, InT, OutT>
+    implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, 
PCollection<KV<K, OutT>>>> {
+  /**
+   * Registers, as the output of {@code transform}, the input message stream filtered to element
+   * messages and (when max source parallelism is greater than 1) repartitioned by key.
+   */
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+
+    final PCollection<KV<K, InT>> input = ctx.getInput(transform);
+    final PCollection<KV<K, OutT>> output = ctx.getOutput(transform);
+    final MessageStream<OpMessage<KV<K, InT>>> inputStream = 
ctx.getMessageStream(input);
+    // Each input message wraps a WindowedValue<KV<K, InT>> (the type elementCoder encodes).
+    final KvCoder<K, InT> inputCoder = (KvCoder<K, InT>) input.getCoder(); // assumes KvCoder
+    final Coder<WindowedValue<KV<K, InT>>> elementCoder = 
SamzaCoders.of(input);
+
+    final MessageStream<OpMessage<KV<K, InT>>> outputStream =
+        doTranslate(
+            inputStream,
+            inputCoder.getKeyCoder(),
+            elementCoder,
+            "rshfl-" + ctx.getTransformId(),
+            ctx.getPipelineOptions().getMaxSourceParallelism() > 1); // shuffle only if > 1
+
+    ctx.registerMessageStream(output, outputStream);
+  }
+  /**
+   * Filters {@code inputStream} to {@code ELEMENT} messages and, if {@code needRepartition},
+   * routes them through Samza's {@code partitionBy} keyed on the KV key, re-wrapping each result
+   * as an element {@code OpMessage}.
+   *
+   * @param inputStream message stream of the input PCollection
+   * @param keyCoder coder for the partition key, converted to a Samza serde
+   * @param valueCoder coder for the windowed element, converted to a Samza serde
+   * @param partitionById operator id passed to {@code partitionBy}
+   * @param needRepartition whether to shuffle the elements through {@code partitionBy}
+   * @return a stream containing only element messages
+   */
+  private static <K, InT> MessageStream<OpMessage<KV<K, InT>>> doTranslate(
+      MessageStream<OpMessage<KV<K, InT>>> inputStream,
+      Coder<K> keyCoder,
+      Coder<WindowedValue<KV<K, InT>>> valueCoder,
+      String partitionById, // partitionBy operator id; used in the intermediate stream name
+      boolean needRepartition) {
+
+    return needRepartition
+        ? inputStream
+            .filter(op -> OpMessage.Type.ELEMENT == op.getType()) // drop non-element messages
+            .partitionBy(
+                opMessage -> opMessage.getElement().getValue().getKey(),
+                OpMessage::getElement, // value: the whole WindowedValue<KV<K, InT>>
+                KVSerde.of(SamzaCoders.toSerde(keyCoder), 
SamzaCoders.toSerde(valueCoder)),
+                partitionById)
+            // re-wrap each repartitioned windowed value as an ELEMENT OpMessage
+            .map(kv -> OpMessage.ofElement(kv.getValue()))
+        : inputStream.filter(op -> OpMessage.Type.ELEMENT == op.getType()); // no shuffle
+  }
+}
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 3514b4f..1dd779d 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -33,8 +33,6 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** This class knows all the translators from a primitive BEAM transform to a 
Samza operator. */
 @SuppressWarnings({
@@ -42,7 +40,6 @@ import org.slf4j.LoggerFactory;
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class SamzaPipelineTranslator {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaPipelineTranslator.class);
 
   private static final Map<String, TransformTranslator<?>> TRANSLATORS = 
loadTranslators();
 
@@ -117,7 +114,7 @@ public class SamzaPipelineTranslator {
   }
 
   private static class SamzaPipelineVisitor extends 
Pipeline.PipelineVisitor.Defaults {
-    private TransformVisitorFn visitorFn;
+    private final TransformVisitorFn visitorFn;
 
     private SamzaPipelineVisitor(TransformVisitorFn visitorFn) {
       this.visitorFn = visitorFn;
@@ -177,6 +174,7 @@ public class SamzaPipelineTranslator {
     public Map<String, TransformTranslator<?>> getTransformTranslators() {
       return ImmutableMap.<String, TransformTranslator<?>>builder()
           .put(PTransformTranslation.READ_TRANSFORM_URN, new 
ReadTranslator<>())
+          .put(PTransformTranslation.RESHUFFLE_URN, new 
ReshuffleTranslator<>())
           .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new 
ParDoBoundMultiTranslator<>())
           .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator<>())
           .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new 
GroupByKeyTranslator<>())

Reply via email to