kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1578071267


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java:
##########
@@ -42,6 +42,16 @@
 public class ReshuffleTranslator<K, InT, OutT>
     implements TransformTranslator<PTransform<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>>> {
 
+  private final String prefix;
+
+  ReshuffleTranslator(String prefix) {
+    this.prefix = prefix;
+  }
+
+  ReshuffleTranslator() {
+    this("rhfl-");

Review Comment:
   fixed



##########
runners/flink/flink_runner.gradle:
##########
@@ -309,6 +311,8 @@ def createValidatesRunnerTask(Map m) {
 
         // Flink reshuffle override does not preserve all metadata
         excludeTestsMatching 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata'
+        // Flink redistribute override does not preserve all metadata

Review Comment:
   OK, I re-added it and it succeeded. I think I added this during debugging. The
non-portable runner works, if I have invoked the tests correctly. I'll watch the CI
results.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.NativeTransforms;
+import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.util.construction.graph.QueryablePipeline;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates Reshuffle transform into Samza's native partitionBy operator, which will partition
+ * each incoming message by the key into a Task corresponding to that key.
+ */
+public class RedistributeByKeyTranslator<K, V>
+    implements TransformTranslator<PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>>> {
+
+  private final ReshuffleTranslator<K, V, V> reshuffleTranslator =
+      new ReshuffleTranslator<>("rdstr-");
+
+  @Override
+  public void translate(
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+    reshuffleTranslator.translate(transform, node, ctx);
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    reshuffleTranslator.translatePortable(transform, pipeline, ctx);
+  }
+
+  /** Predicate to determine whether a URN is a Samza native transform. */
+  @AutoService(NativeTransforms.IsNativeTransform.class)
+  public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform {
+    @Override
+    public boolean test(RunnerApi.PTransform pTransform) {
+      return false;
+      // Re-enable after https://github.com/apache/beam/issues/21188 is completed

Review Comment:
   Just removed this comment. It is simply not a native transform. (The native
transform mechanism is a hack someone added to support transforms that are not
Beam primitives.)



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.SdkComponents;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A family of {@link PTransform PTransforms} that returns a {@link PCollection} equivalent to its
+ * input but functions as an operational hint to a runner that redistributing the data in some way
+ * is likely useful.
+ */
+public class Redistribute {
+  /** @return a {@link RedistributeArbitrarily} transform with default configuration. */
+  public static <T> RedistributeArbitrarily<T> arbitrarily() {
+    return new RedistributeArbitrarily<>(null, false);
+  }
+
+  /** @return a {@link RedistributeByKey} transform with default configuration. */
+  public static <K, V> RedistributeByKey<K, V> byKey() {
+    return new RedistributeByKey<>(false);
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    private final boolean allowDuplicates;
+
+    private RedistributeByKey(boolean allowDuplicates) {
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeByKey<K, V> withAllowDuplicates(boolean newAllowDuplicates) {
+      return new RedistributeByKey<>(newAllowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
+      // If the input has already had its windows merged, then the GBK that performed the merge
+      // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
+      // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+      // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
+      // time.
+      // Because this outputs as fast as possible, this should not hold the watermark.
+      Window<KV<K, V>> rewindow =
+          Window.<KV<K, V>>into(
+                  new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
+              .triggering(new ReshuffleTrigger<>())
+              .discardingFiredPanes()
+              .withTimestampCombiner(TimestampCombiner.EARLIEST)
+              .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+      PCollection<KV<K, ValueInSingleWindow<V>>> reified =
+          input
+              .apply("SetIdentityWindow", rewindow)
+              .apply("ReifyOriginalMetadata", Reify.windowsInValue());
+
+      PCollection<KV<K, Iterable<ValueInSingleWindow<V>>>> grouped =
+          reified.apply(GroupByKey.create());
+      return grouped
+          .apply(
+              "ExpandIterable",
+              ParDo.of(
+                  new DoFn<
+                      KV<K, Iterable<ValueInSingleWindow<V>>>, KV<K, ValueInSingleWindow<V>>>() {
+                    @ProcessElement
+                    public void processElement(
+                        @Element KV<K, Iterable<ValueInSingleWindow<V>>> element,
+                        OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
+                      K key = element.getKey();
+                      for (ValueInSingleWindow<V> value : element.getValue()) {
+                        r.output(KV.of(key, value));
+                      }
+                    }
+                  }))
+          .apply("RestoreMetadata", new RestoreMetadata<>())
+          // Set the windowing strategy directly, so that it doesn't get counted as the user having
+          // set allowed lateness.
+          .setWindowingStrategyInternal(originalStrategy);
+    }
+  }
+
+  /**
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class RedistributeByKeyAllowingDuplicates<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
+      return input.apply(Redistribute.byKey());
+    }
+  }
+
+  /**
+   * Noop transform that hints to the runner to try to redistribute the work evenly, or via whatever
+   * clever strategy the runner comes up with.
+   */
+  public static class RedistributeArbitrarily<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    // The number of buckets to shard into.
+    // A runner is free to ignore this (a runner may ignore the transform
+    // entirely!) This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets = null;
+    private boolean allowDuplicates = false;
+
+    private RedistributeArbitrarily(@Nullable Integer numBuckets, boolean allowDuplicates) {
+      this.numBuckets = numBuckets;
+      this.allowDuplicates = allowDuplicates;
+    }
+
+    public RedistributeArbitrarily<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new RedistributeArbitrarily<>(numBuckets, this.allowDuplicates);
+    }
+
+    public RedistributeArbitrarily<T> withAllowDuplicates(boolean allowDuplicates) {
+      return new RedistributeArbitrarily<>(this.numBuckets, allowDuplicates);
+    }
+
+    public boolean getAllowDuplicates() {
+      return allowDuplicates;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
+          .apply(Redistribute.<Integer, T>byKey().withAllowDuplicates(this.allowDuplicates))
+          .apply(Values.create());
+    }
+  }
+
+  private static class RestoreMetadata<K, V>
+      extends PTransform<PCollection<KV<K, ValueInSingleWindow<V>>>, PCollection<KV<K, V>>> {
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<KV<K, ValueInSingleWindow<V>>> input) {
+      return input.apply(
+          ParDo.of(
+              new DoFn<KV<K, ValueInSingleWindow<V>>, KV<K, V>>() {
+                @Override
+                public Duration getAllowedTimestampSkew() {
+                  return Duration.millis(Long.MAX_VALUE);
+                }
+
+                @ProcessElement
+                public void processElement(
+                    @Element KV<K, ValueInSingleWindow<V>> kv, OutputReceiver<KV<K, V>> r) {
+                  r.outputWindowedValue(
+                      KV.of(kv.getKey(), kv.getValue().getValue()),
+                      kv.getValue().getTimestamp(),
+                      Collections.singleton(kv.getValue().getWindow()),
+                      kv.getValue().getPane());
+                }
+              }));
+    }
+  }
+
+  public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int shard;
+    private @Nullable Integer numBuckets;
+
+    public AssignShardFn(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    @Setup
+    public void setup() {
+      shard = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
+      ++shard;
+      // Smear the shard into something more random-looking, to avoid issues
+      // with runners that don't properly hash the key being shuffled, but rely
+      // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
+      // which for Integer is a no-op and it is an issue:
+      // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
+      // spark.html
+      // This hashing strategy is copied from
+      // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
+      int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
+      if (numBuckets != null) {
+        UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);

Review Comment:
   In other words, I'm just trying not to rock the boat. I think this is
update-compatible in the sense that, if we change the shard assignment for
redistribute arbitrarily, we are fine.
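For reference, the shard arithmetic in `AssignShardFn` can be tried in isolation. This is a minimal sketch, not the PR's code: the smear constants are copied from the diff, but the quoted hunk is truncated right after `UnsignedInteger.fromIntBits(numBuckets)`, so reducing the smeared hash to a bucket with an unsigned remainder is an assumption. The class and method names (`ShardAssignmentSketch`, `smear`, `assignBucket`) are hypothetical, and the counter starts at 0 here for reproducibility, whereas the real `setup()` seeds it randomly. Because `RedistributeArbitrarily` discards the keys with `Values.create()` after the shuffle, the exact shard values never reach the output, which is consistent with the comment that changing this assignment is safe.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of AssignShardFn's shard arithmetic. The smear constants
 * come from the diff; the unsigned-remainder bucketing is an assumption.
 */
public class ShardAssignmentSketch {

  /** Smears a sequential counter into a random-looking int (constants copied from the diff). */
  static int smear(int shard) {
    return 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
  }

  /**
   * Assumed bucketing: reduce the smeared hash to [0, numBuckets) by treating it as
   * unsigned. With numBuckets == null, the smeared value itself is used as the key.
   */
  static int assignBucket(int shard, Integer numBuckets) {
    int hashOfShard = smear(shard);
    if (numBuckets == null) {
      return hashOfShard;
    }
    return Integer.remainderUnsigned(hashOfShard, numBuckets);
  }

  public static void main(String[] args) {
    Map<Integer, Integer> counts = new TreeMap<>();
    // Mimics one DoFn instance: a counter incremented once per element.
    int shard = 0;
    for (int i = 0; i < 1000; i++) {
      ++shard;
      counts.merge(assignBucket(shard, 4), 1, Integer::sum);
    }
    // Per-bucket counts for 1000 consecutive shard values.
    System.out.println(counts);
  }
}
```

The smear is applied before bucketing because, as the diff's comment notes, some runners partition on the raw `hashCode()` of an `Integer` key, which is the value itself; multiplying by odd constants and rotating is a bijection on `int`, so consecutive counters stay distinct while looking random.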



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
