Thanks Piotrek,

> void SplitReader#addSplit(Split)
> boolean SplitReader#doesWantMoreSplits()

I have two questions about this API.
1. What if the SplitReader implementation cannot easily add a split to read
on the fly?
2. Does Flink have to be involved in splits assignment?

I am wondering if it would be simpler to let the enumerator indicate
whether a split reassignment is needed. If the answer is yes, Flink can
just start from the beginning to get all the splits and create one reader
per split. This might be a little more expensive than dynamically adding a
split to a reader, but given that split changes should be rare, it is
probably acceptable.
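
A minimal sketch of this idea, assuming hypothetical names that are not part
of the current proposal (`SplitEnumerator#isReassignmentNeeded()`,
`ReaderManager`, `SimpleReader`, `MutableEnumerator`). It only illustrates
"start from the beginning and create one reader per split"; it says nothing
about threading or checkpointing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch only; none of these types are part of Flink or the FLIP. */
public class ReassignmentSketch {

    /** Assumed enumerator contract: reports whether the set of splits has changed. */
    interface SplitEnumerator<S> {
        boolean isReassignmentNeeded();
        List<S> currentSplits();
    }

    /** Assumed minimal reader, bound to exactly one split for its lifetime. */
    interface SplitReader<S> {
        S split();
        void close();
    }

    /** Trivial reader used for illustration; a real one would hold a connection. */
    static final class SimpleReader<S> implements SplitReader<S> {
        private final S split;
        SimpleReader(S split) { this.split = split; }
        @Override public S split() { return split; }
        @Override public void close() { /* release resources here */ }
    }

    /** Enumerator backed by a list; adding a split marks the assignment as stale. */
    static final class MutableEnumerator<S> implements SplitEnumerator<S> {
        private final List<S> splits = new ArrayList<>();
        private boolean changed = false;

        void addSplit(S split) { splits.add(split); changed = true; }
        @Override public boolean isReassignmentNeeded() { return changed; }
        @Override public List<S> currentSplits() {
            changed = false; // handing out the full list acknowledges the change
            return new ArrayList<>(splits);
        }
    }

    /** Closes all readers and recreates one per split whenever the enumerator asks. */
    static final class ReaderManager<S> {
        private final SplitEnumerator<S> enumerator;
        private final Function<S, SplitReader<S>> readerFactory;
        private final List<SplitReader<S>> readers = new ArrayList<>();

        ReaderManager(SplitEnumerator<S> enumerator,
                      Function<S, SplitReader<S>> readerFactory) {
            this.enumerator = enumerator;
            this.readerFactory = readerFactory;
        }

        /** Returns true if the readers were thrown away and rebuilt. */
        boolean maybeReassign() {
            if (!enumerator.isReassignmentNeeded()) {
                return false;
            }
            for (SplitReader<S> reader : readers) {
                reader.close();
            }
            readers.clear();
            for (S split : enumerator.currentSplits()) {
                readers.add(readerFactory.apply(split));
            }
            return true;
        }

        int readerCount() { return readers.size(); }
    }
}
```

The SplitReader here never needs an addSplit() method: the cost of a split
change is paid by recreating the readers, which is the trade-off described
above.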

In the Kafka case, the SplitT may just be a consumer. The enumerator will
simply check if the topic has new partitions to be assigned to this reader.
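
As a rough illustration of that check, assuming a hypothetical
`PartitionChangeDetector` class: the partition count is passed in as an
`IntSupplier` so the sketch stays free of Kafka dependencies. In a real
connector the supplier could, for example, wrap
`KafkaConsumer#partitionsFor(topic).size()`.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper; not part of Flink or the Kafka client. */
public class PartitionChangeDetector {
    private final IntSupplier partitionCount;
    private int knownPartitions;

    public PartitionChangeDetector(IntSupplier partitionCount) {
        this.partitionCount = partitionCount;
        // Remember how many partitions exist when the reader is first assigned.
        this.knownPartitions = partitionCount.getAsInt();
    }

    /** Returns true once for each observed increase in the partition count. */
    public boolean isReassignmentNeeded() {
        int current = partitionCount.getAsInt();
        if (current > knownPartitions) {
            knownPartitions = current;
            return true;
        }
        return false;
    }
}
```

The sketch only looks for growth, since the partition count of a Kafka topic
can be increased but not decreased.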

@Biao,
If I understand correctly, the concern you raised was that a Source may
return a lot of splits and thus Flink may have to create a lot of fetcher
threads. This is a valid concern, but I cannot think of a solution to that.
After all, the SplitReaders may be written by third parties. Poor
implementations seem difficult to prevent.

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 21, 2018 at 10:13 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi again,
>
> > However I don't like the thread mode which starts a thread for each
> split.
> > Starting extra thread in operator is not an ideal way IMO. Especially
> > thread count is decided by split count. So I was wondering if there is a
> > more elegant way. Do we really want these threads in Flink core?
>
> Biao you have raised an important issue. Indeed it seems like the current
> proposal is missing something. I would guess that we need a mechanism for
> adding new splits to an already existing SplitReader and some logic to
> determine whether current instance can accept more splits or not. For
> example
>
> void SplitReader#addSplit(Split)
> boolean SplitReader#doesWantMoreSplits()
>
> Flink could assign new splits, randomly or round robin, to the SplitReaders
> whose `doesWantMoreSplits()` returns true. Batch file readers might implement
> some custom logic in `doesWantMoreSplits()`, for example that one SplitReader
> can have at most N enqueued splits.
>
> Also what about Kafka. Isn’t it the case that one KafkaConsumer can read
> from multiple splits? So Kafka’s SplitReader should always return true from
> `doesWantMoreSplits()`?
>
> What do you think?
>
> Re: Becket
>
> I’m +1 for Sync and AsyncSplitReader.
>
> Piotrek
>
> > On 21 Nov 2018, at 14:49, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Aljoscha,
> >
> > Good point on the potential optimization in the source. One thing to
> > clarify, by "adding a minimumTimestamp()/maximumTimestamp() method pair
> to
> > the split interface", did you mean "split reader interface"? If so, what
> > should the readers do if they do not have such additional information? I
> am
> > wondering if it is possible to leave such optimization to the source
> > internal implementation.
> >
> > @all
> > After reading all the feedback, Biao and I talked a little bit offline.
> > We would like to share some new thoughts with you and see what you think.
> >
> > When looking at the Source API, we were trying to answer two questions.
> > First of all, how would Flink use this API if someone else implemented it?
> > Secondly, how would the connector contributors implement the interface,
> > and how difficult is the implementation?
> >
> > KafkaConsumer is a typical example of a thread-less reader. The idea was
> > to allow different threading models on top of it. It could be a single
> > global thread that handles record fetching and processing in an event
> > loop pattern; it could also be one dedicated fetcher thread for each
> > consumer and a separate thread pool for record processing. The API gives
> > users the freedom to pick the threading model. To answer the first
> > question, I would love to have such a source reader API so Flink can
> > choose whatever threading model it wants. However, implementing such an
> > interface could be pretty challenging and error prone.
> >
> > On the other hand, having a source reader with a naive blocking socket is
> > probably simple enough in most cases (actually sometimes this might even
> > be the most efficient way). But it does not leave much option to Flink
> > other than creating one thread per reader.
> >
> > Given the above thoughts, it might be reasonable to separate the
> > SplitReader API into two: SyncReader and AsyncReader. The SyncReader
> > just has a simple blocking takeNext() API, and the AsyncReader just has a
> > pollNext(Callback) or Future<?> pollNext(). All the other methods are
> > shared by both readers and could be put into a package-private parent
> > interface like BaseSplitReader.
> >
> > Having these two readers allows both complicated and simple
> > implementations, depending on the SplitReader writers. From Flink's
> > perspective, it will choose a more efficient threading model if the
> > SplitReader is an AsyncReader. Otherwise, it may have to use the
> > one-thread-per-reader model if the reader is a SyncReader. Users can also
> > choose to implement both interfaces; in that case, it is up to Flink to
> > choose which interface to use.
> >
> > Admittedly, this solution does add one more interface, but it still seems
> > worthwhile. Any thoughts?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Sun, Nov 18, 2018 at 11:33 PM Biao Liu <mmyy1...@gmail.com> wrote:
> >
> >> Hi community,
> >>
> >> Thank you guys for sharing ideas.
> >>
> >> My main concern is the thread mode.
> >> Actually, at Alibaba we implemented our "split reader" based source two
> >> years ago. It is based on "SourceFunction"; it is just an extension, not
> >> a refactoring. It is almost the same as the version Thomas and Jamie
> >> described in the Google Doc. It really helps in many scenarios.
> >>
> >> However, I don't like the thread mode which starts a thread for each
> >> split. Starting extra threads in an operator is not ideal IMO, especially
> >> when the thread count is decided by the split count. So I was wondering
> >> if there is a more elegant way. Do we really want these threads in Flink
> >> core?
> >>
> >> I agree that a blocking interface is easier to implement. Could we at
> >> least separate the split reader and the source function into different
> >> interfaces? Not all sources would like to read all splits concurrently.
> >> In batch scenarios, reading splits one by one is more common. And also,
> >> not all sources are partitioned, right?
> >> I would prefer a new source interface with "pull mode" only and no
> >> splits. A splittable source would extend it, and one implementation
> >> would start a thread for each split, reading all splits concurrently.
> >>
> >>
> >> On Sun, Nov 18, 2018 at 3:18 AM Thomas Weise <t...@apache.org> wrote:
> >>
> >>> @Aljoscha to address your question first: In the case of the Kinesis
> >>> consumer (with current Kinesis consumer API), there would also be N+1
> >>> threads. I have implemented a prototype similar to what is shown in
> >> Jamie's
> >>> document, where the thread ownership is similar to what you have done
> for
> >>> Kafka.
> >>>
> >>> The equivalent of split reader manages its own thread and the "source
> >> main
> >>> thread" is responsible for emitting the data. The interface between
> the N
> >>> reader threads and the 1 emitter is a blocking queue per consumer
> thread.
> >>> The emitter can now control which queue to consume from based on the
> >> event
> >>> time progress.
> >>>
> >>> This is akin to a "non-blocking" interface *between emitter and split
> >>> reader*. Emitter uses poll to retrieve records from the N queues (which
> >>> requires non-blocking interaction). The emitter is independent of the
> >> split
> >>> reader implementation, that part could live in Flink.
> >>>
> >>> Regarding whether or not to assume that split readers always need a
> >> thread
> >>> and in addition that these reader threads should be managed by Flink:
> It
> >>> depends on the API of respective external systems and I would not bake
> >> that
> >>> assumption into Flink. Some client libraries manage their own threads
> >>> (see push-based APIs like JMS; as I understand it, this may also apply
> >>> to the new fan-out Kinesis API:
> >>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
> >>> ).
> >>> In such cases it would not make sense to layer another reader thread on
> >>> top. It may instead be better if Flink provides to the split reader the
> >>> queue/buffer to push records to.
> >>>
> >>> The discussion so far has largely ignored the discovery aspect. There
> are
> >>> some important considerations such as ordering dependency of splits and
> >>> work rebalancing that may affect the split reader interface. Should we
> >> fork
> >>> this into a separate thread?
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>>
> >>> On Fri, Nov 16, 2018 at 8:09 AM Piotr Nowojski <
> pi...@data-artisans.com>
> >>> wrote:
> >>>
> >>>> Hi Jamie,
> >>>>
> >>>> As was already covered in my discussion with Becket, there is an easy
> >>>> way to provide a blocking API on top of a non-blocking API. And yes, we
> >>>> both agreed that a blocking API is easier for users to implement.
> >>>>
> >>>> I also do not agree regarding the usefulness of a non-blocking API.
> >>>> Actually, the Kafka connector is one that could be more efficient
> >>>> thanks to the removal of one layer of threading.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 16 Nov 2018, at 02:21, Jamie Grier <jgr...@lyft.com.INVALID>
> >> wrote:
> >>>>>
> >>>>> Thanks Aljoscha for getting this effort going!
> >>>>>
> >>>>> There's been plenty of discussion here already and I'll add my big +1
> >>> to
> >>>>> making this interface very simple to implement for a new
> >>>>> Source/SplitReader.  Writing a new production quality connector for
> >>> Flink
> >>>>> is very difficult today and requires a lot of detailed knowledge
> >> about
> >>>>> Flink, event time progress, watermarking, idle shard detection, etc
> >> and
> >>>> it
> >>>>> would be good to move almost all of this type of code into Flink
> >> itself
> >>>> and
> >>>>> out of source implementations.  I also think this is totally doable
> >> and
> >>>> I'm
> >>>>> really excited to see this happening.
> >>>>>
> >>>>> I do have a couple of thoughts about the API and the implementation..
> >>>>>
> >>>>> In a perfect world there would be a single thread per Flink source
> >>>> sub-task
> >>>>> and no additional threads for SplitReaders -- but this assumes a
> >> world
> >>>>> where you have true async IO APIs for the upstream systems (like
> >> Kafka
> >>>> and
> >>>>> Kinesis, S3, HDFS, etc).  If that world did exist the single thread
> >>> could
> >>>>> just sit in an efficient select() call waiting for new data to arrive
> >>> on
> >>>>> any Split.  That'd be awesome..
> >>>>>
> >>>>> But, that world doesn't exist and given that practical consideration
> >> I
> >>>>> would think the next best implementation is going to be, in practice,
> >>>>> probably a thread per SplitReader that does nothing but call the
> >> source
> >>>> API
> >>>>> and drop whatever it reads into a (blocking) queue -- as Aljoscha
> >>>> mentioned
> >>>>> (calling it N+1) and as we started to describe here:
> >>>>>
> >>>>
> >>>
> >>
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa
> >>>>>
> >>>>> I guess my point is that I think we should strive to move as much of
> >>>>> something like the diagram referenced in the above doc into Flink
> >>> itself
> >>>>> and out of sources and simplify the SplitReader API as much as
> >> possible
> >>>> as
> >>>>> well.
> >>>>>
> >>>>> With the above in mind and with regard to the discussion about
> >>> blocking,
> >>>>> etc..  I'm not sure I agree with some of the discussion so far with
> >>>> regard
> >>>>> to this API design.  The calls to the upstream systems
> >> (kafka/kinesis)
> >>>> are
> >>>>> in fact going to be blocking calls.  So a simple API without the
> >>>> constraint
> >>>>> that the methods must be implemented in a non-blocking way seems
> >> better
> >>>> to
> >>>>> me from the point of view of somebody writing a new source
> >>>> implementation.
> >>>>> My concern is that if you force the implementer of the SplitReader
> >>>>> interface to do so in a non-blocking way you're just going to make it
> >>>>> harder to write those implementations.  Those calls to read the next
> >>> bit
> >>>> of
> >>>>> data are going to be blocking calls with most known important sources
> >>> --
> >>>> at
> >>>>> least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with
> >>>>> that head on and work around it at a higher level so the SplitReader
> >>>>> interface stays super simple to implement.  This means we manage all the
> >>> threading
> >>>> in
> >>>>> Flink core, the API stays pull-based, and the implementer is allowed
> >> to
> >>>>> simply block until they have data to return.
> >>>>>
> >>>>> I maybe would change my mind about this if truly asynchronous APIs to
> >>> the
> >>>>> upstream source systems were likely to be available in the near
> >> future
> >>> or
> >>>>> are now and I'm just ignorant of it.  But even then the supporting
> >> code
> >>>> in
> >>>>> Flink to drive async and sync sources would be different and in fact
> >>> they
> >>>>> might just have different APIs altogether -- SplitReader vs
> >>>>> AsyncSplitReader maybe.
> >>>>>
> >>>>> In the end I think playing with the implementation, across more than
> >>> one
> >>>>> source, and moving as much common code into Flink itself will reveal
> >>> the
> >>>>> best API of course.
> >>>>>
> >>>>> One other interesting note is that you need to preserve per-partition
> >>>>> ordering so you have to take care with the implementation if it were
> >> to
> >>>> be
> >>>>> based on a thread pool and futures so as not to reorder the reads.
> >>>>>
> >>>>> Anyway, I'm thrilled to see this starting to move forward and I'd
> >> very
> >>>> much
> >>>>> like to help with the implementation wherever I can.  We're doing a
> >>>>> simplified internal version of some of this at Lyft for just Kinesis
> >>>>> because we need a solution for event time alignment in the very short
> >>>> term
> >>>>> but we'd like to immediately start helping to do this properly in
> >> Flink
> >>>>> after that.  One of the end goals for us is event time alignment
> >> across
> >>>>> heterogeneous sources.  Another is making it possible for non-expert
> >>>> users
> >>>>> to have a high probability of being able to write their own, correct,
> >>>>> connectors.
> >>>>>
> >>>>> -Jamie
> >>>>>
> >>>>> On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek <
> >> aljos...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I thought I had sent this mail a while ago but I must have forgotten
> >>> to
> >>>>>> send it.
> >>>>>>
> >>>>>> There is another thing we should consider for splits: the range of
> >>>>>> timestamps that it can contain. For example, the splits of a file
> >>> source
> >>>>>> would know what the minimum and maximum timestamp in the splits is,
> >>>>>> roughly. For infinite splits, such as Kafka partitions, the minimum
> >>>> would
> >>>>>> be meaningful but the maximum would be +Inf. If the splits expose the
> >>>>>> interval of time that they contain, the readers, or the component that
> >>>>>> manages the readers, can make decisions about which splits to forward and
> >>>>>> read first. And it can also influence the minimum watermark that a
> >>>> reader
> >>>>>> forwards: it should never emit a watermark if it knows there are
> >>> splits
> >>>> to
> >>>>>> read that have a lower minimum timestamp. I think it should be as
> >> easy
> >>>> as
> >>>>>> adding a minimumTimestamp()/maximumTimestamp() method pair to the
> >>> split
> >>>>>> interface.
> >>>>>>
> >>>>>> Another thing we need to resolve is the actual reader interface. I
> >> see
> >>>>>> there has been some good discussion but I don't know if we have
> >>>> consensus.
> >>>>>> We should try and see how specific sources could be implemented with
> >>> the
> >>>>>> new interface. For example, for Kafka I think we need to have N+1
> >>>>>> threads per task (where N is the number of splits that a task is
> >>>>>> reading from). One thread is responsible for reading from the splits,
> >>>>>> and each split has its own (internal) thread for reading from Kafka
> >>>>>> and putting messages in an internal queue to pull from. This is
> >>>>>> similar to how the current
> >> Kafka
> >>>>>> source is implemented, which has a separate fetcher thread. The
> >> reason
> >>>> for
> >>>>>> this split is that we always need to try reading from Kafka to keep
> >>> the
> >>>>>> throughput up. In the current implementation the internal queue (or
> >>>>>> handover) limits the read rate of the reader threads.
> >>>>>>
> >>>>>> @Thomas, what do you think this would look like for Kinesis?
> >>>>>>
> >>>>>> Best,
> >>>>>> Aljoscha
> >>>>>>
> >>>>>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com>
> >> wrote:
> >>>>>>>
> >>>>>>> Hi Piotrek,
> >>>>>>>
> >>>>>>> Thanks a lot for the detailed reply. All makes sense to me.
> >>>>>>>
> >>>>>>> WRT the confusion between advance() / getCurrent(), do you think it
> >>>> would
> >>>>>>> help if we combine them and have something like:
> >>>>>>>
> >>>>>>> CompletableFuture<T> getNext();
> >>>>>>> long getWatermark();
> >>>>>>> long getCurrentTimestamp();
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Jiangjie (Becket) Qin
> >>>>>>>
> >>>>>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <
> >>>> pi...@data-artisans.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Thanks again for the detailed answer :) Sorry for responding with
> >> a
> >>>>>> delay.
> >>>>>>>>
> >>>>>>>>> Completely agree that in pattern 2, having a callback is
> >> necessary
> >>>> for
> >>>>>>>> that
> >>>>>>>>> single thread outside of the connectors. And the connectors MUST
> >>> have
> >>>>>>>>> internal threads.
> >>>>>>>>
> >>>>>>>> Yes, this thread will have to exist somewhere. In pattern 2 it exists
> >>>>>>>> in the connector (at least from the perspective of the Flink
> >>>>>>>> execution engine). In pattern 1 it exists inside the Flink execution
> >>>>>>>> engine. With completely blocking connectors, like simple reading from
> >>>>>>>> files, both of those approaches are basically the same. The
> >>>>>>>> difference is when the user implementing a Flink source is already
> >>>>>>>> working with non-blocking code with some internal threads. In this
> >>>>>>>> case, pattern 1 would result in "double thread wrapping", while
> >>>>>>>> pattern 2 would allow skipping one layer of indirection.
> >>>>>>>>
> >>>>>>>>> If we go that way, we should have something like "void
> >>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would
> >>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10
> >>>>>> completable
> >>>>>>>>> futures, will there be 10 additional threads (so 20 threads in
> >>> total)
> >>>>>>>>> blocking waiting on them? Or will there be a single thread busy
> >>> loop
> >>>>>>>>> checking around?
> >>>>>>>>
> >>>>>>>> To be honest, I haven’t thought this completely through and I haven’t
> >>>>>>>> tested/POC’ed it. Having said that, I can think of at least a couple
> >>>>>>>> of solutions. The first is something like this:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
> >>>>>>>>
> >>>>>>>> The line
> >>>>>>>>
> >>>>>>>>                              `blocked = split.process();`
> >>>>>>>>
> >>>>>>>> is where the execution goes into the task/sources. This is where the
> >>>>>>>> returned future is handled:
> >>>>>>>>
> >>>>>>>>                              blocked.addListener(() -> {
> >>>>>>>>                                  blockedSplits.remove(split);
> >>>>>>>>                                  // reset the level priority to prevent
> >>>>>>>>                                  // previously-blocked splits from starving existing splits
> >>>>>>>>                                  split.resetLevelPriority();
> >>>>>>>>                                  waitingSplits.offer(split);
> >>>>>>>>                              }, executor);
> >>>>>>>>
> >>>>>>>> Fundamentally, callbacks and Futures are more or less interchangeable.
> >>>>>>>> You can always wrap one into another (creating a callback that
> >>>>>>>> completes a future, or attaching a callback that fires once the future
> >>>>>>>> completes). In this case the difference for me is mostly:
> >>>>>>>> - an API that takes a callback allows the callback to be fired
> >>>>>>>> multiple times, and to be fired even if the connector is not blocked.
> >>>>>>>> This is what I meant by saying that the API `CompletableFuture<?>
> >>>>>>>> isBlocked()` is a bit simpler. The connector can only return either
> >>>>>>>> “I’m not blocked” or “I’m blocked and I will tell you only once when
> >>>>>>>> I’m not blocked anymore”.
> >>>>>>>>
> >>>>>>>> But this is not the most important thing for me here. The important
> >>>>>>>> thing for me is to try our best to make the Flink task’s control and
> >>>>>>>> execution single threaded. For that, both callback and future APIs
> >>>>>>>> should work the same.
> >>>>>>>>
> >>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The
> >>>> good
> >>>>>>>>> thing is that a blocking read API is usually simpler to
> >> implement.
> >>>>>>>>
> >>>>>>>> Yes, they are easier to implement (especially if you are not the one
> >>>>>>>> that has to deal with the additional threading required around them
> >>>>>>>> ;) ). But to answer this issue, if we choose pattern 2, we can always
> >>>>>>>> provide a proxy/wrapper that uses an internal thread to implement the
> >>>>>>>> non-blocking API while exposing a blocking API to the user. It would
> >>>>>>>> implement pattern 2 for the user while exposing pattern 1 to them. In
> >>>>>>>> other words, it implements pattern 1 within the pattern 2 paradigm,
> >>>>>>>> while still making it possible to implement pure pattern 2 connectors.
> >>>>>>>>
> >>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to
> >>> perform
> >>>> IO
> >>>>>>>> in
> >>>>>>>>> a method like "isBlocked()". If the method is expected to fetch
> >>>> records
> >>>>>>>>> (even if not returning them), naming it something more explicit
> >>> would
> >>>>>>>> help
> >>>>>>>>> avoid confusion.
> >>>>>>>>
> >>>>>>>> If we choose so, we could rework it into something like:
> >>>>>>>>
> >>>>>>>> CompletableFuture<?> advance();
> >>>>>>>> T getCurrent();
> >>>>>>>> Watermark getCurrentWatermark();
> >>>>>>>>
> >>>>>>>> But as I wrote before, this is more confusing to me for the exact
> >>>>>>>> reasons you mentioned :) I would be confused about what should be done
> >>>>>>>> in `advance()` and what in `getCurrent()`. However, again, this naming
> >>>>>>>> issue is not that important to me and is probably a matter of
> >>>>>>>> taste/personal preference.
> >>>>>>>>
> >>>>>>>> Piotrek
> >>>>>>>>
> >>>>>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com>
> >> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Piotrek,
> >>>>>>>>>
> >>>>>>>>> Thanks for the explanation. We are probably talking about the
> >> same
> >>>>>> thing
> >>>>>>>>> but in different ways. To clarify a little bit, I think there are
> >>> two
> >>>>>>>>> patterns to read from a connector.
> >>>>>>>>>
> >>>>>>>>> Pattern 1: Thread-less connector with a blocking read API.
> >> Outside
> >>> of
> >>>>>> the
> >>>>>>>>> connector, there is one IO thread per reader, doing blocking
> >> read.
> >>> An
> >>>>>>>>> additional thread will interact with all the IO threads.
> >>>>>>>>> Pattern 2: Connector with internal thread(s) and non-blocking
> >> API.
> >>>>>>>> Outside
> >>>>>>>>> of the connector, there is one thread for ALL readers, doing IO
> >>>> relying
> >>>>>>>> on
> >>>>>>>>> notification callbacks in the reader.
> >>>>>>>>>
> >>>>>>>>> In both patterns, there must be at least one thread per
> >> connector,
> >>>>>> either
> >>>>>>>>> inside (created by connector writers) or outside (created by
> >> Flink)
> >>>> of
> >>>>>>>> the
> >>>>>>>>> connector. Ideally there are NUM_CONNECTORS + 1 threads in total,
> >>> to
> >>>>>> make
> >>>>>>>>> sure that 1 thread is fully non-blocking.
> >>>>>>>>>
> >>>>>>>>>> Btw, I don’t know if you understand my point. Having only `poll()`
> >>>>>>>>>> and `take()` is not enough for a single-threaded task. If our source
> >>>>>>>>>> interface provides neither a `notify()` callback nor
> >>>>>>>>>> `CompletableFuture<?> isBlocked()`, there is no way to implement a
> >>>>>>>>>> single-threaded task that both reads the data from the source
> >>>>>>>>>> connector and can also react to system events. Ok, a non-blocking
> >>>>>>>>>> `poll()` would allow that, but with busy looping.
> >>>>>>>>>
> >>>>>>>>> Completely agree that in pattern 2, having a callback is
> >> necessary
> >>>> for
> >>>>>>>> that
> >>>>>>>>> single thread outside of the connectors. And the connectors MUST
> >>> have
> >>>>>>>>> internal threads. If we go that way, we should have something
> >> like
> >>>>>> "void
> >>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would
> >>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10
> >>>>>> completable
> >>>>>>>>> futures, will there be 10 additional threads (so 20 threads in
> >>> total)
> >>>>>>>>> blocking waiting on them? Or will there be a single thread busy
> >>> loop
> >>>>>>>>> checking around?
> >>>>>>>>>
> >>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The
> >>>> good
> >>>>>>>>> thing is that a blocking read API is usually simpler to
> >> implement.
> >>> An
> >>>>>>>>> additional non-blocking "T poll()" method here is indeed optional
> >>> and
> >>>>>>>> could
> >>>>>>>>> be used in cases like Flink does not want the thread to block
> >>>> forever.
> >>>>>>>> They
> >>>>>>>>> can also be combined to have a "T poll(Timeout)", which is
> >> exactly
> >>>> what
> >>>>>>>>> KafkaConsumer did.
> >>>>>>>>>
> >>>>>>>>> It sounds like you are proposing pattern 2 with something similar to
> >>>>>>>>> NIO2 AsynchronousByteChannel[1]. That API would work, except that the
> >>>>>>>>> signature returning a future does not seem necessary. If that is the
> >>>>>>>>> case, a minor change to the current FLIP proposal to have "void
> >>>>>>>>> advance(callback)" should work. And this means the connectors MUST
> >>>>>>>>> have their internal threads.
> >>>>>>>>>
> >>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to
> >>> perform
> >>>> IO
> >>>>>>>> in
> >>>>>>>>> a method like "isBlocked()". If the method is expected to fetch
> >>>> records
> >>>>>>>>> (even if not returning them), naming it something more explicit
> >>> would
> >>>>>>>> help
> >>>>>>>>> avoid confusion.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
> >>>>>>>>>
> >>>>>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <
> >>>>>> pi...@data-artisans.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi
> >>>>>>>>>>
> >>>>>>>>>> Good point with select/epoll, however I do not see how they could be
> >>>>>>>>>> used with Flink if we would like a single task in Flink to be
> >>>>>>>>>> single-threaded (and I believe we should pursue this goal). If your
> >>>>>>>>>> connector blocks
> >> on
> >>>>>>>>>> `select`, then it can not process/handle control messages from
> >>>> Flink,
> >>>>>>>> like
> >>>>>>>>>> checkpoints, releasing resources and potentially output flushes.
> >>>> This
> >>>>>>>> would
> >>>>>>>>>> require tight integration between connector and Flink’s main
> >> event
> >>>>>>>>>> loop/selects/etc.
> >>>>>>>>>>
> >>>>>>>>>> Looking at it from other perspective. Let’s assume that we have
> >> a
> >>>>>>>>>> connector implemented on top of `select`/`epoll`. In order to
> >>>>>> integrate
> >>>>>>>> it
> >>>>>>>>>> with Flink’s checkpointing/flushes/resource releasing it will
> >> have
> >>>> to
> >>>>>> be
> >>>>>>>>>> executed in a separate thread one way or another. At least if our API
> >>>>>>>>>> enforces/encourages non-blocking implementations with some kind of
> >>>>>>>>>> notifications (`isBlocked()` or `notify()` callback), some connectors
> >>>>>>>>>> might skip one layer of wrapping threads.
> >>>>>>>>>>
> >>>>>>>>>> Btw, I don’t know if you understand my point. Having only `poll()`
> >>>>>>>>>> and `take()` is not enough for a single-threaded task. If our source
> >>>>>>>>>> interface provides neither a `notify()` callback nor
> >>>>>>>>>> `CompletableFuture<?> isBlocked()`, there is no way to implement a
> >>>>>>>>>> single-threaded task that both reads the data from the source
> >>>>>>>>>> connector and can also react to system events. Ok, a non-blocking
> >>>>>>>>>> `poll()` would allow that, but with busy looping.
> >>>>>>>>>>
> >>>>>>>>>> Piotrek
> >>>>>>>>>>
> >>>>>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com>
> >>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi Piotrek,
> >>>>>>>>>>>
> >>>>>>>>>>>> But I don’t see a reason why we should expose both blocking
> >>>> `take()`
> >>>>>>>> and
> >>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone
> >> (Flink
> >>>>>>>> engine
> >>>>>>>>>> or
> >>>>>>>>>>> connector) would have to do the same busy
> >>>>>>>>>>>> looping anyway and I think it would be better to have a
> >> simpler
> >>>>>>>>>> connector
> >>>>>>>>>>> API (that would solve our problems) and force connectors to
> >>> comply
> >>>>>> one
> >>>>>>>>>> way
> >>>>>>>>>>> or another.
> >>>>>>>>>>>
> >>>>>>>>>>> If we let the block happen inside the connector, the blocking
> >>> does
> >>>>>> not
> >>>>>>>>>> have
> >>>>>>>>>>> to be a busy loop. For example, to do the block waiting
> >>>> efficiently,
> >>>>>>>> the
> >>>>>>>>>>> connector can use java NIO selector().select which relies on OS
> >>>>>> syscall
> >>>>>>>>>>> like epoll[1] instead of busy looping. But if Flink engine
> >> blocks
> >>>>>>>> outside
> >>>>>>>>>>> the connector, it pretty much has to do the busy loop. So if
> >>> there
> >>>> is
> >>>>>>>>>> only
> >>>>>>>>>>> one API to get the element, a blocking getNextElement() makes
> >>> more
> >>>>>>>> sense.
> >>>>>>>>>>> In any case, we should avoid ambiguity. It has to be crystal
> >>> clear
> >>>>>>>> about
> >>>>>>>>>>> whether a method is expected to be blocking or non-blocking.
> >>>>>> Otherwise
> >>>>>>>> it
> >>>>>>>>>>> would be very difficult for Flink engine to do the right thing
> >>> with
> >>>>>> the
> >>>>>>>>>>> connectors. At the first glance at getCurrent(), the expected
> >>>>>> behavior
> >>>>>>>> is
> >>>>>>>>>>> not quite clear.
> >>>>>>>>>>>
> >>>>>>>>>>> That said, I do agree that functionality wise, poll() and
> >> take()
> >>>> kind
> >>>>>>>> of
> >>>>>>>>>>> overlap. But they are actually not quite different from
> >>>>>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the
> >> only
> >>>>>>>>>>> difference is that poll() also returns the next record if it is
> >>>>>>>>>> available.
> >>>>>>>>>>> But I agree that the isBlocked() + getNextElement() is more
> >>>> flexible
> >>>>>> as
> >>>>>>>>>>> users can just check the record availability, but not fetch the
> >>>> next
> >>>>>>>>>>> element.
> >>>>>>>>>>>
> >>>>>>>>>>>> In case of thread-less readers with only non-blocking
> >>>> `queue.poll()`
> >>>>>>>> (is
> >>>>>>>>>>> that really a thing? I can not think about a real
> >> implementation
> >>>> that
> >>>>>>>>>>> enforces such constraints)
> >>>>>>>>>>> Right, it is pretty much syntactic sugar to let the user combine
> >> the
> >>>>>>>>>>> check-and-take into one method. It could be achieved with
> >>>>>> isBlocked() +
> >>>>>>>>>>> getNextElement().
> >>>>>>>>>>>
> >>>>>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <
> >>>>>>>> pi...@data-artisans.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Becket,
> >>>>>>>>>>>>
> >>>>>>>>>>>> With my proposal, both of your examples would have to be
> >> solved
> >>> by
> >>>>>> the
> >>>>>>>>>>>> connector and solution to both problems would be the same:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Pretend that connector is never blocked (`isBlocked() { return
> >>>>>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking
> >>>>>> fashion
> >>>>>>>>>> (or
> >>>>>>>>>>>> semi blocking with return of control from time to time to
> >> allow
> >>>> for
> >>>>>>>>>>>> checkpointing, network flushing and other resource management
> >>>> things
> >>>>>>>> to
> >>>>>>>>>>>> happen in the same main thread). In other words, exactly how
> >> you
> >>>>>> would
> >>>>>>>>>>>> implement `take()` method or how the same source connector
> >> would
> >>>> be
> >>>>>>>>>>>> implemented NOW with current source interface. The difference
> >>> with
> >>>>>>>>>> current
> >>>>>>>>>>>> interface would be only that main loop would be outside of the
> >>>>>>>>>> connector,
> >>>>>>>>>>>> and instead of periodically releasing checkpointing lock,
> >>>>>> periodically
> >>>>>>>>>>>> `return null;` or `return Optional.empty();` from
> >>>>>> `getNextElement()`.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In case of thread-less readers with only non-blocking
> >>>> `queue.poll()`
> >>>>>>>> (is
> >>>>>>>>>>>> that really a thing? I can not think about a real
> >> implementation
> >>>>>> that
> >>>>>>>>>>>> enforces such constraints), we could provide a wrapper that
> >>> hides
> >>>>>> the
> >>>>>>>>>> busy
> >>>>>>>>>>>> looping. The same applies how to solve forever blocking
> >> readers
> >>> -
> >>>> we
> >>>>>>>>>> could
> >>>>>>>>>>>> provide another wrapper running the connector in a separate
> >>> thread.
> >>>>>>>>>>>>
> >>>>>>>>>>>> But I don’t see a reason why we should expose both blocking
> >>>> `take()`
> >>>>>>>> and
> >>>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone
> >>> (Flink
> >>>>>>>>>> engine or
> >>>>>>>>>>>> connector) would have to do the same busy looping anyway and I
> >>>> think
> >>>>>>>> it
> >>>>>>>>>>>> would be better to have a simpler connector API (that would
> >>> solve
> >>>>>> our
> >>>>>>>>>>>> problems) and force connectors to comply one way or another.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com>
> >>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Piotr,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I might have misunderstood your proposal. But let me try to
> >>>> explain
> >>>>>> my
> >>>>>>>>>>>>> concern. I am thinking about the following case:
> >>>>>>>>>>>>> 1. a reader has the following two interfaces,
> >>>>>>>>>>>>> boolean isBlocked()
> >>>>>>>>>>>>> T getNextElement()
> >>>>>>>>>>>>> 2. the implementation of getNextElement() is non-blocking.
> >>>>>>>>>>>>> 3. The reader is thread-less, i.e. it does not have any
> >>> internal
> >>>>>>>>>> thread.
> >>>>>>>>>>>>> For example, it might just delegate the getNextElement() to a
> >>>>>>>>>>>> queue.poll(),
> >>>>>>>>>>>>> and isBlocked() is just queue.isEmpty().
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> How can Flink efficiently implement a blocking reading
> >> behavior
> >>>>>> with
> >>>>>>>>>> this
> >>>>>>>>>>>>> reader? Either a tight loop or a backoff interval is needed.
> >>>>>> Neither
> >>>>>>>> of
> >>>>>>>>>>>>> them is ideal.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Now let's say the reader mentioned above implements a
> >>> blocking
> >>>>>>>>>>>>> getNextElement() method. Because there is no internal thread
> >> in
> >>>> the
> >>>>>>>>>>>> reader,
> >>>>>>>>>>>>> after isBlocked() returns false, Flink will still have to
> >> loop
> >>> on
> >>>>>>>>>>>>> isBlocked() to check whether the next record is available. If
> >>> the
> >>>>>>>> next
> >>>>>>>>>>>>> record arrives after 10 min, it is a tight loop for 10 min.
> >> You
> >>>>>> have
> >>>>>>>>>>>>> probably noticed that in this case, even if isBlocked() returns
> >> a
> >>>>>>>> future,
> >>>>>>>>>>>> that
> >>>>>>>>>>>>> future will not be completed if Flink does not call some
> >>> method
> >>>>>>>> from
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> reader, because the reader has no internal thread to complete
> >>>> that
> >>>>>>>>>> future
> >>>>>>>>>>>>> by itself.
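
The thread-less case described above can be sketched roughly as follows. This is only an illustration under assumptions, not code from the FLIP: `QueueBackedReader`, `EngineSideLoop`, `readWithBackoff`, and the backoff parameters are hypothetical names. It shows that when `isBlocked()` is just `queue.isEmpty()` and `getNextElement()` is just `queue.poll()`, the caller can only emulate a blocking read by looping with a sleep (or with no sleep at all, which is the tight loop).

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical thread-less reader: no internal thread, both methods delegate to a queue. */
final class QueueBackedReader<T> {
    private final Queue<T> queue = new ArrayDeque<>();

    /** Stands in for whatever external caller hands records to the reader. */
    void enqueue(T element) {
        queue.add(element);
    }

    /** True while nothing is buffered, mirroring queue.isEmpty(). */
    boolean isBlocked() {
        return queue.isEmpty();
    }

    /** Non-blocking: returns null when nothing is buffered, mirroring queue.poll(). */
    T getNextElement() {
        return queue.poll();
    }
}

/** Hypothetical engine-side helper that emulates a blocking read from outside the reader. */
final class EngineSideLoop {
    private EngineSideLoop() {}

    /**
     * Polls the reader up to maxAttempts times, sleeping backoffMillis between attempts.
     * Returns the next element, or null if none showed up or the thread was interrupted.
     */
    static <T> T readWithBackoff(QueueBackedReader<T> reader, long backoffMillis, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (!reader.isBlocked()) {
                return reader.getNextElement();
            }
            try {
                // Trade-off: a short sleep burns CPU, a long sleep adds latency.
                Thread.sleep(backoffMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }
}
```

Nothing inside the reader can wake the caller up, so the waiting policy ends up in the engine. That is the inefficiency the message above is pointing at.
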
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Due to the above reasons, a blocking take() API would allow
> >>> Flink
> >>>>>> to
> >>>>>>>>>> have
> >>>>>>>>>>>>> an efficient way to read from a reader. There are many ways
> >> to
> >>>> wake
> >>>>>>>> up
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> blocking thread when checkpointing is needed depending on the
> >>>>>>>>>>>>> implementation. But I think the poll()/take() API would also
> >>> work
> >>>>>> in
> >>>>>>>>>> that
> >>>>>>>>>>>>> case.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <
> >>>>>>>> pi...@data-artisans.com
> >>>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> a)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> BTW, regarding the isBlock() method, I have a few more
> >>>> questions.
> >>>>>>>> 21.
> >>>>>>>>>>>> Is
> >>>>>>>>>>>>>> a method isReady() with boolean as a return value
> >>>>>>>>>>>>>>> equivalent? Personally I found it is a little bit confusing
> >>> in
> >>>>>> what
> >>>>>>>>>> is
> >>>>>>>>>>>>>> supposed to be returned when the future is completed. 22. if
> >>>>>>>>>>>>>>> the implementation of isBlocked() is optional, how do the
> >>>> callers
> >>>>>>>>>> know
> >>>>>>>>>>>>>> whether the method is properly implemented or not?
> >>>>>>>>>>>>>>> Does "not implemented" mean it always returns a completed
> >>> future?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> `CompletableFuture<?> isBlocked()` is more or less an
> >>> equivalent
> >>>>>> to
> >>>>>>>>>>>>>> `boolean hasNext()` which in case of “false” provides some
> >>> kind
> >>>>>> of a
> >>>>>>>>>>>>>> listener/callback that notifies about presence of next
> >>> element.
> >>>>>>>> There
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>> some minor details, like `CompletableFuture<?>` has a
> >> minimal
> >>>> two
> >>>>>>>>>> state
> >>>>>>>>>>>>>> logic:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Future is completed - we have more data
> >>>>>>>>>>>>>> 2. Future not yet completed - we don’t have data now, but we
> >>>>>>>> might/we
> >>>>>>>>>>>> will
> >>>>>>>>>>>>>> have in the future
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> While `boolean hasNext()` and `notify()` callback are a bit
> >>> more
> >>>>>>>>>>>>>> complicated/dispersed and can lead/encourage `notify()`
> >> spam.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> b)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3. If we merge the `advance` and `getCurrent` into one method
> >>> like
> >>>>>>>>>>>> `getNext`
> >>>>>>>>>>>>>> the `getNext` would need to return an
> >>>>>>>>>>>>>>> `ElementWithTimestamp` because some sources want to add
> >>>> timestamp
> >>>>>>>> to
> >>>>>>>>>>>>>> every element. IMO, this is not so memory friendly
> >>>>>>>>>>>>>>> so I prefer this design.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Guowei I don’t quite understand this. Could you elaborate
> >> why
> >>>>>>>> having a
> >>>>>>>>>>>>>> separate `advance()` helps?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> c)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regarding advance/poll/take. What’s the value of having two
> >>>>>> separate
> >>>>>>>>>>>>>> methods: poll and take? Which one of them should be called
> >> and
> >>>>>> which
> >>>>>>>>>>>>>> implemented? What’s the benefit of having those methods
> >>> compared
> >>>>>> to
> >>>>>>>>>>>> having
> >>>>>>>>>>>>>> a single method `getNextElement()` (or `pollElement()` or
> >>>>>>>> whatever
> >>>>>>>>>> we
> >>>>>>>>>>>>>> name it) with following contract:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> CompletableFuture<?> isBlocked();
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> /**
> >>>>>>>>>>>>>> Return next element - will be called only if `isBlocked()`
> >> is
> >>>>>>>>>> completed.
> >>>>>>>>>>>>>> Try to implement it in non blocking fashion, but if that’s
> >>>>>>>> impossible
> >>>>>>>>>> or
> >>>>>>>>>>>>>> you just don’t need the effort, you can block in this
> >> method.
> >>>>>>>>>>>>>> */
> >>>>>>>>>>>>>> T getNextElement();
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I mean, if the connector is implemented non-blockingly,
> >> Flink
> >>>>>> should
> >>>>>>>>>> use
> >>>>>>>>>>>>>> it that way. If it’s not, then `poll()` will `throw new
> >>>>>>>>>>>>>> NotImplementedException()`. Implementing both of them and
> >>>>>> providing
> >>>>>>>>>>>> both of
> >>>>>>>>>>>>>> them to Flink wouldn’t make sense, so why not merge them
> >>>> into
> >>>>>> a
> >>>>>>>>>>>> single
> >>>>>>>>>>>>>> method call that should preferably (but not necessarily need
> >>> to)
> >>>>>> be
> >>>>>>>>>>>>>> non-blocking? It’s not like we are implementing general
> >>> purpose
> >>>>>>>>>> `Queue`,
> >>>>>>>>>>>>>> which users might want to call either of `poll` or `take`.
> >> We
> >>>>>> would
> >>>>>>>>>>>> always
> >>>>>>>>>>>>>> prefer to call `poll`, but if it’s blocking, then still we
> >>> have
> >>>> no
> >>>>>>>>>>>> choice,
> >>>>>>>>>>>>>> but to call it and block on it.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> d)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking
> >> source
> >>>> is
> >>>>>>>> very
> >>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may be
> >>>> another
> >>>>>>>> way
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> achieve this. I think it may be not very memory friendly if
> >>>> every
> >>>>>>>>>>>> advance
> >>>>>>>>>>>>>>> call return a Future.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I didn’t want to mention this, to not clog my initial
> >>> proposal,
> >>>>>> but
> >>>>>>>>>>>> there
> >>>>>>>>>>>>>> is a simple solution for the problem:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> public interface SplitReader {
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (…)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> CompletableFuture<?> NOT_BLOCKED =
> >>>>>>>>>>>>>> CompletableFuture.completedFuture(null);
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> /**
> >>>>>>>>>>>>>> * Returns a future that will be completed when the page
> >> source
> >>>>>>>>>>>> becomes
> >>>>>>>>>>>>>> * unblocked.  If the page source is not blocked, this method
> >>>>>>>> should
> >>>>>>>>>>>>>> return
> >>>>>>>>>>>>>> * {@code NOT_BLOCKED}.
> >>>>>>>>>>>>>> */
> >>>>>>>>>>>>>> default CompletableFuture<?> isBlocked()
> >>>>>>>>>>>>>> {
> >>>>>>>>>>>>>>   return NOT_BLOCKED;
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> If we are blocked and we are waiting for the IO, then
> >>> creating a
> >>>>>> new
> >>>>>>>>>>>>>> Future is non-issue. Under full throttle/throughput and not
> >>>>>> blocked
> >>>>>>>>>>>> sources
> >>>>>>>>>>>>>> returning a static `NOT_BLOCKED` constant  should also solve
> >>> the
> >>>>>>>>>>>> problem.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> One more remark, non-blocking sources might be a necessity
> >> in
> >>> a
> >>>>>>>> single
> >>>>>>>>>>>>>> threaded model without a checkpointing lock. (Currently when
> >>>>>> sources
> >>>>>>>>>> are
> >>>>>>>>>>>>>> blocked, they can release checkpointing lock and re-acquire
> >> it
> >>>>>> again
> >>>>>>>>>>>>>> later). Non-blocking `poll`/`getNext()` would allow for
> >>>>>> checkpoints
> >>>>>>>> to
> >>>>>>>>>>>>>> happen when source is idling. In that case either `notify()`
> >>> or
> >>>> my
> >>>>>>>>>>>> proposed
> >>>>>>>>>>>>>> `isBlocked()` would allow us to avoid busy-looping.
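
A minimal sketch of how the `isBlocked()` contract with the shared `NOT_BLOCKED` constant could look in a buffer-backed reader. The names `SketchSplitReader`, `BufferedSketchReader`, and `add` are assumptions made for illustration and are not part of the proposal. A new future is allocated only when the reader really has nothing buffered, so the hot path returns the static constant.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reduced version of the interface discussed above. */
interface SketchSplitReader<T> {
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** Completed future means getNextElement() can be called without waiting. */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    T getNextElement();
}

/** Hypothetical reader whose buffer is filled by some other thread via add(). */
final class BufferedSketchReader<T> implements SketchSplitReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    /** Called by a fetcher thread (assumed to exist) when a record arrives. */
    synchronized void add(T element) {
        buffer.add(element);
        available.complete(null); // no-op if already completed
    }

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED; // full-throughput path: no allocation
        }
        if (available.isDone()) {
            // The old future was consumed; allocate a fresh one only while idle.
            available = new CompletableFuture<>();
        }
        return available;
    }

    @Override
    public synchronized T getNextElement() {
        return buffer.poll();
    }
}
```

A caller in the main thread could register a continuation on the returned future (for example with `thenRun`) and go on with checkpointing or flushing in the meantime, instead of busy-looping.
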
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com>
> >>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The iterator-like API was also the first thing that came to
> >>> me.
> >>>>>> But
> >>>>>>>>>> it
> >>>>>>>>>>>>>>> seems a little confusing that hasNext() does not mean "the
> >>>> stream
> >>>>>>>> has
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>> ended", but means "the next record is ready", which is
> >>>>>> repurposing
> >>>>>>>>>> the
> >>>>>>>>>>>>>> well
> >>>>>>>>>>>>>>> known meaning of hasNext(). If we follow the
> >> hasNext()/next()
> >>>>>>>>>> pattern,
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>>> additional isNextReady() method to indicate whether the
> >> next
> >>>>>> record
> >>>>>>>>>> is
> >>>>>>>>>>>>>>> ready seems more intuitive to me.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Similarly, in poll()/take() pattern, another method of
> >>> isDone()
> >>>>>> is
> >>>>>>>>>>>> needed
> >>>>>>>>>>>>>>> to indicate whether the stream has ended or not.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
> >>>>>>>>>>>>>>> isDone()/poll()/take() seems more flexible for the reader
> >>>>>>>>>>>> implementation.
> >>>>>>>>>>>>>>> When I am implementing a reader, I could have a couple of
> >>>>>> choices:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - A thread-less reader that does not have any internal
> >>> thread.
> >>>>>>>>>>>>>>> - When poll() is called, the same calling thread will
> >>> perform a
> >>>>>>>> bunch
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> IO asynchronously.
> >>>>>>>>>>>>>>> - When take() is called, the same calling thread will
> >>> perform a
> >>>>>>>>>>>>>> bunch
> >>>>>>>>>>>>>>> of IO and wait until the record is ready.
> >>>>>>>>>>>>>>> - A reader with internal threads performing network IO and
> >>> putting
> >>>>>>>>>> records
> >>>>>>>>>>>>>>> into a buffer.
> >>>>>>>>>>>>>>> - When poll() is called, the calling thread simply reads
> >> from
> >>>>>> the
> >>>>>>>>>>>>>>> buffer and returns an empty result immediately if there is no
> >>>>>> record.
> >>>>>>>>>>>>>>> - When take() is called, the calling thread reads from the
> >>>>>> buffer
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> blocks waiting if the buffer is empty.
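
The second variant in the list above (internal threads filling a buffer) could look roughly like this. It is a sketch under assumptions: `BufferedPollTakeReader`, `offerFromFetcher`, and `markFinished` are hypothetical names, and the fetcher threads themselves are left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical reader whose internal fetcher threads put records into a buffer. */
final class BufferedPollTakeReader<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private volatile boolean finished;

    /** Called by an internal fetcher thread (not shown) for each fetched record. */
    void offerFromFetcher(T record) {
        buffer.add(record);
    }

    /** Called by the fetcher once the underlying stream has ended. */
    void markFinished() {
        finished = true;
    }

    /** Non-blocking: returns null immediately if the buffer is empty. */
    T poll() {
        return buffer.poll();
    }

    /** Blocking: waits inside the queue, without a busy loop, until a record is buffered. */
    T take() throws InterruptedException {
        return buffer.take();
    }

    /** True once the stream has ended and every buffered record was handed out. */
    boolean isDone() {
        return finished && buffer.isEmpty();
    }
}
```

Here `take()` parks the calling thread inside `LinkedBlockingQueue`, so the wait costs no CPU, while `poll()` leaves the decision about what to do next with the caller.
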
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On the other hand, with the hasNext()/next()/isNextReady()
> >>> API,
> >>>>>> it
> >>>>>>>> is
> >>>>>>>>>>>>>> less
> >>>>>>>>>>>>>>> intuitive for the reader developers to write the
> >> thread-less
> >>>>>>>> pattern.
> >>>>>>>>>>>>>>> Although technically speaking one can still do the
> >>> asynchronous
> >>>>>> IO
> >>>>>>>> to
> >>>>>>>>>>>>>>> prepare the record in isNextReady(). But it is inexplicit
> >> and
> >>>>>> seems
> >>>>>>>>>>>>>>> somewhat hacky.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <
> >> t...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Couple more points regarding discovery:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The proposal mentions that discovery could be outside the
> >>>>>>>> execution
> >>>>>>>>>>>>>> graph.
> >>>>>>>>>>>>>>>> Today, discovered partitions/shards are checkpointed. I
> >>>> believe
> >>>>>>>> that
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>> also need to be the case in the future, even when
> >> discovery
> >>>> and
> >>>>>>>>>>>> reading
> >>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>> split between different tasks.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For cases such as resharding of a Kinesis stream, the
> >>>>>> relationship
> >>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>>> splits needs to be considered. Splits cannot be randomly
> >>>>>>>> distributed
> >>>>>>>>>>>>>> over
> >>>>>>>>>>>>>>>> readers in certain situations. An example was mentioned
> >>> here:
> >>>>>>>>>>>>>>>>
> >>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <
> >> t...@apache.org
> >>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for getting the ball rolling on this!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Can the number of splits decrease? Yes, splits can be
> >>> closed
> >>>>>> and
> >>>>>>>> go
> >>>>>>>>>>>>>> away.
> >>>>>>>>>>>>>>>>> An example would be a shard merge in Kinesis (2 existing
> >>>> shards
> >>>>>>>>>> will
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> closed and replaced with a new shard).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding advance/poll/take: IMO the least restrictive
> >>>> approach
> >>>>>>>>>> would
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> the thread-less IO model (pull based, non-blocking,
> >> caller
> >>>>>>>>>> retrieves
> >>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>> records when available). The current Kinesis API requires
> >>> the
> >>>>>> use
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>> threads. But that can be internal to the split reader and
> >>>> does
> >>>>>>>> not
> >>>>>>>>>>>> need
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> be a source API concern. In fact, that's what we are
> >>> working
> >>>> on
> >>>>>>>>>> right
> >>>>>>>>>>>>>> now
> >>>>>>>>>>>>>>>>> as improvement to the existing consumer: Each shard
> >>> consumer
> >>>>>>>> thread
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>> push to a queue, the consumer main thread will poll the
> >>>>>> queue(s).
> >>>>>>>>>> It
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> essentially a mapping from threaded IO to non-blocking.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The proposed SplitReader interface would fit the
> >>> thread-less
> >>>> IO
> >>>>>>>>>>>> model.
> >>>>>>>>>>>>>>>>> Similar to an iterator, we find out if there is a new
> >>> element
> >>>>>>>>>>>> (hasNext)
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> if so, move to it (next()). Separate calls deliver the
> >> meta
> >>>>>>>>>>>> information
> >>>>>>>>>>>>>>>>> (timestamp, watermark). Perhaps advance call could offer
> >> a
> >>>>>>>> timeout
> >>>>>>>>>>>>>>>> option,
> >>>>>>>>>>>>>>>>> so that the caller does not end up in a busy wait. On the
> >>>> other
> >>>>>>>>>>>> hand, a
> >>>>>>>>>>>>>>>>> caller processing multiple splits may want to cycle
> >> through
> >>>>>> fast,
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> process elements of other splits as soon as they become
> >>>>>>>> available.
> >>>>>>>>>>>> The
> >>>>>>>>>>>>>>>> nice
> >>>>>>>>>>>>>>>>> thing is that this "split merge" logic can now live in
> >>> Flink
> >>>>>> and
> >>>>>>>> be
> >>>>>>>>>>>>>>>>> optimized and shared between different sources.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <
> >>>> guowei....@gmail.com
> >>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>> Thanks Aljoscha for this FLIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking
> >>>> source
> >>>>>> is
> >>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may
> >> be
> >>>>>>>> another
> >>>>>>>>>>>> way
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> achieve this. I think it may be not very memory friendly
> >>> if
> >>>>>>>> every
> >>>>>>>>>>>>>>>> advance
> >>>>>>>>>>>>>>>>>> call return a Future.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> public interface Listener {
> >>>>>>>>>>>>>>>>>> public void notify();
> >>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> public interface SplitReader {
> >>>>>>>>>>>>>>>>>> /**
> >>>>>>>>>>>>>>>>>> * When there is no element temporarily, this will return
> >>>>>>>> false.
> >>>>>>>>>>>>>>>>>> * When elements are available again splitReader can call
> >>>>>>>>>>>>>>>>>> listener.notify()
> >>>>>>>>>>>>>>>>>> * In addition the framework would check `advance`
> >>> periodically.
> >>>>>>>>>>>>>>>>>> * Of course advance can always return true and ignore
> >> the
> >>>>>>>>>>>>>> listener
> >>>>>>>>>>>>>>>>>> argument for simplicity.
> >>>>>>>>>>>>>>>>>> */
> >>>>>>>>>>>>>>>>>> public boolean advance(Listener listener);
> >>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2. The FLIP tells us very clearly how to create
> >> all
> >>>>>> Splits
> >>>>>>>>>> and
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>> to create a SplitReader from a Split. But there is no
> >>>> strategy
> >>>>>>>> for
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>>>>> to choose how to assign the splits to the tasks. I think
> >>> we
> >>>>>>>> could
> >>>>>>>>>>>> add
> >>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>> enum to let the user choose.
> >>>>>>>>>>>>>>>>>> /**
> >>>>>>>>>>>>>>>>>> public enum SplitsAssignmentPolicy {
> >>>>>>>>>>>>>>>>>> Location,
> >>>>>>>>>>>>>>>>>> Workload,
> >>>>>>>>>>>>>>>>>> Random,
> >>>>>>>>>>>>>>>>>> Average
> >>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>> */
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3. If we merge the `advance` and `getCurrent` into one
> >> method
> >>>> like
> >>>>>>>>>>>>>> `getNext`
> >>>>>>>>>>>>>>>>>> the `getNext` would need to return an `ElementWithTimestamp`
> >>>>>> because
> >>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>> sources want to add timestamp to every element. IMO,
> >> this
> >>> is
> >>>>>> not
> >>>>>>>>>> so
> >>>>>>>>>>>>>>>> memory
> >>>>>>>>>>>>>>>>>> friendly so I prefer this design.
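
The listener-based `advance(Listener)` from point 1 above might be sketched as follows. The names here are illustrative assumptions: the callback is called `onAvailable()` instead of `notify()` because `Object.notify()` is final and cannot be redeclared in a Java interface, and `ListenerSketchReader` with its `add` method is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical callback; renamed from notify() since Object.notify() is final. */
interface AvailabilityListener {
    void onAvailable();
}

/** Hypothetical reader with separate advance()/getCurrent() calls and no per-call Future. */
final class ListenerSketchReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private AvailabilityListener pending;
    private T current;

    /**
     * Moves to the next element if one is buffered and returns true.
     * Otherwise remembers the listener and returns false.
     */
    synchronized boolean advance(AvailabilityListener listener) {
        current = buffer.poll();
        if (current != null) {
            return true;
        }
        pending = listener;
        return false;
    }

    /** Returns the element selected by the last successful advance(), or null. */
    synchronized T getCurrent() {
        return current;
    }

    /** Called by whoever produces records; fires the pending listener at most once. */
    void add(T element) {
        AvailabilityListener toNotify;
        synchronized (this) {
            buffer.add(element);
            toNotify = pending;
            pending = null;
        }
        if (toNotify != null) {
            toNotify.onAvailable(); // invoked outside the lock
        }
    }
}
```

Clearing the stored listener after one callback is one way to limit repeated notifications. Whether that is the right policy is a design choice this sketch does not settle.
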
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Piotr Nowojski <pi...@data-artisans.com> wrote on Thu, Nov 1, 2018
> >>>>>>>> at 6:08 PM:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite
> >> a
> >>>> lot
> >>>>>> of
> >>>>>>>>>>>> other
> >>>>>>>>>>>>>>>>>>> possible improvements. I have one proposal. Instead of
> >>>>>> having a
> >>>>>>>>>>>>>> method:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> boolean advance() throws IOException;
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I would replace it with
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> /*
> >>>>>>>>>>>>>>>>>>> * Return a future, which when completed means that
> >> source
> >>>> has
> >>>>>>>>>> more
> >>>>>>>>>>>>>>>> data
> >>>>>>>>>>>>>>>>>>> and getNext() will not block.
> >>>>>>>>>>>>>>>>>>> * If you wish to use benefits of non blocking
> >> connectors,
> >>>>>>>> please
> >>>>>>>>>>>>>>>>>>> implement this method appropriately.
> >>>>>>>>>>>>>>>>>>> */
> >>>>>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked() {
> >>>>>>>>>>>>>>>>>>>  return CompletableFuture.completedFuture(null);
> >>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Couple of arguments:
> >>>>>>>>>>>>>>>>>>> 1. I don’t understand the division of work between
> >>>>>> `advance()`
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> `getCurrent()`. What should be done in which,
> >> especially
> >>>> for
> >>>>>>>>>>>>>> connectors
> >>>>>>>>>>>>>>>>>>> that handle records in batches (like Kafka) and when
> >>> should
> >>>>>> you
> >>>>>>>>>>>> call
> >>>>>>>>>>>>>>>>>>> `advance` and when `getCurrent()`.
> >>>>>>>>>>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will
> >>>> allow
> >>>>>>>> us
> >>>>>>>>>> in
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> future to have asynchronous/non blocking connectors and
> >>>> more
> >>>>>>>>>>>>>>>> efficiently
> >>>>>>>>>>>>>>>>>>> handle large number of blocked threads, without busy
> >>>> waiting.
> >>>>>>>>>> While
> >>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> same time it doesn’t add much complexity, since naive
> >>>>>> connector
> >>>>>>>>>>>>>>>>>>> implementations can be always blocking.
> >>>>>>>>>>>>>>>>>>> 3. This also would allow us to use a fixed size thread
> >>> pool
> >>>>>> of
> >>>>>>>>>> task
> >>>>>>>>>>>>>>>>>>> executors, instead of one thread per task.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <
> >>>>>>>> aljos...@apache.org
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi All,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In order to finally get the ball rolling on the new
> >>> source
> >>>>>>>>>>>> interface
> >>>>>>>>>>>>>>>>>>> that we have discussed for so long I finally created a
> >>>> FLIP:
> >>>>>>>>>>>>>>>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing
> >>>>>>>> work/discussion
> >>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>> adding per-partition watermark support to the Kinesis
> >>>> source
> >>>>>>>> and
> >>>>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>>>>> this would enable generic implementation of event-time
> >>>>>>>> alignment
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>> sources. Maybe we need another FLIP for the event-time
> >>>>>>>> alignment
> >>>>>>>>>>>>>> part,
> >>>>>>>>>>>>>>>>>>> especially the part about information sharing between
> >>>>>>>> operations
> >>>>>>>>>>>> (I'm
> >>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>> calling it state sharing because state has a special
> >>>> meaning
> >>>>>> in
> >>>>>>>>>>>>>> Flink).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Please discuss away!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
