This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.30.0 by this push:
new bca0085 [BEAM-12361] Ensure that Reshuffle.AssignToShard numBuckets is respected (#14720)
new fafbbc1 Merge pull request #14836 from ihji/cherry-pick-12361
bca0085 is described below
commit bca00851d1d5a03a4c2f6edfb2a92c6a52920884
Author: Evan Galpin <[email protected]>
AuthorDate: Tue May 18 16:47:37 2021 -0400
[BEAM-12361] Ensure that Reshuffle.AssignToShard numBuckets is respected (#14720)
* Ensure that Reshuffle.AssignToShard numBuckets is respected
* Removes UsesTestStream.class from AssignShardFn test category
* Unbox numBuckets as int not long
* Removes unnecessary global windowing
* Appeases checker framework
---
.../org/apache/beam/sdk/transforms/Reshuffle.java | 4 +++-
.../apache/beam/sdk/transforms/ReshuffleTest.java | 21 +++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index 1dbeff3..3ea95f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedInteger;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -157,7 +158,8 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Hashing.smear().
int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
if (numBuckets != null) {
- hashOfShard %= numBuckets;
+ UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
+ hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
}
r.output(KV.of(hashOfShard, element));
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index a07c7c4..d1ab626 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Reshuffle.AssignShardFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -274,4 +276,23 @@ public class ReshuffleTest implements Serializable {
pipeline.run();
}
+
+ @Test
+ @Category({ValidatesRunner.class})
+ public void testAssignShardFn() {
+
+ PCollection<KV<String, Integer>> input =
+ pipeline.apply(
+ Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
+
+ PCollection<Integer> output =
+ input
+ .apply(ParDo.of(new AssignShardFn<>(2)))
+ .apply(GroupByKey.create())
+ .apply(MapElements.into(integers()).via(KV::getKey));
+
+ PAssert.that(output).containsInAnyOrder(ImmutableList.of(0, 1));
+
+ pipeline.run();
+ }
}