It is valuable to consider the behavior of a consumer in both a real-time processing context, which consists mostly of tail reads, and a historical processing context, where there's an abundance of backlogged data. In the historical processing context, system internals (e.g. shard selection logic) have an outsized influence on the order of observation and potentially the progression of the event time clock. In a real-time context, the order of observation is, by definition, mostly determined by the order in which events are produced.
My point is, it would be good to explore the efficacy of these improvements in both contexts.

On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise <t...@apache.org> wrote:

> I don't think there is a generic solution to the problem you are describing; we don't know how long it will take for resharding to take effect and for those changes to become visible to the connector. Depending on how latency-sensitive the pipeline is, a configurable watermark hold period could possibly be used to cushion the event-time chaos introduced by resharding.
>
> This isn't the primary motivation for the connector customization I'm working on, though. We face issues with restarts from older checkpoints, where parent and child shards are consumed in parallel.
>
> --
> sent from mobile
>
> On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:
>
> I'd like to know how you envision dealing with resharding in relation to the watermark state. Imagine that a given shard S1 has a watermark of T1 and is then split into two shards, S2 and S3. The new shards are assigned to subtasks according to a hash function. The current watermarks of those subtasks could be far ahead of T1, and thus the events in S2/S3 will be considered late.
>
> The problem of a chaotic event-time clock is exacerbated by any source that uses dynamic partitioning. Would a per-shard watermark generator really solve the problem that is motivating you?
>
> Thanks,
> Eron
>
> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:
>
> > Based on my draft implementation, the changes that are needed in the Flink connector are as follows:
> >
> > I need to be able to override the following to track the last record timestamp and idle time per shard:
> >
> > protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
> >         int shardStateIndex, SequenceNumber lastSequenceNumber) {
> >     synchronized (checkpointLock) {
> >         sourceContext.collectWithTimestamp(record, recordTimestamp);
> >         updateState(shardStateIndex, lastSequenceNumber);
> >     }
> > }
> >
> > Any objection to removing final from it?
> >
> > Also, why is sourceContext.collectWithTimestamp in the synchronized block? My custom class will need to emit watermarks - I assume there is no need to acquire checkpointLock for that? Otherwise I would also need to add emitWatermark() to the base class.
> >
> > Let me know if anything else should be considered; otherwise I will open a JIRA and PR.
> >
> > Thanks,
> > Thomas
> >
> > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
> >
> > > -->
> > >
> > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> > >
> > >> Regarding the two hooks you would like to be available:
> > >>
> > >> - Provide hook to override discovery (not to hit Kinesis from every subtask)
> > >>
> > >> Yes, I think we can easily provide a way, for example setting -1 for SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery. Though, the user would then have to savepoint and restore in order to pick up new shards after a Kinesis stream reshard (which is in practice the best way to bypass the Kinesis API rate limitations). +1 to providing that.
> > >
> > > I'm considering a customization of KinesisDataFetcher with an override for discoverNewShardsToSubscribe. We still want shards to be discovered, just not by hitting Kinesis from every subtask.
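For illustration, a minimal sketch of what that gating could look like. Everything here is hypothetical: ShardListCache stands in for whatever shared channel (ZooKeeper, a shared file, etc.) would distribute the shard list across subtasks, subtask 0 is arbitrarily chosen as the single poller (its index would come from RuntimeContext#getIndexOfThisSubtask), and in the actual connector the discover() logic would live inside an override of KinesisDataFetcher#discoverNewShardsToSubscribe.

    import java.util.Collections;
    import java.util.List;
    import java.util.function.Supplier;

    // Hypothetical sketch, not part of the Flink Kinesis connector: one subtask
    // polls Kinesis for new shards and publishes the result; all other subtasks
    // read the cached list, avoiding the Kinesis API rate limits.
    public class SubtaskGatedDiscovery<S> {

        /** Assumed external channel for sharing the discovered shard list. */
        public interface ShardListCache<T> {
            void publish(List<T> shards);
            List<T> read();
        }

        private final int subtaskIndex;                    // e.g. RuntimeContext#getIndexOfThisSubtask
        private final ShardListCache<S> cache;
        private final Supplier<List<S>> kinesisDiscovery;  // wraps the actual Kinesis shard-listing call

        public SubtaskGatedDiscovery(int subtaskIndex, ShardListCache<S> cache,
                Supplier<List<S>> kinesisDiscovery) {
            this.subtaskIndex = subtaskIndex;
            this.cache = cache;
            this.kinesisDiscovery = kinesisDiscovery;
        }

        public List<S> discover() {
            if (subtaskIndex == 0) {
                // Only this subtask actually talks to Kinesis.
                List<S> shards = kinesisDiscovery.get();
                cache.publish(shards);
                return shards;
            }
            // Everyone else piggybacks on the cached result.
            List<S> cached = cache.read();
            return cached != null ? cached : Collections.emptyList();
        }
    }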
> > > > > > > > >> > > >> > > >> - Provide hook to support custom watermark generation (somewhere > > >> around KinesisDataFetcher.emitRecordAndUpdateState) > > >> > > >> Per-partition watermark generation on the Kinesis side is slightly > more > > >> complex than Kafka, due to how Kinesis’s dynamic resharding works. > > >> I think we need to additionally allow new shards to be consumed only > > >> after its parent shard is fully read, otherwise “per-shard time > > >> characteristics” can be broken because of this out-of-orderness > > consumption > > >> across the boundaries of a closed parent shard and its child. > > >> There theses JIRAs [1][2] which has a bit more details on the topic. > > >> Otherwise, in general I’m also +1 to providing this also in the > Kinesis > > >> consumer. > > >> > > > > > > Here I'm thinking to customize emitRecordAndUpdateState (method would > > need > > > to be made non-final). Using getSubscribedShardsState with additional > > > transient state to keep track of watermark per shard and emit watermark > > as > > > appropriate. > > > > > > That's the idea - haven't written any code for it yet. > > > > > > Thanks, > > > Thomas > > > > > > > > >