tweise commented on a change in pull request #6980: [FLINK-5697] [kinesis] Add
periodic per-shard watermark support
URL: https://github.com/apache/flink/pull/6980#discussion_r235154450
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
##########
@@ -609,7 +667,115 @@ public int
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
this.numberOfActiveShards.incrementAndGet();
}
- return subscribedShardsState.size() - 1;
+ int shardStateIndex = subscribedShardsState.size() - 1;
+
+ // track all discovered shards for watermark
determination
+ ShardWatermarkState sws =
shardWatermarks.get(shardStateIndex);
+ if (sws == null) {
+ sws = new ShardWatermarkState();
+ try {
+ sws.periodicWatermarkAssigner =
InstantiationUtil.clone(periodicWatermarkAssigner);
Review comment:
So the watermark assigner is a serializable object, which is the only way it
can be set on the client and deployed to the TM. So this is just creating the
per thread instances using the same mechanism. Note that there is no provision
for a factory method in the core interface.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services