This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.30.0 by this push:
     new bca0085  [BEAM-12361] Ensure that Reshuffle.AssignToShard numBuckets is respected (#14720)
     new fafbbc1  Merge pull request #14836 from ihji/cherry-pick-12361
bca0085 is described below

commit bca00851d1d5a03a4c2f6edfb2a92c6a52920884
Author: Evan Galpin <[email protected]>
AuthorDate: Tue May 18 16:47:37 2021 -0400

    [BEAM-12361] Ensure that Reshuffle.AssignToShard numBuckets is respected (#14720)
    
    * Ensure that Reshuffle.AssignToShard numBuckets is respected
    
    * Removes UsesTestStream.class from AssignShardFn test category
    
    * Unbox numBuckets as int not long
    
    * Removes unnecessary global windowing
    
    * Appeases checker framework
---
 .../org/apache/beam/sdk/transforms/Reshuffle.java   |  4 +++-
 .../apache/beam/sdk/transforms/ReshuffleTest.java   | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index 1dbeff3..3ea95f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedInteger;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 
@@ -157,7 +158,8 @@ public class Reshuffle<K, V> extends 
PTransform<PCollection<KV<K, V>>, PCollecti
       // 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smear().
       int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 
15);
       if (numBuckets != null) {
-        hashOfShard %= numBuckets;
+        UnsignedInteger unsignedNumBuckets = 
UnsignedInteger.fromIntBits(numBuckets);
+        hashOfShard = 
UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
       }
       r.output(KV.of(hashOfShard, element));
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index a07c7c4..d1ab626 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms;
 
 import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Reshuffle.AssignShardFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -274,4 +276,23 @@ public class ReshuffleTest implements Serializable {
 
     pipeline.run();
   }
+
+  @Test
+  @Category({ValidatesRunner.class})
+  public void testAssignShardFn() {
+
+    PCollection<KV<String, Integer>> input =
+        pipeline.apply(
+            
Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
+
+    PCollection<Integer> output =
+        input
+            .apply(ParDo.of(new AssignShardFn<>(2)))
+            .apply(GroupByKey.create())
+            .apply(MapElements.into(integers()).via(KV::getKey));
+
+    PAssert.that(output).containsInAnyOrder(ImmutableList.of(0, 1));
+
+    pipeline.run();
+  }
 }

Reply via email to