This is an automated email from the ASF dual-hosted git repository.
mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 21e040c14e4 [AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
21e040c14e4 is described below
commit 21e040c14e4c1e6f0b993a3040ca304d800c126c
Author: Moritz Mack <[email protected]>
AuthorDate: Wed Jul 26 16:04:41 2023 +0200
[AWS] Fix bug in KinesisIOReadTest (closes #27666) (#27686)
---
.../org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
index 02d28eaad58..82a31b9d037 100644
---
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
+++
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIOReadTest.java
@@ -111,12 +111,13 @@ public class KinesisIOReadTest {
@Test
public void testReadWithEFOFromShards() {
SubscribeToShardEvent shard0event = eventWithRecords(3);
- SubscribeToShardEvent shard1event = eventWithRecords(3);
- SubscribeToShardEvent shard2event = eventWithRecords(3);
+ SubscribeToShardEvent shard1event = eventWithRecords(4);
+ SubscribeToShardEvent shard2event = eventWithRecords(5);
EFOStubbedKinesisAsyncClient asyncClientStub = new
EFOStubbedKinesisAsyncClient(10);
asyncClientStub.stubSubscribeToShard("0", shard0event);
asyncClientStub.stubSubscribeToShard("1", shard1event);
- asyncClientStub.stubSubscribeToShard("2", shard1event);
+ asyncClientStub.stubSubscribeToShard("2", shard2event);
+
MockClientBuilderFactory.set(p, KinesisAsyncClientBuilder.class,
asyncClientStub);
Iterable<Record> expectedRecords =
concat(shard0event.records(), shard1event.records(),
shard2event.records());
@@ -128,7 +129,7 @@ public class KinesisIOReadTest {
.withConsumerArn("consumer")
.withInitialPositionInStream(TRIM_HORIZON)
.withArrivalTimeWatermarkPolicy()
- .withMaxNumRecords(9);
+ .withMaxNumRecords(12);
PCollection<Record> result = p.apply(read).apply(ParDo.of(new
KinesisIOReadTest.ToRecord()));
PAssert.that(result).containsInAnyOrder(expectedRecords);