I just noticed this discussion from @Till Rohrmann's weekly community update
and want to share some thoughts from our experience.

We also encountered the source consumption skew issue before, and we have
been focusing on improving it in two possible ways.

1. Control the read strategy on the downstream side. In detail, every input
channel in a downstream task corresponds to the consumption of one upstream
source task, and we tag each input channel with its watermark so we can find
the lowest channel and read it with high priority. In essence, this relies
on the backpressure mechanism: if the channel with the highest timestamp is
not read by the downstream task for a while, the corresponding source task
will be blocked from reading once the buffers are exhausted. There is no
need to change the source interface in this way, but there are two major
concerns: first, it affects barrier alignment, which can delay or expire
checkpoints; second, it cannot align source consumption very precisely and
is only a best-effort approach. So we finally gave up on this way. A rough
sketch of the channel-selection idea is shown below.
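
As an illustration, a minimal sketch of that channel-selection logic could
look like the following (the class and method names here are hypothetical
and not part of the actual Flink runtime): the downstream task tracks the
latest watermark observed on each input channel and always drains the
channel that is furthest behind in event time, so fast channels eventually
exhaust their buffers and backpressure their source tasks.

public class WatermarkAwareChannelSelector {

    // Latest watermark observed on each input channel; every channel maps
    // to the consumption of one upstream source task.
    private final long[] channelWatermarks;

    public WatermarkAwareChannelSelector(int numChannels) {
        this.channelWatermarks = new long[numChannels];
        java.util.Arrays.fill(this.channelWatermarks, Long.MIN_VALUE);
    }

    // Record the watermark most recently seen on a channel.
    public void onWatermark(int channelIndex, long watermark) {
        channelWatermarks[channelIndex] =
                Math.max(channelWatermarks[channelIndex], watermark);
    }

    // Among the channels that currently have buffered data, pick the one
    // that is lowest in event time; returns -1 if no channel is readable
    // right now.
    public int selectNextChannel(boolean[] hasBufferedData) {
        int selected = -1;
        long lowestWatermark = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (hasBufferedData[i] && channelWatermarks[i] < lowestWatermark) {
                lowestWatermark = channelWatermarks[i];
                selected = i;
            }
        }
        return selected;
    }
}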

2. Add a new SourceCoordinator component to coordinate the source
consumption in a distributed way. For example, we could start this component
in the JobManager, similar to the current role of the CheckpointCoordinator.
Every source task would then communicate with the JobManager via the
existing RPC mechanism; for instance, we could attach the consumption
progress to the heartbeat messages as a payload. The JobManager would
aggregate all the reported progress and then give responses to the different
source tasks. We could define a protocol that, for example, tells a fast
source task to sleep for a specific amount of time. In this way the
coordinator has the global information needed to make a proper decision for
each individual task, so it seems more precise. And it does not affect
barrier alignment, because a sleeping fast source can still release the lock
to emit barriers as normal. The only concern is the change to the source
interface, which may affect all related source implementations. A sketch of
such a coordination protocol is given below.
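
As a rough illustration of such a protocol (all names here are hypothetical;
this is not an existing Flink API), the coordinator side could look roughly
like this: every source task attaches its event-time progress to the
heartbeat, and the coordinator answers with a suggested sleep time for tasks
that run too far ahead of the slowest one.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SourceCoordinatorSketch {

    // Maximum allowed event-time lead over the slowest source task.
    private final long maxAheadMillis;
    // Suggested pause for a task that exceeds the allowed lead.
    private final long sleepMillis;

    // Latest reported event-time progress per source subtask.
    private final Map<Integer, Long> progressBySubtask = new ConcurrentHashMap<>();

    public SourceCoordinatorSketch(long maxAheadMillis, long sleepMillis) {
        this.maxAheadMillis = maxAheadMillis;
        this.sleepMillis = sleepMillis;
    }

    // Called when a heartbeat arrives carrying a subtask's consumption
    // progress (e.g. the event timestamp it has reached). Returns the number
    // of milliseconds the subtask should sleep before reading on, or 0 if it
    // may continue immediately.
    public long reportProgress(int subtaskIndex, long eventTimeProgress) {
        progressBySubtask.put(subtaskIndex, eventTimeProgress);

        long slowest = progressBySubtask.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(eventTimeProgress);

        // A task that is too far ahead of the slowest one is asked to back
        // off; it can still emit checkpoint barriers while it sleeps.
        return (eventTimeProgress - slowest > maxAheadMillis) ? sleepMillis : 0L;
    }
}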

Currently we prefer to implement the second way, and we will take the other
good points above into account. :)

Best,
Zhijiang
------------------------------------------------------------------
From: Jamie Grier <jgr...@lyft.com.INVALID>
Sent: Wednesday, October 17, 2018, 03:28
To: dev <dev@flink.apache.org>
Subject: Re: Sharing state between subtasks

Here's a doc I started describing some changes we would like to make
starting with the Kinesis Source. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This of course is not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would be worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > The reason this selective reading doesn't work well in Flink at the
> > moment is because of checkpointing. For checkpointing, checkpoint
> > barriers travel within the streams. If we selectively read from inputs
> > based on timestamps, this is akin to blocking an input if that input is
> > very far ahead in event time, which can happen when you have a very fast
> > source and a slow source (in event time), maybe because you're in a
> > catch-up phase. In those cases it's better to simply not read the data at
> > the sources, as Thomas said. This is also because with Kafka Streams,
> > each operator is basically its own job: it's reading from Kafka and
> > writing to Kafka and there is not a complex graph of different operations
> > with network shuffles in between, as you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > >> leverage shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which it doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > >> arrive.
> > >> If they cannot process them yet because they are too far ahead in time,
> > >> they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > > Rather, it is that they selectively choose from which input to process
> > > the next message based on the timestamps, so long as there are buffered
> > > messages waiting to be processed.  That is a best-effort alignment
> > > strategy.  It seems to work relatively well in practice, at least within
> > > Kafka Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> > > both its inputs.  Instead, it could keep them separate and selectively
> > > consume from the one that has a buffer available, and if both have
> > > buffers available, from the one whose buffered messages have the lower
> > > timestamp.
> >
> >
>
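
For illustration, a rough sketch of the selective two-input reading
described above could look like the following (the PeekableInput interface
and the class name are hypothetical, not the actual StreamTwoInputProcessor
internals): keep the two inputs as separate gates and, whenever both have
buffered records, read from the one whose next record carries the lower
timestamp.

public final class LowestTimestampFirstReader {

    // Minimal view of an input gate: can we peek at the timestamp of the
    // next buffered record?
    public interface PeekableInput {
        boolean hasBufferedRecord();
        long peekTimestamp();
    }

    private final PeekableInput first;
    private final PeekableInput second;

    public LowestTimestampFirstReader(PeekableInput first, PeekableInput second) {
        this.first = first;
        this.second = second;
    }

    // Returns 1 or 2 for the input to read from next, or 0 if neither input
    // currently has a buffered record (in which case the caller falls back
    // to blocking on whichever input becomes available first).
    public int chooseInput() {
        boolean firstReady = first.hasBufferedRecord();
        boolean secondReady = second.hasBufferedRecord();
        if (firstReady && secondReady) {
            return first.peekTimestamp() <= second.peekTimestamp() ? 1 : 2;
        } else if (firstReady) {
            return 1;
        } else if (secondReady) {
            return 2;
        }
        return 0;
    }
}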
