I'd like to know how you envision dealing with resharding in relation to
the watermark state.   Imagine that a given shard S1 has a watermark of T1,
and is then split into two shards S2 and S3.   The new shards are assigned
to subtasks according to a hash function.  The current watermarks of those
subtasks could be far ahead of T1, and thus the events in S2/S3 will be
considered late.

The problem of a chaotic event time clock is exacerbated by any source that
uses dynamic partitioning.  Would a per-shard watermark generator really
solve the problem that is motivating you?
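
For concreteness: even if each subtask derived its watermark as the
minimum across the shards it currently owns (a sketch, not actual
connector code), the watermark it has already emitted cannot regress:

    // hypothetical: perShardWatermarks maps each owned shard to its
    // current per-shard watermark
    long subtaskWatermark = Long.MAX_VALUE;
    for (long shardWatermark : perShardWatermarks.values()) {
        subtaskWatermark = Math.min(subtaskWatermark, shardWatermark);
    }
    // emitted watermarks must be non-decreasing, so a watermark already
    // emitted beyond T1 cannot be pulled back when S2/S3 are assigned

So the earliest records in the child shards would still be considered late.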

Thanks,
Eron

On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:

> Based on my draft implementation, the changes that are needed in the Flink
> connector are as follows:
>
> I need to be able to override the following to track last record timestamp
> and idle time per shard.
>
>     protected final void emitRecordAndUpdateState(T record,
>             long recordTimestamp, int shardStateIndex,
>             SequenceNumber lastSequenceNumber) {
>         synchronized (checkpointLock) {
>             sourceContext.collectWithTimestamp(record, recordTimestamp);
>             updateState(shardStateIndex, lastSequenceNumber);
>         }
>     }
>
> Any objection to removing final from it?
>
> Also, why is sourceContext.collectWithTimestamp in the synchronized block?
> My custom class will need to emit watermarks - I assume there is no need to
> acquire checkpointLock for that? Otherwise I would also need to add
> emitWatermark() to the base class.
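>
> For illustration, the override would look roughly like this (untested
> sketch; the two tracking arrays are hypothetical fields of my subclass,
> and it assumes final is removed from the base method):
>
>     @Override
>     protected void emitRecordAndUpdateState(T record,
>             long recordTimestamp, int shardStateIndex,
>             SequenceNumber lastSequenceNumber) {
>         // track per-shard event time and activity before delegating
>         lastShardRecordTimestamp[shardStateIndex] = recordTimestamp;
>         lastShardActivityTime[shardStateIndex] = System.currentTimeMillis();
>         super.emitRecordAndUpdateState(record, recordTimestamp,
>                 shardStateIndex, lastSequenceNumber);
>     }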
>
> Let me know if anything else should be considered, I will open a JIRA and
> PR otherwise.
>
> Thanks,
> Thomas
>
>
> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
>
> > -->
> >
> > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> >> Regarding the two hooks you would like to be available:
> >>
> >>
> >>    - Provide hook to override discovery (not to hit Kinesis from every
> >>    subtask)
> >>
> >> Yes, I think we can easily provide a way, for example setting -1 for
> >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
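> >>
> >> E.g. (hypothetical usage, since -1 isn't currently accepted):
> >>
> >>     Properties config = new Properties();
> >>     // would disable the periodic shard discovery loop entirely
> >>     config.setProperty(
> >>         ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
> >>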
> >> Though, the user would then have to savepoint and restore in order to
> >> pick up new shards after a Kinesis stream reshard (which is in practice
> >> the best way to bypass the Kinesis API rate limitations).
> >> +1 to provide that.
> >>
> >
> > I'm considering a customization of KinesisDataFetcher with an override
> > of discoverNewShardsToSubscribe. We still want shards to be discovered,
> > just not by hitting Kinesis from every subtask.
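> >
> > Roughly like this (sketch only; sharedShardLister is a hypothetical
> > component, and the signature follows the current KinesisDataFetcher):
> >
> >     @Override
> >     public List<StreamShardHandle> discoverNewShardsToSubscribe()
> >             throws InterruptedException {
> >         // get the shard list from a shared source (e.g. one designated
> >         // subtask or an external cache) rather than having every
> >         // subtask call the Kinesis APIs directly
> >         return sharedShardLister.getNewShardsToSubscribe();
> >     }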
> >
> >
> >>
> >>
> >>    - Provide hook to support custom watermark generation (somewhere
> >>    around KinesisDataFetcher.emitRecordAndUpdateState)
> >>
> >> Per-partition watermark generation on the Kinesis side is slightly more
> >> complex than on Kafka, due to how Kinesis’s dynamic resharding works.
> >> I think we additionally need to allow a new shard to be consumed only
> >> after its parent shard is fully read; otherwise the “per-shard time
> >> characteristics” can be broken by out-of-order consumption across the
> >> boundary between a closed parent shard and its child.
> >> There are these JIRAs [1][2], which have a bit more detail on the topic.
> >> Otherwise, in general I’m also +1 to providing this in the Kinesis
> >> consumer.
> >>
> >
> > Here I'm thinking to customize emitRecordAndUpdateState (the method
> > would need to be made non-final), using getSubscribedShardsState with
> > additional transient state to keep track of the watermark per shard and
> > emit watermarks as appropriate.
> >
> > That's the idea - haven't written any code for it yet.
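> >
> > Conceptually, the watermark emission would be something like this
> > (rough sketch; the per-shard watermark tracking is hypothetical):
> >
> >     // transient, per subtask: current watermark for each owned shard
> >     private long[] perShardWatermarks;
> >     private long lastEmittedWatermark = Long.MIN_VALUE;
> >
> >     private void emitWatermarkIfNeeded() {
> >         // the subtask's watermark is the minimum across its shards
> >         long min = Long.MAX_VALUE;
> >         for (long wm : perShardWatermarks) {
> >             min = Math.min(min, wm);
> >         }
> >         if (min > lastEmittedWatermark) {
> >             lastEmittedWatermark = min;
> >             sourceContext.emitWatermark(new Watermark(min));
> >         }
> >     }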
> >
> > Thanks,
> > Thomas
> >
> >
>
