z3d1k commented on code in PR #148:
URL: https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674196316


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java:
##########
@@ -228,6 +231,97 @@ void testWakeUpIsNoOp() {
         assertThatNoException().isThrownBy(splitReader::wakeUp);
     }
 
+    @Test
+    void testPauseOrResumeSplits() throws Exception {
+        testStreamProxy.addShards(TEST_SHARD_ID);
+        KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID);
+
+        List<Record> expectedRecords =
+                Stream.of(getTestRecord("data-1"), getTestRecord("data-2"))
+                        .collect(Collectors.toList());
+        testStreamProxy.addRecords(
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(0)));
+        testStreamProxy.addRecords(
+                TestUtil.STREAM_ARN,
+                TEST_SHARD_ID,
+                Collections.singletonList(expectedRecords.get(1)));
+        splitReader.handleSplitsChanges(new 
SplitsAddition<>(Collections.singletonList(testSplit)));
+
+        // read data from split
+        RecordsWithSplitIds<Record> records = splitReader.fetch();
+        
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0));
+
+        // pause split
+        splitReader.pauseOrResumeSplits(
+                Collections.singletonList(testSplit), Collections.emptyList());
+        records = splitReader.fetch();
+        // returns incomplete split with no records
+        assertThat(records.finishedSplits()).isEmpty();
+        assertThat(records.nextSplit()).isNull();
+        assertThat(records.nextRecordFromSplit()).isNull();
+
+        // resume split
+        splitReader.pauseOrResumeSplits(
+                Collections.emptyList(), Collections.singletonList(testSplit));
+        records = splitReader.fetch();
+        
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(1));
+    }
+
+    @Test
+    void testPauseOrResumeSplitsOnlyPauseReadsFromSpecifiedSplits() throws 
Exception {
+        KinesisShardSplit testSplit1 = getTestSplit(generateShardId(1));
+        KinesisShardSplit testSplit2 = getTestSplit(generateShardId(2));
+
+        shardMetricGroupMap.put(
+                testSplit1.splitId(),
+                new KinesisShardMetrics(testSplit1, 
metricListener.getMetricGroup()));
+        shardMetricGroupMap.put(
+                testSplit2.splitId(),
+                new KinesisShardMetrics(testSplit2, 
metricListener.getMetricGroup()));
+
+        testStreamProxy.addShards(testSplit1.splitId(), testSplit2.splitId());
+
+        List<Record> recordsFromSplit1 =
+                Arrays.asList(getTestRecord("split-1-data-1"), 
getTestRecord("split-1-data-2"));
+        List<Record> recordsFromSplit2 =
+                Arrays.asList(
+                        getTestRecord("split-2-data-1"),
+                        getTestRecord("split-2-data-2"),
+                        getTestRecord("split-2-data-3"));
+
+        recordsFromSplit1.forEach(
+                record ->
+                        testStreamProxy.addRecords(
+                                STREAM_ARN,
+                                testSplit1.getShardId(),
+                                Collections.singletonList(record)));
+        recordsFromSplit2.forEach(
+                record ->
+                        testStreamProxy.addRecords(
+                                STREAM_ARN,
+                                testSplit2.getShardId(),
+                                Collections.singletonList(record)));
+
+        splitReader.handleSplitsChanges(
+                new SplitsAddition<>(Arrays.asList(testSplit1, testSplit2)));
+
+        // pause split 1
+        splitReader.pauseOrResumeSplits(
+                Collections.singletonList(testSplit1), 
Collections.emptyList());
+
+        // read data from splits
+        List<Record> fetchedRecords = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            RecordsWithSplitIds<Record> records = splitReader.fetch();
+            fetchedRecords.addAll(readAllRecords(records));
+        }
+
+        // verify that only records from split 2 were fetched by reader
+        
assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new 
Record[0]));

Review Comment:
   Updated the test case to use three splits: it pauses two of them and then resumes one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to