Another nice thing is that readers can potentially also read from different
sources (historic/latest). To arrive at a general connector pattern, it
will also be necessary to consider the ordering relationship between
restrictions/splits/blocks/segments when it matters to the processing
logic, which is what Jamie refers to. For Kinesis, the most obvious case
is reading parent shards before child shards, but it also covers throttling
unrelated shards when they are unbalanced with respect to event time.
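
To make the ordering idea concrete, here is a minimal sketch. It is not
connector code: ShardReadPlanner, ShardInfo and maxSkewMillis are hypothetical
names used only for illustration. It assumes each shard exposes its parent id
and the event time of the next record it would emit. Child shards are held
back until their parent is finished, and shards that are too far ahead of the
oldest readable shard are throttled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that decides which shards may be read right now. */
final class ShardReadPlanner {

    /** Illustrative shard descriptor; not a Flink or Kinesis class. */
    static final class ShardInfo {
        final String shardId;
        final String parentShardId; // null when the shard has no parent
        final long nextEventTime;   // event time of the next unread record

        ShardInfo(String shardId, String parentShardId, long nextEventTime) {
            this.shardId = shardId;
            this.parentShardId = parentShardId;
            this.nextEventTime = nextEventTime;
        }
    }

    /**
     * Returns the ids of shards that are eligible for reading: not finished,
     * not blocked by an unfinished parent, and at most maxSkewMillis ahead
     * of the oldest unblocked shard.
     */
    static List<String> eligibleShards(
            List<ShardInfo> shards, Set<String> finishedShardIds, long maxSkewMillis) {
        Set<String> knownIds = new HashSet<>();
        for (ShardInfo s : shards) {
            knownIds.add(s.shardId);
        }

        List<ShardInfo> unblocked = new ArrayList<>();
        long oldest = Long.MAX_VALUE;
        for (ShardInfo s : shards) {
            if (finishedShardIds.contains(s.shardId)) {
                continue; // nothing left to read
            }
            // A parent that is no longer known is treated as already consumed.
            boolean parentPending = s.parentShardId != null
                    && knownIds.contains(s.parentShardId)
                    && !finishedShardIds.contains(s.parentShardId);
            if (parentPending) {
                continue; // read the parent before the child
            }
            unblocked.add(s);
            oldest = Math.min(oldest, s.nextEventTime);
        }

        List<String> eligible = new ArrayList<>();
        for (ShardInfo s : unblocked) {
            // Throttle shards that are too far ahead in event time.
            if (s.nextEventTime - oldest <= maxSkewMillis) {
                eligible.add(s.shardId);
            }
        }
        return eligible;
    }
}
```

The skew bound is a design choice in this sketch. A strict merge as Jamie
describes would correspond to always reading only the oldest shard.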

We are now implementing a stopgap watermark solution in custom code,
because the Kinesis consumer really needs a revamp as part of a general
connector overhaul.
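
Roughly, the per-shard tracking discussed further down the thread could look
like the sketch below. This is an illustration under assumptions, not the
actual custom code: ShardWatermarkTracker, onRecord, nextWatermark and the
idle timeout are hypothetical names, and wall-clock time is passed in
explicitly to keep the example self-contained. The watermark is the minimum
of the last record timestamps over all non-idle shards, and it never moves
backwards.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-shard watermark tracker; not part of the Flink connector. */
final class ShardWatermarkTracker {

    /** Returned by nextWatermark when there is nothing new to emit. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private static final class ShardClock {
        long lastRecordTimestamp = Long.MIN_VALUE;
        long lastUpdateWallClock;
    }

    private final Map<Integer, ShardClock> shards = new HashMap<>();
    private final long idleTimeoutMillis;
    private long lastEmitted = Long.MIN_VALUE;

    ShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Records the timestamp of a record read from the given shard. */
    void onRecord(int shardStateIndex, long recordTimestamp, long nowMillis) {
        ShardClock clock = shards.computeIfAbsent(shardStateIndex, k -> new ShardClock());
        clock.lastRecordTimestamp = Math.max(clock.lastRecordTimestamp, recordTimestamp);
        clock.lastUpdateWallClock = nowMillis;
    }

    /**
     * Returns the next watermark to emit, or NO_WATERMARK if the minimum over
     * all non-idle shards has not advanced past the last emitted value.
     */
    long nextWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        for (ShardClock clock : shards.values()) {
            boolean idle = nowMillis - clock.lastUpdateWallClock > idleTimeoutMillis;
            if (!idle) {
                // Idle shards are skipped so they do not hold back the watermark.
                min = Math.min(min, clock.lastRecordTimestamp);
            }
        }
        if (min == Long.MAX_VALUE || min <= lastEmitted) {
            return NO_WATERMARK;
        }
        lastEmitted = min;
        return min;
    }
}
```

In an overridden emitRecordAndUpdateState, one would call onRecord for every
record and periodically pass the result of nextWatermark to the source
context. Whether that needs the checkpoint lock is the open question raised
below.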


On Thu, Feb 22, 2018 at 2:28 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> While we're on this:
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> This is a concrete way of separating partition/shard/split discovery from
> their reading. The nice thing about this is that you can mix-and-match
> "discovery components" and "reader components". For example, for Kafka we
> would have a TopicReader and I can envision different discovery
> implementations: one very simple, no-frills, but rock solid, another one
> that does automatic discovery of new partitions, regex matching, etc...
>
>
> > On 22. Feb 2018, at 01:49, Jamie Grier <jgr...@lyft.com> wrote:
> >
> > I know this is a very simplistic idea but...
> >
> > In general the issue Eron is describing occurs whenever two (or more)
> > parallel partitions are assigned to the same Flink sub-task and there is
> > a large time delta between them.  This problem exists, though, largely
> > because we are not making any decisions about which of these partitions
> > to read and when, but rather just treating them all the same.  However,
> > this isn't the only way to approach the problem.
> >
> > Think instead of each partition as a "roughly time sorted" file and the
> > function of the connector as roughly a merge sort type process.  In other
> > words, just read the older data first by peeking at each partition and
> > deciding what to read next.  The output of the connector would be a
> > roughly time ordered stream that way.
> >
> > However to really solve the whole problem you'd have to carry this idea
> > throughout Flink and be more selective about which data you read and when
> > throughout the whole data flow graph.  Similar problem I think and just
> > something I've been thinking a bit about lately.
> >
> >
> >
> >
> > On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright <eronwri...@gmail.com>
> wrote:
> >
> >> It is valuable to consider the behavior of a consumer in both a
> >> real-time processing context, which consists mostly of tail reads, and a
> >> historical processing context, where there's an abundance of backlogged
> >> data.  In the historical processing context, system internals (e.g. shard
> >> selection logic) have an outsized influence on the order of observation
> >> and potentially the progression of the event time clock.  In a real-time
> >> context, the order of observation is, by definition, mostly determined by
> >> the order in which events are produced.
> >>
> >> My point is, it would be good to explore the efficacy of these
> >> improvements in both contexts.
> >>
> >>
> >>
> >>
> >> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise <t...@apache.org> wrote:
> >>
> >>> I don't think there is a generic solution to the problem you are
> >>> describing; we don't know how long it will take for resharding to take
> >>> effect and those changes to become visible to the connector. Depending
> >>> on how latency sensitive the pipeline is, possibly a configurable
> >>> watermark hold period could be used to cushion the event time chaos
> >>> introduced by resharding.
> >>>
> >>> This isn't the primary motivation for the connector customization I'm
> >>> working on though. We face issues with restart from older checkpoints
> >>> where parent and child shards are consumed in parallel.
> >>>
> >>>
> >>> --
> >>> sent from mobile
> >>>
> >>>
> >>> On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:
> >>>
> >>> I'd like to know how you envision dealing with resharding in relation
> >>> to the watermark state.  Imagine that a given shard S1 has a watermark
> >>> of T1, and is then split into two shards S2 and S3.  The new shards are
> >>> assigned to subtasks according to a hash function.  The current
> >>> watermarks of those subtasks could be far ahead of T1, and thus the
> >>> events in S2/S3 will be considered late.
> >>>
> >>> The problem of a chaotic event time clock is exacerbated by any source
> >>> that uses dynamic partitioning.  Would a per-shard watermark generator
> >>> really solve the problem that is motivating you?
> >>>
> >>> Thanks,
> >>> Eron
> >>>
> >>> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:
> >>>
> >>>> Based on my draft implementation, the changes that are needed in the
> >>>> Flink connector are as follows:
> >>>>
> >>>> I need to be able to override the following to track the last record
> >>>> timestamp and idle time per shard.
> >>>>
> >>>>    protected final void emitRecordAndUpdateState(
> >>>>            T record, long recordTimestamp, int shardStateIndex,
> >>>>            SequenceNumber lastSequenceNumber) {
> >>>>        synchronized (checkpointLock) {
> >>>>            sourceContext.collectWithTimestamp(record, recordTimestamp);
> >>>>            updateState(shardStateIndex, lastSequenceNumber);
> >>>>        }
> >>>>    }
> >>>>
> >>>> Any objection to removing final from it?
> >>>>
> >>>> Also, why is sourceContext.collectWithTimestamp in the synchronized
> >>>> block? My custom class will need to emit watermarks - I assume there is
> >>>> no need to acquire checkpointLock for that? Otherwise I would also need
> >>>> to add emitWatermark() to the base class.
> >>>>
> >>>> Let me know if anything else should be considered, I will open a JIRA
> >> and
> >>>> PR otherwise.
> >>>>
> >>>> Thanks,
> >>>> Thomas
> >>>>
> >>>>
> >>>> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
> >>>>
> >>>>> -->
> >>>>>
> >>>>> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> >>>>>
> >>>>>> Regarding the two hooks you would like to be available:
> >>>>>>
> >>>>>>
> >>>>>>   - Provide hook to override discovery (not to hit Kinesis from every
> >>>>>>   subtask)
> >>>>>>
> >>>>>> Yes, I think we can easily provide a way, for example setting -1 for
> >>>>>> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> >>>>>> Though, the user would then have to savepoint and restore in order to
> >>>>>> pick up new shards after a Kinesis stream reshard (which is in practice
> >>>>>> the best way to bypass the Kinesis API rate limitations).
> >>>>>> +1 to provide that.
> >>>>>>
> >>>>>
> >>>>> I'm considering a customization of KinesisDataFetcher with an override
> >>>>> for discoverNewShardsToSubscribe. We still want shards to be discovered,
> >>>>> just not by hitting Kinesis from every subtask.
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>>   - Provide hook to support custom watermark generation (somewhere
> >>>>>>   around KinesisDataFetcher.emitRecordAndUpdateState)
> >>>>>>
> >>>>>> Per-partition watermark generation on the Kinesis side is slightly
> >>>>>> more complex than Kafka, due to how Kinesis’s dynamic resharding works.
> >>>>>> I think we need to additionally allow new shards to be consumed only
> >>>>>> after their parent shard is fully read, otherwise “per-shard time
> >>>>>> characteristics” can be broken because of out-of-order consumption
> >>>>>> across the boundaries of a closed parent shard and its child.
> >>>>>> There are these JIRAs [1][2] which have a bit more detail on the topic.
> >>>>>> Otherwise, in general I’m also +1 to providing this in the Kinesis
> >>>>>> consumer.
> >>>>>>
> >>>>>
> >>>>> Here I'm thinking of customizing emitRecordAndUpdateState (the method
> >>>>> would need to be made non-final), using getSubscribedShardsState with
> >>>>> additional transient state to keep track of the watermark per shard and
> >>>>> emit watermarks as appropriate.
> >>>>>
> >>>>> That's the idea - haven't written any code for it yet.
> >>>>>
> >>>>> Thanks,
> >>>>> Thomas
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
