Based on my draft implementation, the changes that are needed in the Flink
connector are as follows:
I need to be able to override the following method to track the last record
timestamp and idle time per shard:

    protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
            int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            updateState(shardStateIndex, lastSequenceNumber);
        }
    }

Any objection to removing final from it?
Also, why is sourceContext.collectWithTimestamp in the synchronized block?
My custom class will need to emit watermarks - I assume there is no need to
acquire checkpointLock for that? Otherwise I would also need to add
emitWatermark() to the base class.
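
To make the per-shard tracking concrete, here is a rough, standalone sketch of the bookkeeping such an override could delegate to. It is not connector code: ShardWatermarkTracker, onRecord, currentWatermark and the idle timeout parameter are all hypothetical names, and the policy (minimum of the last timestamps over non-idle shards, never moving backwards) is only one possible assumption.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of the Flink Kinesis connector): tracks the
 * last record timestamp and last activity time per shard and derives a
 * watermark candidate from the shards that are not idle.
 */
class ShardWatermarkTracker {
    private final long idleTimeoutMillis;
    // Highest record timestamp seen so far, keyed by shard state index.
    private final Map<Integer, Long> lastRecordTimestamp = new HashMap<>();
    // Wall-clock time at which each shard last produced a record.
    private final Map<Integer, Long> lastActivity = new HashMap<>();
    // Last value returned, so the watermark never goes backwards.
    private long lastWatermark = Long.MIN_VALUE;

    ShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Would be called from an overridden emitRecordAndUpdateState (assumption). */
    void onRecord(int shardStateIndex, long recordTimestamp, long nowMillis) {
        lastRecordTimestamp.merge(shardStateIndex, recordTimestamp, Math::max);
        lastActivity.put(shardStateIndex, nowMillis);
    }

    /**
     * Minimum of the last timestamps over all non-idle shards, clamped so it
     * never decreases. Returns Long.MIN_VALUE until a record has been seen.
     */
    long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : lastRecordTimestamp.entrySet()) {
            long idleFor = nowMillis - lastActivity.get(e.getKey());
            if (idleFor > idleTimeoutMillis) {
                continue; // idle shards do not hold the watermark back
            }
            min = Math.min(min, e.getValue());
        }
        if (min != Long.MAX_VALUE) {
            lastWatermark = Math.max(lastWatermark, min);
        }
        return lastWatermark;
    }
}
```

The sketch deliberately leaves out locking; whether the watermark emission itself has to happen under checkpointLock is exactly the open question above.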
Let me know if anything else should be considered, I will open a JIRA and
PR otherwise.
Thanks,
Thomas
On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <[email protected]> wrote:
>
> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <[email protected]>
> wrote:
>
>> Regarding the two hooks you would like to be available:
>>
>>
>> - Provide hook to override discovery (not to hit Kinesis from every
>> subtask)
>>
>> Yes, I think we can easily provide a way, for example setting -1 for
>> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
>> Though, the user would then have to savepoint and restore in order to
>> pick up new shards after a Kinesis stream reshard (which is in practice the
>> best way to bypass the Kinesis API rate limitations).
>> +1 to provide that.
>>
>
> I'm considering a customization of KinesisDataFetcher with an override for
> discoverNewShardsToSubscribe. We still want shards to be discovered, just
> not by hitting Kinesis from every subtask.
>
>
>>
>>
>> - Provide hook to support custom watermark generation (somewhere
>> around KinesisDataFetcher.emitRecordAndUpdateState)
>>
>> Per-partition watermark generation on the Kinesis side is slightly more
>> complex than Kafka, due to how Kinesis’s dynamic resharding works.
>> I think we need to additionally allow new shards to be consumed only
>> after their parent shard is fully read, otherwise “per-shard time
>> characteristics” can be broken by out-of-order consumption
>> across the boundaries of a closed parent shard and its child.
>> There are these JIRAs [1][2], which have a bit more detail on the topic.
>> Otherwise, in general I’m also +1 to providing this in the Kinesis
>> consumer.
>>
>
> Here I'm thinking of customizing emitRecordAndUpdateState (the method would
> need to be made non-final), using getSubscribedShardsState with additional
> transient state to keep track of the watermark per shard and emit watermarks
> as appropriate.
>
> That's the idea - haven't written any code for it yet.
>
> Thanks,
> Thomas
>
>