kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1541202528
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
}
}
+ private static class RedistributeByKeyTranslatorBatch<K, InputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Redistribute.RedistributeByKey<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+ final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+ // Construct an instance of CoderTypeInformation which contains the pipeline options.
+ // This will be used to initialize FileSystems.
+ final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+ ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+ .withPipelineOptions(context.getPipelineOptions());
+ // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
Review Comment:
I couldn't find anything. Guess I'll dive into the history of the line this
is copied from and see if there was something clear. Might be obsolete too.
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
}
});
+ registerTransformTranslator(
+ RedistributeByKey.class,
Review Comment:
I don't understand the question. Does this help?
- `Redistribute.arbitrarily()` has the same composite structure as
`Reshuffle.viaRandomKey()`
- `Redistribute.byKey()` has the same composite structure as
`Reshuffle.of()`
- This code here is the Dataflow v1 translation for `Redistribute.byKey()`
which is simplified from the translation of `Reshuffle.of()`
- The proposal for the future is to do even better for the `arbitrarily`
case by having Dataflow expose a primitive rather than the existing kludge on
top of GroupByKey.
This thread didn't mention it, but I now realize there's a potential problem:
the purpose of the override was to shuffle less data by not reifying the
timestamps, since they are available elsewhere in Dataflow-specific shuffle
metadata. Now I'm on the fence, because I'd rather not rely on that always
being the case. Changing it later would be update-incompatible, whereas
reifying all metadata in a standard way is robust to changes.
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
Iterables.getOnlyElement(transform.getOutputsMap().values()),
inputDataStream.rebalance());
}
+ private <K, V> void translateRedistributeByKey(
+ String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+ RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+ DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+ context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+ context.addDataStream(
+ Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
Review Comment:
Yeah, we could share the same implementation. I followed my rule that code
should be shared if it represents the same thing by logical necessity, and
otherwise not shared. In this case there are two very similar things that
temporarily have the same implementation. I don't care too much; we could
re-use the same lines of code for now until we choose to diverge. I have a
slight preference for keeping them separate to make it obvious that there is
no logical necessity that they be in sync.
##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
// Flink reshuffle override does not preserve all metadata
excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+ // Flink redistribute override does not preserve all metadata
Review Comment:
Yeah, I don't know what all is needed. I'll take another look and see if it
is obvious.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
public class ReshuffleTranslator<K, InT, OutT>
implements TransformTranslator<PTransform<PCollection<KV<K, InT>>,
PCollection<KV<K, OutT>>>> {
+ private final String prefix;
+
+ ReshuffleTranslator(String prefix) {
+ this.prefix = prefix;
+ }
+
+ ReshuffleTranslator() {
+ this("rhfl-");
Review Comment:
It was not! Thank you
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link
PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing
the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+ /** @return a {@link RedistributeArbitrarily} transform with default
configuration. */
+ public static <T> RedistributeArbitrarily<T> arbitrarily() {
+ return new RedistributeArbitrarily<>(null, false);
+ }
+
+ /** @return a {@link RedistributeByKey} transform with default
configuration. */
+ public static <K, V> RedistributeByKey<K, V> byKey() {
+ return new RedistributeByKey<>(false);
+ }
+
+ /**
+ * @param <K> The type of key being reshuffled on.
+ * @param <V> The type of value being reshuffled.
+ */
+ public static class RedistributeByKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+ private final boolean allowDuplicates;
+
+ private RedistributeByKey(boolean allowDuplicates) {
+ this.allowDuplicates = allowDuplicates;
+ }
+
+ public RedistributeByKey<K, V> withAllowDuplicates(boolean
newAllowDuplicates) {
+ return new RedistributeByKey<>(newAllowDuplicates);
+ }
+
+ public boolean getAllowDuplicates() {
+ return allowDuplicates;
+ }
+
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+ WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+ // If the input has already had its windows merged, then the GBK that
performed the merge
+ // will have set originalStrategy.getWindowFn() to InvalidWindows,
causing the GBK contained
+ // here to fail. Instead, we install a valid WindowFn that leaves all
windows unchanged.
+ // The TimestampCombiner is set to ensure the GroupByKey does not shift
elements forwards in
+ // time.
+ // Because this outputs as fast as possible, this should not hold the
watermark.
+ Window<KV<K, V>> rewindow =
+ Window.<KV<K, V>>into(
+ new
IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+ .triggering(new ReshuffleTrigger<>())
+ .discardingFiredPanes()
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+ PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+ input
+ .apply("SetIdentityWindow", rewindow)
+ .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+ PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+ reified.apply(GroupByKey.create());
+ return grouped
+ .apply(
+ "ExpandIterable",
+ ParDo.of(
+ new DoFn<
+ KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K,
ValueInSingleWindow<V>>>() {
+ @ProcessElement
+ public void processElement(
+ @Element KV<K, Iterable<ValueInSingleWindow<V>>>
element,
+ OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+ K key = element.getKey();
+ for (ValueInSingleWindow<V> value : element.getValue()) {
+ r.output(KV.of(key, value));
+ }
+ }
+ }))
+ .apply("RestoreMetadata", new RestoreMetadata<>())
+ // Set the windowing strategy directly, so that it doesn't get
counted as the user having
+ // set allowed lateness.
+ .setWindowingStrategyInternal(originalStrategy);
+ }
+ }
+
+ /**
+ * @param <K> The type of key being reshuffled on.
+ * @param <V> The type of value being reshuffled.
+ */
+ public static class RedistributeByKeyAllowingDuplicates<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+ return input.apply(Redistribute.byKey());
+ }
+ }
+
+ /**
+ * Noop transform that hints to the runner to try to redistribute the work
evenly, or via whatever
+ * clever strategy the runner comes up with.
+ */
+ public static class RedistributeArbitrarily<T>
+ extends PTransform<PCollection<T>, PCollection<T>> {
+ // The number of buckets to shard into.
+ // A runner is free to ignore this (a runner may ignore the transform
+ // entirely!) This is a performance optimization to prevent having
+ // unit sized bundles on the output. If unset, uses a random integer key.
+ private @Nullable Integer numBuckets = null;
+ private boolean allowDuplicates = false;
+
+ private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean
allowDuplicates) {
+ this.numBuckets = numBuckets;
+ this.allowDuplicates = allowDuplicates;
+ }
+
+ public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer
numBuckets) {
+ return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+ }
+
+ public RedistributeArbitrarily<T> withAllowDuplicates(boolean
allowDuplicates) {
+ return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+ }
+
+ public boolean getAllowDuplicates() {
+ return allowDuplicates;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ return input
+ .apply("Pair with random key", ParDo.of(new
AssignShardFn<>(numBuckets)))
+ .apply(Redistribute.<Integer,
T>byKey().withAllowDuplicates(this.allowDuplicates))
+ .apply(Values.create());
+ }
+ }
+
+ private static class RestoreMetadata<K, V>
+ extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>,
PCollection<KV<K, V>>> {
+ @Override
+ public PCollection<KV<K, V>> expand(PCollection<KV<K,
ValueInSingleWindow<V>>> input) {
+ return input.apply(
+ ParDo.of(
+ new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<K, ValueInSingleWindow<V>> kv,
OutputReceiver<KV<K, V>> r) {
+ r.outputWindowedValue(
+ KV.of(kv.getKey(), kv.getValue().getValue()),
+ kv.getValue().getTimestamp(),
+ Collections.singleton(kv.getValue().getWindow()),
+ kv.getValue().getPane());
+ }
+ }));
+ }
+ }
+
+ public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+ private int shard;
+ private @Nullable Integer numBuckets;
+
+ public AssignShardFn(@Nullable Integer numBuckets) {
+ this.numBuckets = numBuckets;
+ }
+
+ @Setup
+ public void setup() {
+ shard = ThreadLocalRandom.current().nextInt();
+ }
+
+ @ProcessElement
+ public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
+ ++shard;
+ // Smear the shard into something more random-looking, to avoid issues
+ // with runners that don't properly hash the key being shuffled, but rely
+ // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
+ // which for Integer is a no-op and it is an issue:
+ // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+ // spark.html
+ // This hashing strategy is copied from
+ // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+ int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
+ if (numBuckets != null) {
+ UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
Review Comment:
I don't know! Better consult the history of `Reshuffle.java`...
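
For anyone who wants to poke at the smearing step in isolation, here is a minimal standalone sketch. It is not the PR's code. The constants are the ones in the quoted hunk, but the class name `ShardSmear` and the method names are hypothetical, and the bucketing step is an assumption: the quoted hunk is cut off right after `unsignedNumBuckets` is created, so this sketch simply takes the unsigned remainder using the JDK's `Integer.remainderUnsigned` instead of Guava's `UnsignedInteger`.

```java
public final class ShardSmear {
  private ShardSmear() {}

  /** Scrambles a sequential shard counter using the constants from the quoted hunk. */
  public static int smear(int shard) {
    return 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
  }

  /**
   * Assumption, not taken from the PR: map the smeared value into [0, numBuckets)
   * by treating it as an unsigned 32-bit integer. A plain signed % could return
   * a negative bucket when the smeared value is negative.
   */
  public static int bucketFor(int shard, int numBuckets) {
    return Integer.remainderUnsigned(smear(shard), numBuckets);
  }

  public static void main(String[] args) {
    int shard = 41; // the real DoFn starts from a random int chosen in @Setup
    for (int i = 0; i < 5; i++) {
      ++shard; // incremented once per element, as in processElement
      System.out.println(shard + " -> bucket " + bucketFor(shard, 8));
    }
  }
}
```

The point of the unsigned remainder is only that consecutive counter values still land in a valid, non-negative bucket after smearing. Whether the real code needs the `UnsignedInteger` wrapper for any other reason is a question for the history of `Reshuffle.java`.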
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates Reshuffle transform into Samza's native partitionBy operator,
which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+ implements TransformTranslator<PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, V>>>> {
+
+ private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+ new ReshuffleTranslator<>("rdstr-");
+
+ @Override
+ public void translate(
+ PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+ TransformHierarchy.Node node,
+ TranslationContext ctx) {
+ reshuffleTranslator.translate(transform, node, ctx);
+ }
+
+ @Override
+ public void translatePortable(
+ PipelineNode.PTransformNode transform,
+ QueryablePipeline pipeline,
+ PortableTranslationContext ctx) {
+ reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+ }
+
+ /** Predicate to determine whether a URN is a Samza native transform. */
+ @AutoService(NativeTransforms.IsNativeTransform.class)
+ public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform {
+ @Override
+ public boolean test(RunnerApi.PTransform pTransform) {
+ return false;
+ // Re-enable after https://github.com/apache/beam/issues/21188 is completed
Review Comment:
Haha I just copy/pasted the comment too. TBH I don't have context here and
I'm going to leave the behavior but alter the comment...
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+ public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";
Review Comment:
Yes. I think all of them should migrate over there as well. I've added the
new ones there.