This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc114e [BEAM-12926] Translates Reshuffle with Samza native
repartition operator (#15545)
1cc114e is described below
commit 1cc114e01431bea0251063985bea37bc747db673
Author: Ke Wu <[email protected]>
AuthorDate: Mon Sep 27 14:19:56 2021 -0700
[BEAM-12926] Translates Reshuffle with Samza native repartition operator
(#15545)
---
.../beam/runners/samza/SamzaPipelineRunner.java | 7 +-
.../samza/translation/ReshuffleTranslator.java | 82 ++++++++++++++++++++++
.../samza/translation/SamzaPipelineTranslator.java | 6 +-
3 files changed, 87 insertions(+), 8 deletions(-)
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
index bbdd1f5..6bfec1e 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.samza;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
@@ -41,16 +40,16 @@ public class SamzaPipelineRunner implements
PortablePipelineRunner {
private final SamzaPipelineOptions options;
@Override
- public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) {
+ public PortablePipelineResult run(final RunnerApi.Pipeline pipeline, JobInfo
jobInfo) {
// Expand any splittable DoFns within the graph to enable sizing and
splitting of bundles.
- Pipeline pipelineWithSdfExpanded =
+ RunnerApi.Pipeline pipelineWithSdfExpanded =
ProtoOverrides.updateTransform(
PTransformTranslation.PAR_DO_TRANSFORM_URN,
pipeline,
SplittableParDoExpander.createSizedReplacement());
// Don't let the fuser fuse any subcomponents of native transforms.
- Pipeline trimmedPipeline =
+ RunnerApi.Pipeline trimmedPipeline =
TrivialNativeTransformExpander.forKnownUrns(
pipelineWithSdfExpanded,
SamzaPortablePipelineTranslator.knownUrns());
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java
new file mode 100644
index 0000000..f014040
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.runners.samza.util.SamzaCoders;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.KVSerde;
+
+/**
+ * Translates the Reshuffle transform into Samza's native {@code partitionBy} operator, which
+ * partitions each incoming element by the key of its {@code KV} so that elements sharing a key are
+ * routed to the same Samza task.
+ *
+ * <p>Repartitioning is only performed when the pipeline option {@code maxSourceParallelism} is
+ * greater than 1. Otherwise no intermediate stream is created. In both modes only {@code
+ * OpMessage.Type.ELEMENT} messages are forwarded; other message types are filtered out.
+ *
+ * <p>NOTE(review): {@code OutT} only appears in the transform's declared output type. The stream
+ * registered for the output carries {@code KV<K, InT>} elements, so {@code OutT} is presumably
+ * always {@code InT} for Reshuffle — confirm.
+ */
+public class ReshuffleTranslator<K, InT, OutT>
+    implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>>> {
+
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+
+    final PCollection<KV<K, InT>> input = ctx.getInput(transform);
+    final PCollection<KV<K, OutT>> output = ctx.getOutput(transform);
+    // Input messages are OpMessage<KV<K, InT>> (plain keyed elements, not grouped iterables).
+    final MessageStream<OpMessage<KV<K, InT>>> inputStream = ctx.getMessageStream(input);
+    // The input is declared as PCollection<KV<K, InT>>, so its coder is cast to KvCoder; the key
+    // coder is used to serialize the partition key of the intermediate stream.
+    @SuppressWarnings("unchecked")
+    final KvCoder<K, InT> inputCoder = (KvCoder<K, InT>) input.getCoder();
+    final Coder<WindowedValue<KV<K, InT>>> elementCoder = SamzaCoders.of(input);
+    // Only go through an intermediate stream when more than one source partition may exist;
+    // presumably with a single partition all keys already reach the same task — confirm.
+    final boolean needRepartition = ctx.getPipelineOptions().getMaxSourceParallelism() > 1;
+
+    final MessageStream<OpMessage<KV<K, InT>>> outputStream =
+        doTranslate(
+            inputStream,
+            inputCoder.getKeyCoder(),
+            elementCoder,
+            "rshfl-" + ctx.getTransformId(),
+            needRepartition);
+
+    ctx.registerMessageStream(output, outputStream);
+  }
+
+  /**
+   * Builds the reshuffled message stream.
+   *
+   * <p>Non-element messages are dropped first. If {@code needRepartition} is false the filtered
+   * stream is returned as-is; otherwise the elements are sent through a Samza {@code partitionBy}
+   * keyed on the element's {@code KV} key and unwrapped back into element messages.
+   *
+   * @param inputStream stream of input messages
+   * @param keyCoder coder used to serialize the partition key
+   * @param valueCoder coder used to serialize the windowed element in the intermediate stream
+   * @param partitionById operator id, also used in the intermediate stream name
+   * @param needRepartition whether to route elements through a keyed intermediate stream
+   * @return the stream of element messages, repartitioned by key when requested
+   */
+  private static <K, InT> MessageStream<OpMessage<KV<K, InT>>> doTranslate(
+      MessageStream<OpMessage<KV<K, InT>>> inputStream,
+      Coder<K> keyCoder,
+      Coder<WindowedValue<KV<K, InT>>> valueCoder,
+      String partitionById,
+      boolean needRepartition) {
+    // Both modes forward element messages only, so filter once up front.
+    final MessageStream<OpMessage<KV<K, InT>>> elements =
+        inputStream.filter(op -> OpMessage.Type.ELEMENT == op.getType());
+    if (!needRepartition) {
+      return elements;
+    }
+    return elements
+        .partitionBy(
+            opMessage -> opMessage.getElement().getValue().getKey(),
+            OpMessage::getElement, // windowed value
+            KVSerde.of(SamzaCoders.toSerde(keyCoder), SamzaCoders.toSerde(valueCoder)),
+            partitionById)
+        // partitionBy emits key/value pairs whose value is the windowed element; convert back to
+        // an element OpMessage.
+        .map(kv -> OpMessage.ofElement(kv.getValue()));
+  }
+}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index 3514b4f..1dd779d 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -33,8 +33,6 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PValue;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** This class knows all the translators from a primitive BEAM transform to a
Samza operator. */
@SuppressWarnings({
@@ -42,7 +40,6 @@ import org.slf4j.LoggerFactory;
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class SamzaPipelineTranslator {
- private static final Logger LOG =
LoggerFactory.getLogger(SamzaPipelineTranslator.class);
private static final Map<String, TransformTranslator<?>> TRANSLATORS =
loadTranslators();
@@ -117,7 +114,7 @@ public class SamzaPipelineTranslator {
}
private static class SamzaPipelineVisitor extends
Pipeline.PipelineVisitor.Defaults {
- private TransformVisitorFn visitorFn;
+ private final TransformVisitorFn visitorFn;
private SamzaPipelineVisitor(TransformVisitorFn visitorFn) {
this.visitorFn = visitorFn;
@@ -177,6 +174,7 @@ public class SamzaPipelineTranslator {
public Map<String, TransformTranslator<?>> getTransformTranslators() {
return ImmutableMap.<String, TransformTranslator<?>>builder()
.put(PTransformTranslation.READ_TRANSFORM_URN, new
ReadTranslator<>())
+ .put(PTransformTranslation.RESHUFFLE_URN, new
ReshuffleTranslator<>())
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new
ParDoBoundMultiTranslator<>())
.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new
GroupByKeyTranslator<>())
.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new
GroupByKeyTranslator<>())