robertwb commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1538190898
##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
// Flink reshuffle override does not preserve all metadata
excludeTestsMatching
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+ // Flink redistribute override does not preserve all metadata
Review Comment:
This seems bad. Is it not possible to fix this (e.g. by reifying the
metadata if needed)? I see that reshuffle didn't, but ideally the new, improved
transform should do things the right way.
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
Iterables.getOnlyElement(transform.getOutputsMap().values()),
inputDataStream.rebalance());
}
+ private <K, V> void translateRedistributeByKey(
+ String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext
context) {
+ RunnerApi.PTransform transform =
pipeline.getComponents().getTransformsOrThrow(id);
+ DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+
context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+ context.addDataStream(
+ Iterables.getOnlyElement(transform.getOutputsMap().values()),
inputDataStream.rebalance());
Review Comment:
Where is the key being used here? (This implementation looks identical to
the one below.)
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link
PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing
the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+ /** @return a {@link RedistributeArbitrarily} transform with default
configuration. */
+ public static <T> RedistributeArbitrarily<T> arbitrarily() {
+ return new RedistributeArbitrarily<>(null, false);
+ }
+
+ /** @return a {@link RedistributeByKey} transform with default
configuration. */
+ public static <K, V> RedistributeByKey<K, V> byKey() {
+ return new RedistributeByKey<>(false);
+ }
+
+ /**
+ * @param <K> The type of key being reshuffled on.
+ * @param <V> The type of value being reshuffled.
+ */
+ public static class RedistributeByKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+ private final boolean allowDuplicates;
+
+ private RedistributeByKey(boolean allowDuplicates) {
+ this.allowDuplicates = allowDuplicates;
+ }
+
+ public RedistributeByKey<K, V> withAllowDuplicates(boolean
newAllowDuplicates) {
+ return new RedistributeByKey<>(newAllowDuplicates);
+ }
+
+ public boolean getAllowDuplicates() {
+ return allowDuplicates;
+ }
+
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+ WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+ // If the input has already had its windows merged, then the GBK that
performed the merge
+ // will have set originalStrategy.getWindowFn() to InvalidWindows,
causing the GBK contained
+ // here to fail. Instead, we install a valid WindowFn that leaves all
windows unchanged.
+ // The TimestampCombiner is set to ensure the GroupByKey does not shift
elements forwards in
+ // time.
+ // Because this outputs as fast as possible, this should not hold the
watermark.
+ Window<KV<K, V>> rewindow =
+ Window.<KV<K, V>>into(
+ new
IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+ .triggering(new ReshuffleTrigger<>())
+ .discardingFiredPanes()
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+ PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+ reified.apply(GroupByKey.create());
+ return grouped
+ .apply(
+ "ExpandIterable",
+ ParDo.of(
+ new DoFn<
+ KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K,
ValueInSingleWindow<V>>>() {
+ @ProcessElement
+ public void processElement(
+ @Element KV<K, Iterable<ValueInSingleWindow<V>>>
element,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+ K key = element.getKey();
+ for (ValueInSingleWindow<V> value : element.getValue()) {
+ r.output(KV.of(key, value));
+ }
+ }
+ }))
+ .apply("RestoreMetadata", new RestoreMetadata<>())
+ // Set the windowing strategy directly, so that it doesn't get
counted as the user having
+ // set allowed lateness.
+ .setWindowingStrategyInternal(originalStrategy);
+ }
+ }
+
+ /**
+ * @param <K> The type of key being reshuffled on.
+ * @param <V> The type of value being reshuffled.
+ */
+ public static class RedistributeByKeyAllowingDuplicates<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+ return input.apply(Redistribute.byKey());
+ }
+ }
+
+ /**
+ * Noop transform that hints to the runner to try to redistribute the work
evenly, or via whatever
+ * clever strategy the runner comes up with.
+ */
+ public static class RedistributeArbitrarily<T>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+ // The number of buckets to shard into.
+ // A runner is free to ignore this (a runner may ignore the transform
+ // entirely!) This is a performance optimization to prevent having
+ // unit sized bundles on the output. If unset, uses a random integer key.
+ private @Nullable Integer numBuckets = null;
+ private boolean allowDuplicates = false;
+
+ private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean
allowDuplicates) {
+ this.numBuckets = numBuckets;
+ this.allowDuplicates = allowDuplicates;
+ }
+
+ public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer
numBuckets) {
+ return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+ }
+
+ public RedistributeArbitrarily<T> withAllowDuplicates(boolean
allowDuplicates) {
+ return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+ }
+
+ public boolean getAllowDuplicates() {
+ return allowDuplicates;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ return input
+ .apply("Pair with random key", ParDo.of(new
AssignShardFn<>(numBuckets)))
+ .apply(Redistribute.<Integer,
T>byKey().withAllowDuplicates(this.allowDuplicates))
+ .apply(Values.create());
+ }
+ }
+
+ private static class RestoreMetadata<K, V>
+ extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>,
PCollection<KV<K, V>>> {
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K,
ValueInSingleWindow<V>>> input) {
+ return input.apply(
+ ParDo.of(
+ new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<K, ValueInSingleWindow<V>> kv,
OutputReceiver<KV<K, V>> r) {
+ r.outputWindowedValue(
+ KV.of(kv.getKey(), kv.getValue().getValue()),
+ kv.getValue().getTimestamp(),
+ Collections.singleton(kv.getValue().getWindow()),
+ kv.getValue().getPane());
+ }
+ }));
+ }
+ }
+
+ public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+ private int shard;
+ private @Nullable Integer numBuckets;
+
+ public AssignShardFn(@Nullable Integer numBuckets) {
+ this.numBuckets = numBuckets;
+ }
+
+ @Setup
+ public void setup() {
+ shard = ThreadLocalRandom.current().nextInt();
+ }
+
+ @ProcessElement
+ public void processElement(@Element T element, OutputReceiver<KV<Integer,
T>> r) {
+ ++shard;
+ // Smear the shard into something more random-looking, to avoid issues
+ // with runners that don't properly hash the key being shuffled, but rely
+ // on it being random-looking. E.g. Spark takes the Java hashCode() of
keys,
+ // which for Integer is a no-op and it is an issue:
+ //
http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+ // spark.html
+ // This hashing strategy is copied from
+ //
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+ int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51,
15);
+ if (numBuckets != null) {
+ UnsignedInteger unsignedNumBuckets =
UnsignedInteger.fromIntBits(numBuckets);
Review Comment:
Why not just do something like Math.abs(hashOfShard) % numBuckets?
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
public class ReshuffleTranslator<K, InT, OutT>
implements TransformTranslator<PTransform<PCollection<KV<K, InT>>,
PCollection<KV<K, OutT>>>> {
+ private final String prefix;
+
+ ReshuffleTranslator(String prefix) {
+ this.prefix = prefix;
+ }
+
+ ReshuffleTranslator() {
+ this("rhfl-");
Review Comment:
Was the dropping of the s in "rshfl" intentional?
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
}
}
+ private static class RedistributeByKeyTranslatorBatch<K, InputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Redistribute.RedistributeByKey<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ Redistribute.RedistributeByKey<K, InputT> transform,
FlinkBatchTranslationContext context) {
+ final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+ // Construct an instance of CoderTypeInformation which contains the
pipeline options.
+ // This will be used to initialize FileSystems.
+ final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+ ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>)
inputDataSet.getType())
+ .withPipelineOptions(context.getPipelineOptions());
+ // We insert a NOOP here to initialize the FileSystems via the above
CoderTypeInformation.
Review Comment:
Is there an upstream bug or other reference about why this hack is needed?
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
}
});
+ registerTransformTranslator(
+ RedistributeByKey.class,
Review Comment:
Is there a reason not to simply retain the existing composite (reshuffle)
implementation (which can then be detected and overridden by any runner)?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
public static final String COMBINE_PER_KEY_TRANSFORM_URN =
"beam:transform:combine_per_key:v1";
public static final String COMBINE_GLOBALLY_TRANSFORM_URN =
"beam:transform:combine_globally:v1";
public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+ public static final String REDISTRIBUTE_BY_KEY_URN =
"beam:transform:redistribute_by_key:v1";
Review Comment:
Should these get added as constants in the proto files?
##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -813,6 +813,10 @@ message GroupIntoBatchesPayload {
int64 max_buffering_duration_millis = 2;
}
+message RedistributePayload {
Review Comment:
Looks fine to me.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates Reshuffle transform into Samza's native partitionBy operator,
which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+ implements TransformTranslator<PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, V>>>> {
+
+ private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+ new ReshuffleTranslator<>("rdstr-");
+
+ @Override
+ public void translate(
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+ TransformHierarchy.Node node,
+ TranslationContext ctx) {
+ reshuffleTranslator.translate(transform, node, ctx);
+ }
+
+ @Override
+ public void translatePortable(
+ PipelineNode.PTransformNode transform,
+ QueryablePipeline pipeline,
+ PortableTranslationContext ctx) {
+ reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+ }
+
+ /** Predicate to determine whether a URN is a Samza native transform. */
+ @AutoService(NativeTransforms.IsNativeTransform.class)
+ public static class IsSamzaNativeTransform implements
NativeTransforms.IsNativeTransform {
+ @Override
+ public boolean test(RunnerApi.PTransform pTransform) {
+ return false;
+ // Re-enable after https://github.com/apache/beam/issues/21188 is
completed
Review Comment:
Isn't this bug what this PR is attempting to address?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]