This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f3b1f5e [BEAM-12470] Increase input size of
ReshuffleTest.testAssignShardFn to avoid flakiness
new 81a46a9 Merge pull request #14981 from y1chi/BEAM-12470
f3b1f5e is described below
commit f3b1f5e9429150bf5ce6e5d21c00ca641bc0d955
Author: Yichi Zhang <[email protected]>
AuthorDate: Wed Jun 9 12:38:23 2021 -0700
[BEAM-12470] Increase input size of ReshuffleTest.testAssignShardFn to
avoid flakiness
---
.../test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index d1ab626..0a1dbff 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import java.io.Serializable;
+import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -280,10 +282,14 @@ public class ReshuffleTest implements Serializable {
@Test
@Category({ValidatesRunner.class})
public void testAssignShardFn() {
+ List<KV<String, Integer>> inputKvs = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ inputKvs.addAll(ARBITRARY_KVS);
+ }
PCollection<KV<String, Integer>> input =
pipeline.apply(
- Create.of(ARBITRARY_KVS).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
+ Create.of(inputKvs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
PCollection<Integer> output =
input