hlteoh37 commented on code in PR #148:
URL: https://github.com/apache/flink-connector-aws/pull/148#discussion_r1674154510
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java:
##########
@@ -228,6 +231,97 @@ void testWakeUpIsNoOp() {
assertThatNoException().isThrownBy(splitReader::wakeUp);
}
+ @Test
+ void testPauseOrResumeSplits() throws Exception {
+ testStreamProxy.addShards(TEST_SHARD_ID);
+ KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID);
+
+ List<Record> expectedRecords =
+ Stream.of(getTestRecord("data-1"), getTestRecord("data-2"))
+ .collect(Collectors.toList());
+ testStreamProxy.addRecords(
+ TestUtil.STREAM_ARN,
+ TEST_SHARD_ID,
+ Collections.singletonList(expectedRecords.get(0)));
+ testStreamProxy.addRecords(
+ TestUtil.STREAM_ARN,
+ TEST_SHARD_ID,
+ Collections.singletonList(expectedRecords.get(1)));
+ splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit)));
+
+ // read data from split
+ RecordsWithSplitIds<Record> records = splitReader.fetch();
+ assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0));
+
+ // pause split
+ splitReader.pauseOrResumeSplits(
+ Collections.singletonList(testSplit), Collections.emptyList());
+ records = splitReader.fetch();
+ // returns incomplete split with no records
+ assertThat(records.finishedSplits()).isEmpty();
+ assertThat(records.nextSplit()).isNull();
+ assertThat(records.nextRecordFromSplit()).isNull();
+
+ // resume split
+ splitReader.pauseOrResumeSplits(
+ Collections.emptyList(), Collections.singletonList(testSplit));
+ records = splitReader.fetch();
+ assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(1));
+ }
+
+ @Test
+ void testPauseOrResumeSplitsOnlyPauseReadsFromSpecifiedSplits() throws Exception {
+ KinesisShardSplit testSplit1 = getTestSplit(generateShardId(1));
+ KinesisShardSplit testSplit2 = getTestSplit(generateShardId(2));
+
+ shardMetricGroupMap.put(
+ testSplit1.splitId(),
+ new KinesisShardMetrics(testSplit1, metricListener.getMetricGroup()));
+ shardMetricGroupMap.put(
+ testSplit2.splitId(),
+ new KinesisShardMetrics(testSplit2, metricListener.getMetricGroup()));
+
+ testStreamProxy.addShards(testSplit1.splitId(), testSplit2.splitId());
+
+ List<Record> recordsFromSplit1 =
+ Arrays.asList(getTestRecord("split-1-data-1"), getTestRecord("split-1-data-2"));
+ List<Record> recordsFromSplit2 =
+ Arrays.asList(
+ getTestRecord("split-2-data-1"),
+ getTestRecord("split-2-data-2"),
+ getTestRecord("split-2-data-3"));
+
+ recordsFromSplit1.forEach(
+ record ->
+ testStreamProxy.addRecords(
+ STREAM_ARN,
+ testSplit1.getShardId(),
+ Collections.singletonList(record)));
+ recordsFromSplit2.forEach(
+ record ->
+ testStreamProxy.addRecords(
+ STREAM_ARN,
+ testSplit2.getShardId(),
+ Collections.singletonList(record)));
+
+ splitReader.handleSplitsChanges(
+ new SplitsAddition<>(Arrays.asList(testSplit1, testSplit2)));
+
+ // pause split 1
+ splitReader.pauseOrResumeSplits(
+ Collections.singletonList(testSplit1), Collections.emptyList());
+
+ // read data from splits
+ List<Record> fetchedRecords = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ RecordsWithSplitIds<Record> records = splitReader.fetch();
+ fetchedRecords.addAll(readAllRecords(records));
+ }
+
+ // verify that only records from split 2 were fetched by reader
+ assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new Record[0]));
Review Comment:
Should we also test resuming from the paused split here?
Or a scenario with 3 splits, where 2 are paused and only 1 is resumed?
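If we go with the second option, a rough sketch of that test (reusing the helpers and fields already in this test class; the test name and record values below are just placeholders) could look like:

```java
// Hypothetical sketch: 3 splits, pause 2, resume only 1, then verify which records are read.
@Test
void testPauseOrResumeSplitsResumesOnlySpecifiedSplits() throws Exception {
    KinesisShardSplit split1 = getTestSplit(generateShardId(1));
    KinesisShardSplit split2 = getTestSplit(generateShardId(2));
    KinesisShardSplit split3 = getTestSplit(generateShardId(3));

    for (KinesisShardSplit split : Arrays.asList(split1, split2, split3)) {
        shardMetricGroupMap.put(
                split.splitId(),
                new KinesisShardMetrics(split, metricListener.getMetricGroup()));
    }
    testStreamProxy.addShards(split1.splitId(), split2.splitId(), split3.splitId());

    Record recordFromSplit1 = getTestRecord("split-1-data-1");
    Record recordFromSplit2 = getTestRecord("split-2-data-1");
    Record recordFromSplit3 = getTestRecord("split-3-data-1");
    testStreamProxy.addRecords(
            STREAM_ARN, split1.getShardId(), Collections.singletonList(recordFromSplit1));
    testStreamProxy.addRecords(
            STREAM_ARN, split2.getShardId(), Collections.singletonList(recordFromSplit2));
    testStreamProxy.addRecords(
            STREAM_ARN, split3.getShardId(), Collections.singletonList(recordFromSplit3));

    splitReader.handleSplitsChanges(
            new SplitsAddition<>(Arrays.asList(split1, split2, split3)));

    // pause splits 1 and 2
    splitReader.pauseOrResumeSplits(Arrays.asList(split1, split2), Collections.emptyList());
    // resume only split 1
    splitReader.pauseOrResumeSplits(Collections.emptyList(), Collections.singletonList(split1));

    // read data from splits
    List<Record> fetchedRecords = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        fetchedRecords.addAll(readAllRecords(splitReader.fetch()));
    }

    // split 2 is still paused, so only records from splits 1 and 3 should be fetched
    assertThat(fetchedRecords).containsExactlyInAnyOrder(recordFromSplit1, recordFromSplit3);
}
```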
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]