karubian commented on code in PR #145:
URL:
https://github.com/apache/flink-connector-aws/pull/145#discussion_r1670455315
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -143,17 +156,101 @@ public void addReader(int subtaskId) {
@Override
public KinesisStreamsSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
- return new KinesisStreamsSourceEnumeratorState(unassignedSplits, lastSeenShardId);
+ List<KinesisShardSplitWithAssignmentStatus> splitStates =
+ splitTracker.snapshotState(checkpointId);
+ return new KinesisStreamsSourceEnumeratorState(splitStates, lastSeenShardId);
}
@Override
public void close() throws IOException {
streamProxy.close();
}
- private List<KinesisShardSplit> initialDiscoverSplits() {
- List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
- return mapToSplits(shards, sourceConfig.get(STREAM_INITIAL_POSITION));
+ @VisibleForTesting
Review Comment:
Can we avoid these "VisibleForTesting" methods by modifying the
TestKinesisStreamProxy or adding a test implementation of this class?
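
To illustrate the idea, here is a rough sketch rather than a proposal for the actual code. All names in it (`ShardLister`, `FakeShardLister`, `SketchEnumerator`) are hypothetical, simplified stand-ins and are not the real connector types. It assumes the enumerator receives its proxy through the constructor, so a test can drive discovery through the public entry points and assert on the snapshot, while the discovery method stays private.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the stream proxy interface (not the real StreamProxy).
interface ShardLister {
    List<String> listShards(String streamArn, String lastSeenShardId);
}

// Test double: the test registers shards, and the fake returns those
// that sort after lastSeenShardId (plain string comparison, for this sketch only).
final class FakeShardLister implements ShardLister {
    private final List<String> shards = new ArrayList<>();

    void addShards(String... shardIds) {
        for (String id : shardIds) {
            shards.add(id);
        }
    }

    @Override
    public List<String> listShards(String streamArn, String lastSeenShardId) {
        List<String> result = new ArrayList<>();
        for (String id : shards) {
            if (lastSeenShardId == null || id.compareTo(lastSeenShardId) > 0) {
                result.add(id);
            }
        }
        return result;
    }
}

// Hypothetical, heavily simplified enumerator: discovery is private and is
// only reachable through the public start() and snapshotState() methods.
final class SketchEnumerator {
    private final ShardLister lister;
    private final String streamArn;
    private final List<String> unassignedShards = new ArrayList<>();
    private String lastSeenShardId;

    SketchEnumerator(ShardLister lister, String streamArn) {
        this.lister = lister;
        this.streamArn = streamArn;
    }

    // Public entry point; each call runs one round of shard discovery.
    void start() {
        discoverShards();
    }

    // Returns a copy so callers cannot mutate internal state.
    List<String> snapshotState() {
        return new ArrayList<>(unassignedShards);
    }

    // Private: no @VisibleForTesting needed, because tests observe the
    // effect of discovery through snapshotState().
    private void discoverShards() {
        for (String id : lister.listShards(streamArn, lastSeenShardId)) {
            unassignedShards.add(id);
            lastSeenShardId = id;
        }
    }
}
```

A test would then add shards to the fake, call `start()`, and compare `snapshotState()` against the expected list, without reaching into any private method.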