I'd like to know how you envision dealing with resharding in relation to the watermark state. Imagine that a given shard S1 has a watermark of T1, and is then split into two shards S2 and S3. The new shards are assigned to subtasks according to a hash function. The current watermarks of those subtasks could be far ahead of T1, and thus the events in S2/S3 will be considered late.
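To make the concern concrete, here is a small self-contained sketch (plain Java, not connector code; the shard names and the min-over-owned-shards watermark rule are illustrative assumptions, not the connector's actual behavior):

```java
import java.util.HashMap;
import java.util.Map;

public class ReshardLatenessSketch {

    // Illustrative rule: a subtask's watermark is the minimum of the
    // last-seen timestamps across the shards it currently owns.
    static long subtaskWatermark(Map<String, Long> shardTimestamps) {
        return shardTimestamps.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // A subtask that already owns an unrelated shard far ahead of T1.
        Map<String, Long> owned = new HashMap<>();
        owned.put("S4", 5_000L);
        long watermarkBeforeSplit = subtaskWatermark(owned); // 5000

        // S1 (watermark T1 = 1000) is split; child shard S2 hashes to this
        // subtask. Its first records carry timestamps near T1, below the
        // watermark the subtask has already emitted, so they are late.
        long firstChildTimestamp = 1_000L;
        boolean late = firstChildTimestamp < watermarkBeforeSplit;
        System.out.println("child-shard record late: " + late);
    }
}
```

Once the higher watermark has been emitted it cannot be retracted, which is why per-shard tracking alone does not repair the lateness of the child shard's initial records.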
The problem of a chaotic event time clock is exacerbated by any source that
uses dynamic partitioning. Would a per-shard watermark generator really solve
the problem that is motivating you?

Thanks,
Eron

On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:

> Based on my draft implementation, the changes that are needed in the Flink
> connector are as follows:
>
> I need to be able to override the following to track the last record
> timestamp and idle time per shard:
>
>     protected final void emitRecordAndUpdateState(T record,
>             long recordTimestamp,
>             int shardStateIndex,
>             SequenceNumber lastSequenceNumber) {
>         synchronized (checkpointLock) {
>             sourceContext.collectWithTimestamp(record, recordTimestamp);
>             updateState(shardStateIndex, lastSequenceNumber);
>         }
>     }
>
> Any objection to removing final from it?
>
> Also, why is sourceContext.collectWithTimestamp in the synchronized block?
> My custom class will need to emit watermarks - I assume there is no need to
> acquire checkpointLock for that? Otherwise I would also need to add
> emitWatermark() to the base class.
>
> Let me know if anything else should be considered; I will open a JIRA and
> PR otherwise.
>
> Thanks,
> Thomas
>
>
> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
>
> > -->
> >
> > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> >> Regarding the two hooks you would like to be available:
> >>
> >> - Provide hook to override discovery (not to hit Kinesis from every
> >>   subtask)
> >>
> >> Yes, I think we can easily provide a way, for example setting -1 for
> >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> >> Though, the user would then have to savepoint and restore in order to
> >> pick up new shards after a Kinesis stream reshard (which is in practice
> >> the best way to bypass the Kinesis API rate limitations).
> >> +1 to provide that.
> >>
> >
> > I'm considering a customization of KinesisDataFetcher with an override
> > for discoverNewShardsToSubscribe. We still want shards to be discovered,
> > just not by hitting Kinesis from every subtask.
> >
> >
> >> - Provide hook to support custom watermark generation (somewhere
> >>   around KinesisDataFetcher.emitRecordAndUpdateState)
> >>
> >> Per-partition watermark generation on the Kinesis side is slightly more
> >> complex than on the Kafka side, due to how Kinesis’s dynamic resharding
> >> works.
> >> I think we additionally need to allow a new shard to be consumed only
> >> after its parent shard is fully read; otherwise “per-shard time
> >> characteristics” can be broken by out-of-order consumption across the
> >> boundary between a closed parent shard and its children.
> >> These JIRAs [1][2] have a bit more detail on the topic.
> >> Otherwise, in general I’m also +1 to providing this in the Kinesis
> >> consumer.
> >>
> >
> > Here I'm thinking to customize emitRecordAndUpdateState (the method would
> > need to be made non-final), using getSubscribedShardsState with
> > additional transient state to keep track of the watermark per shard and
> > emit watermarks as appropriate.
> >
> > That's the idea - haven't written any code for it yet.
> >
> > Thanks,
> > Thomas
> >
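Since no code exists yet for the watermark customization discussed above, here is one possible shape of the per-shard tracking as a standalone sketch. The class and method names are hypothetical, not connector API; idle-shard handling, which Thomas also wants to track, is omitted for brevity:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not part of the Flink Kinesis connector: tracks the
// last record timestamp per shard and derives a combined watermark as the
// minimum across shards, returning it only when it advances.
public class PerShardWatermarkTracker {

    private final Map<String, Long> lastTimestampPerShard = new HashMap<>();
    private long lastEmittedWatermark = Long.MIN_VALUE;

    // Would be called from an emitRecordAndUpdateState override for every
    // record; returns a watermark to emit, or empty if it did not advance.
    public OptionalLong onRecord(String shardId, long recordTimestamp) {
        lastTimestampPerShard.merge(shardId, recordTimestamp, Math::max);
        long candidate = lastTimestampPerShard.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            return OptionalLong.of(candidate);
        }
        return OptionalLong.empty();
    }
}
```

Note that a newly assigned or slow shard holds the combined watermark back but cannot retract a watermark already emitted, which is exactly the resharding concern raised at the top of the thread.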