kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1541202528


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.

Review Comment:
   I couldn't find anything. I'll dig into the history of the line this was copied from and see whether there was a clear reason. It might also be obsolete.



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   I'm not sure I understand the question. Does this help?
   
    - `Redistribute.arbitrarily()` has the same composite structure as `Reshuffle.viaRandomKey()`
    - `Redistribute.byKey()` has the same composite structure as `Reshuffle.of()`
    - This code here is the Dataflow v1 translation for `Redistribute.byKey()`, simplified from the translation of `Reshuffle.of()`
    - The proposal for the future is to do even better for the `arbitrarily` case by having Dataflow expose a primitive rather than the existing kludge on top of GroupByKey.
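   To make the correspondence concrete, here is a plain-Java sketch (no Beam dependency) of the composite structure shared by `Reshuffle.viaRandomKey()` and `Redistribute.arbitrarily()`: pair each element with a key, redistribute by key, then drop the keys. The class and method names are hypothetical, a `TreeMap` stands in for the runner's shuffle, and round-robin keys replace the random, smeared shard so the output is deterministic.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.function.ToIntFunction;

   /** Hypothetical model of the "assign key, redistribute by key, drop key" composite. */
   final class ArbitraryRedistributeSketch {

     /** Stand-in for RedistributeByKey: group elements by key, as a shuffle would. */
     static <T> Map<Integer, List<T>> redistributeByKey(List<T> input, ToIntFunction<T> keyFn) {
       Map<Integer, List<T>> shuffled = new TreeMap<>();
       for (T element : input) {
         shuffled.computeIfAbsent(keyFn.applyAsInt(element), k -> new ArrayList<>()).add(element);
       }
       return shuffled;
     }

     /** Stand-in for Redistribute.arbitrarily(): assign a key, shuffle, keep only the values. */
     static <T> List<T> arbitrarily(List<T> input, int numBuckets) {
       int[] counter = {0};
       // Round-robin keys for determinism; the real transform uses a random, smeared shard.
       Map<Integer, List<T>> shuffled =
           redistributeByKey(input, element -> counter[0]++ % numBuckets);
       List<T> output = new ArrayList<>();
       for (List<T> bucket : shuffled.values()) {
         output.addAll(bucket); // like Values.create(): the keys are discarded
       }
       return output;
     }

     public static void main(String[] args) {
       System.out.println(arbitrarily(List.of("a", "b", "c", "d", "e"), 2));
     }
   }
   ```

   The output is a permutation of the input; only the placement of elements changes, which is why the transform can be treated as a hint.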
   
   This thread didn't mention it, but I now realize there's a potential problem: the purpose of the override was to reduce the data shuffled by not reifying the timestamps, since they are available elsewhere in Dataflow-specific shuffle metadata. Now I'm on the fence, because I'd rather not rely on that always being the case: changing it would be update-incompatible, whereas reifying all metadata in a standard way is robust to such changes.
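   The trade-off can be sketched in plain Java. Reifying means copying the timestamp and window into the shuffled value itself (as `Reify.windowsInValue()` does inside `Redistribute.byKey()`), so they survive any shuffle implementation and can be restored afterwards. The `Reified` class below is a hypothetical stand-in for `ValueInSingleWindow` (the pane is omitted), and windows are modeled as plain strings.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical model of reifying element metadata before a shuffle and reading it after. */
   final class ReifySketch {

     /** Stand-in for ValueInSingleWindow: the value plus the metadata that must survive. */
     static final class Reified<V> {
       final V value;
       final long timestampMillis;
       final String window;

       Reified(V value, long timestampMillis, String window) {
         this.value = value;
         this.timestampMillis = timestampMillis;
         this.window = window;
       }
     }

     /** Shuffle by key: only the key and the reified value are carried, nothing runner-specific. */
     static <K, V> Map<K, List<Reified<V>>> shuffle(List<Map.Entry<K, Reified<V>>> input) {
       Map<K, List<Reified<V>>> grouped = new LinkedHashMap<>();
       for (Map.Entry<K, Reified<V>> kv : input) {
         grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
       }
       return grouped;
     }

     public static void main(String[] args) {
       List<Map.Entry<String, Reified<Integer>>> input =
           List.of(
               Map.entry("k1", new Reified<>(1, 1_000L, "w1")),
               Map.entry("k2", new Reified<>(2, 2_000L, "w2")),
               Map.entry("k1", new Reified<>(3, 3_000L, "w1")));
       // After the shuffle, each element's timestamp and window are read back from the value
       // itself, independent of how a runner encodes its own shuffle metadata.
       for (Map.Entry<String, List<Reified<Integer>>> group : shuffle(input).entrySet()) {
         for (Reified<Integer> r : group.getValue()) {
           System.out.println(group.getKey() + " " + r.value + " @" + r.timestampMillis + " in " + r.window);
         }
       }
     }
   }
   ```

   The cost is that every shuffled record carries the extra metadata, which is exactly the data the Dataflow override was trying to save.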



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Yeah, we could share the same implementation. I followed my rule that code should be shared if it represents the same thing by logical necessity, and not shared otherwise. In this case there are two very similar things that temporarily have the same implementation. I don't feel strongly; we could reuse the same lines of code for now until we choose to diverge. I have a slight preference for keeping them separate, to make it obvious that there is no logical necessity for them to stay in sync.
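   For reference, Flink's `rebalance()` distributes elements round-robin across the parallel instances of the next operator, without looking at any key. The plain-Java sketch below models that and shows one middle ground: two separately named entry points that currently delegate to a single helper, so the lines are shared while the separate names still signal that the translations may diverge. All names are hypothetical, lists stand in for parallel subtasks, and the model always starts at subtask 0.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical model of two translations that currently share one implementation. */
   final class RebalanceSketch {

     /** Models rebalance(): round-robin assignment to parallel subtasks, ignoring any key. */
     private static <T> List<List<T>> rebalance(List<T> input, int parallelism) {
       List<List<T>> subtasks = new ArrayList<>();
       for (int i = 0; i < parallelism; i++) {
         subtasks.add(new ArrayList<>());
       }
       int next = 0; // this model always starts at subtask 0
       for (T element : input) {
         subtasks.get(next).add(element);
         next = (next + 1) % parallelism;
       }
       return subtasks;
     }

     /** Entry point for Reshuffle; free to diverge later. */
     static <T> List<List<T>> translateReshuffle(List<T> input, int parallelism) {
       return rebalance(input, parallelism);
     }

     /** Entry point for RedistributeByKey; identical today by coincidence, not necessity. */
     static <T> List<List<T>> translateRedistributeByKey(List<T> input, int parallelism) {
       return rebalance(input, parallelism);
     }

     public static void main(String[] args) {
       System.out.println(translateRedistributeByKey(List.of(1, 2, 3, 4, 5), 2));
     }
   }
   ```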



##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
 
         // Flink reshuffle override does not preserve all metadata
         excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+        // Flink redistribute override does not preserve all metadata

Review Comment:
   Yeah, I don't know everything that's needed. I'll take another look and see if it's obvious.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
 public class ReshuffleTranslator<K, InT, OutT>
     implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>>> {
 
+  private final String prefix;
+
+  ReshuffleTranslator(String prefix) {
+    this.prefix = prefix;
+  }
+
+  ReshuffleTranslator() {
+    this("rhfl-");

Review Comment:
   It was not! Thank you.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+  /** @return a {@link RedistributeArbitrarily} transform with default configuration. */
+  public static <T> RedistributeArbitrarily<T> arbitrarily() {
+    return new RedistributeArbitrarily<>(null, false);
+  }
+
+  /** @return a {@link RedistributeByKey} transform with default configuration. */
+  public static <K, V> RedistributeByKey<K, V> byKey() {
+    return new RedistributeByKey<>(false);
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    private final boolean allowDuplicates;
+
+    private RedistributeByKey(boolean allowDuplicates) {
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeByKey<K, V> withAllowDuplicates(boolean newAllowDuplicates) {
+      return new RedistributeByKey<>(newAllowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+      // If the input has already had its windows merged, then the GBK that performed the merge
+      // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
+      // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+      // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+      // time.
+      // Because this outputs as fast as possible, this should not hold the watermark.
+      Window<KV<K, V>> rewindow =
+          Window.<KV<K, V>>into(
+                  new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+              .triggering(new ReshuffleTrigger<>())
+              .discardingFiredPanes()
+              .withTimestampCombiner(TimestampCombiner.EARLIEST)
+              .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+      PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+          input
+              .apply("SetIdentityWindow", rewindow)
+              .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+      PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+          reified.apply(GroupByKey.create());
+      return grouped
+          .apply(
+              "ExpandIterable",
+              ParDo.of(
+                  new DoFn<
+                      KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
+                    @ProcessElement
+                    public void processElement(
+                        @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                        OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+                      K key = element.getKey();
+                      for (ValueInSingleWindow<V> value : element.getValue()) {
+                        r.output(KV.of(key, value));
+                      }
+                    }
+                  }))
+          .apply("RestoreMetadata", new RestoreMetadata<>())
+          // Set the windowing strategy directly, so that it doesn't get counted as the user having
+          // set allowed lateness.
+          .setWindowingStrategyInternal(originalStrategy);
+    }
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKeyAllowingDuplicates<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Redistribute.byKey());
+    }
+  }
+
+  /**
+   * Noop transform that hints to the runner to try to redistribute the work evenly, or via whatever
+   * clever strategy the runner comes up with.
+   */
+  public static class RedistributeArbitrarily<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    // The number of buckets to shard into.
+    // A runner is free to ignore this (a runner may ignore the transform
+    // entirely!) This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets = null;
+    private boolean allowDuplicates = false;
+
+    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+      this.numBuckets = numBuckets;
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+    }
+
+    public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
+      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
+          .apply(Redistribute.<Integer, T>byKey().withAllowDuplicates(this.allowDuplicates))
+          .apply(Values.create());
+    }
+  }
+
+  private static class RestoreMetadata<K, V>
+      extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+                @Override
+                public Duration getAllowedTimestampSkew() {
+                  return Duration.millis(Long.MAX_VALUE);
+                }
+
+                @ProcessElement
+                public void processElement(
+                    @Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
+                  r.outputWindowedValue(
+                      KV.of(kv.getKey(), kv.getValue().getValue()),
+                      kv.getValue().getTimestamp(),
+                      Collections.singleton(kv.getValue().getWindow()),
+                      kv.getValue().getPane());
+                }
+              }));
+    }
+  }
+
+  public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int shard;
+    private @Nullable Integer numBuckets;
+
+    public AssignShardFn(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    @Setup
+    public void setup() {
+      shard = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
+      ++shard;
+      // Smear the shard into something more random-looking, to avoid issues
+      // with runners that don't properly hash the key being shuffled, but rely
+      // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
+      // which for Integer is a no-op and it is an issue:
+      // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+      // spark.html
+      // This hashing strategy is copied from
+      // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+      int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
+      if (numBuckets != null) {
+        UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);

Review Comment:
   I don't know! Better consult the history of `Reshuffle.java`...
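   The shard assignment in `AssignShardFn` can at least be checked in isolation. The plain-Java sketch below reproduces the smear expression from the diff. The step that reduces the hash to a bucket is cut off in the quoted hunk, so the `Integer.remainderUnsigned` call is an assumption: it is equivalent to an unsigned modulo, which is presumably what the `UnsignedInteger` on the last quoted line is for, but it is not necessarily what `Reshuffle.java` does. Class and method names are hypothetical.

   ```java
   /** Hypothetical standalone version of the shard smearing used by AssignShardFn. */
   final class ShardSmearSketch {

     /** Same mixing step as in the diff (copied there from Guava's Hashing.smear()). */
     static int smear(int shard) {
       return 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
     }

     /**
      * Assumed bucket reduction: treat the smeared hash as unsigned so the result always lies
      * in [0, numBuckets), even when the hash is negative as a signed int.
      */
     static int bucket(int shard, int numBuckets) {
       return Integer.remainderUnsigned(smear(shard), numBuckets);
     }

     public static void main(String[] args) {
       // Print where consecutive shard values (as produced by ++shard) end up.
       for (int shard = 0; shard < 8; shard++) {
         System.out.println(shard + " -> " + bucket(shard, 4));
       }
     }
   }
   ```

   Because both multipliers are odd and rotation is a bijection, `smear` never maps two distinct shards to the same hash; a signed `%` would instead yield negative buckets for roughly half of the hashes.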



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates Reshuffle transform into Samza's native partitionBy operator, which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+    implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>>> {
+
+  private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+      new ReshuffleTranslator<>("rdstr-");
+
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+    reshuffleTranslator.translate(transform, node, ctx);
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+  }
+
+  /** Predicate to determine whether a URN is a Samza native transform. */
+  @AutoService(NativeTransforms.IsNativeTransform.class)
+  public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform {
+    @Override
+    public boolean test(RunnerApi.PTransform pTransform) {
+      return false;
+      // Re-enable after https://github.com/apache/beam/issues/21188 is completed

Review Comment:
   Haha, I just copy/pasted the comment too. TBH I don't have context here, so I'm going to leave the behavior as-is but alter the comment...



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
   public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
   public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
   public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+  public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";

Review Comment:
   Yes. I think all of them should migrate over there as well. I've added the 
new ones there.
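   Portable runners look up a translator by URN, so the new `REDISTRIBUTE_BY_KEY_URN` needs its own registration next to `RESHUFFLE_URN`. A minimal plain-Java sketch of that dispatch, assuming a simple `Map` from URN to translator; the class name and translator bodies are hypothetical placeholders, and only the two URN strings come from the diff.

   ```java
   import java.util.Map;
   import java.util.function.Function;

   /** Hypothetical URN-keyed dispatch, mirroring how a runner might register translators. */
   final class UrnDispatchSketch {
     static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
     static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";

     // Placeholder translators: each just reports what it would translate for a transform id.
     private static final Map<String, Function<String, String>> TRANSLATORS =
         Map.of(
             RESHUFFLE_URN, id -> "reshuffle:" + id,
             REDISTRIBUTE_BY_KEY_URN, id -> "redistributeByKey:" + id);

     static String translate(String urn, String transformId) {
       Function<String, String> translator = TRANSLATORS.get(urn);
       if (translator == null) {
         throw new IllegalArgumentException("No translator registered for URN " + urn);
       }
       return translator.apply(transformId);
     }

     public static void main(String[] args) {
       System.out.println(translate(REDISTRIBUTE_BY_KEY_URN, "t1"));
     }
   }
   ```

   Failing loudly on an unknown URN makes a missing registration visible at translation time rather than silently dropping the transform.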



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
