Based on my draft implementation, the changes that are needed in the Flink
connector are as follows:

I need to be able to override the following to track last record timestamp
and idle time per shard.

    protected final void emitRecordAndUpdateState(T record, long
recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            updateState(shardStateIndex, lastSequenceNumber);

Any objection removing final from it?

Also, why is sourceContext.collectWithTimestamp in the synchronized block?
My custom class will need to emit watermarks - I assume there is no need to
acquire checkpointLock for that? Otherwise I would also need to add
emitWatermark() to the base class.

Let me know if anything else should be considered, I will open a JIRA and
PR otherwise.


On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <> wrote:

> -->
> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <>
> wrote:
>> Regarding the two hooks you would like to be available:
>>    - Provide hook to override discovery (not to hit Kinesis from every
>>    subtask)
>> Yes, I think we can easily provide a way, for example setting -1 for
>> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
>> Though, the user would then have to savepoint and restore in order to
>> pick up new shards after a Kinesis stream reshard (which is in practice the
>> best way to by-pass the Kinesis API rate limitations).
>> +1 to provide that.
> I'm considering a customization of KinesisDataFetcher with override for
> discoverNewShardsToSubscribe. We still want shards to be discovered, just
> not by hitting Kinesis from every subtask.
>>    - Provide hook to support custom watermark generation (somewhere
>>    around KinesisDataFetcher.emitRecordAndUpdateState)
>> Per-partition watermark generation on the Kinesis side is slightly more
>> complex than Kafka, due to how Kinesis’s dynamic resharding works.
>> I think we need to additionally allow new shards to be consumed only
>> after its parent shard is fully read, otherwise “per-shard time
>> characteristics” can be broken because of this out-of-orderness consumption
>> across the boundaries of a closed parent shard and its child.
>> There theses JIRAs [1][2] which has a bit more details on the topic.
>> Otherwise, in general I’m also +1 to providing this also in the Kinesis
>> consumer.
> Here I'm thinking to customize emitRecordAndUpdateState (method would need
> to be made non-final). Using getSubscribedShardsState with additional
> transient state to keep track of watermark per shard and emit watermark as
> appropriate.
> That's the idea - haven't written any code for it yet.
> Thanks,
> Thomas

Reply via email to