Hi again,

> However I don't like the thread mode which starts a thread for each split.
> Starting extra thread in operator is not an ideal way IMO. Especially
> thread count is decided by split count. So I was wondering if there is a
> more elegant way. Do we really want these threads in Flink core?

Biao, you have raised an important issue. Indeed, it seems like the current
proposal is missing something. I would guess that we need a mechanism for
adding new splits to an already existing SplitReader and some logic to
determine whether the current instance can accept more splits or not. For example:

void SplitReader#addSplit(Split)
boolean SplitReader#doesWantMoreSplits()

Flink could assign new splits, randomly or round-robin, to the SplitReaders for
which `doesWantMoreSplits()` returns true. Batch file readers might implement
some custom logic in `doesWantMoreSplits()`, for example allowing at most N
enqueued splits per SplitReader.

Also, what about Kafka? Isn’t it the case that one KafkaConsumer can read from
multiple splits? So Kafka’s SplitReader should always return true from 
`doesWantMoreSplits()`?
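
To make this a bit more concrete, here is a rough Java sketch of what I have in mind. Only `addSplit` and `doesWantMoreSplits` come from the proposal above; `BoundedSplitReader`, `UnboundedSplitReader` and `RoundRobinAssigner` are hypothetical names made up for illustration (they are not existing Flink classes), and a real reader interface would carry more methods than these two.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical reader interface: only the two methods discussed above.
interface SplitReader<SplitT> {
    void addSplit(SplitT split);
    boolean doesWantMoreSplits();
}

// Batch-style reader (assumption): accepts at most maxEnqueued splits.
class BoundedSplitReader<SplitT> implements SplitReader<SplitT> {
    private final int maxEnqueued;
    private final Queue<SplitT> enqueued = new ArrayDeque<>();

    BoundedSplitReader(int maxEnqueued) {
        this.maxEnqueued = maxEnqueued;
    }

    @Override
    public void addSplit(SplitT split) {
        if (!doesWantMoreSplits()) {
            throw new IllegalStateException("reader cannot accept more splits");
        }
        enqueued.add(split);
    }

    @Override
    public boolean doesWantMoreSplits() {
        return enqueued.size() < maxEnqueued;
    }

    int enqueuedCount() {
        return enqueued.size();
    }
}

// Kafka-style reader (assumption): one consumer serves any number of splits.
class UnboundedSplitReader<SplitT> implements SplitReader<SplitT> {
    private final List<SplitT> splits = new ArrayList<>();

    @Override
    public void addSplit(SplitT split) {
        splits.add(split);
    }

    @Override
    public boolean doesWantMoreSplits() {
        return true;
    }

    int splitCount() {
        return splits.size();
    }
}

// Round-robin assignment over the readers that still want splits.
class RoundRobinAssigner<SplitT> {
    private final List<SplitReader<SplitT>> readers;
    private int next = 0;

    RoundRobinAssigner(List<SplitReader<SplitT>> readers) {
        this.readers = readers;
    }

    // Returns true if the split was handed to a reader, false if all are full.
    boolean assign(SplitT split) {
        for (int i = 0; i < readers.size(); i++) {
            int index = (next + i) % readers.size();
            SplitReader<SplitT> candidate = readers.get(index);
            if (candidate.doesWantMoreSplits()) {
                candidate.addSplit(split);
                next = (index + 1) % readers.size();
                return true;
            }
        }
        return false;
    }
}
```

When `assign` returns false, whoever hands out the splits would have to hold on to the split and retry later. What should trigger that retry is exactly the kind of thing the interface still has to answer.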

What do you think?

Re: Becket

I’m +1 for Sync and AsyncSplitReader.
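
As a sketch of how the two could coexist: a blocking reader can be adapted to the async interface with one internal thread, which is the wrapper idea discussed earlier in this thread. `SyncReader`, `AsyncReader`, `takeNext()` and `pollNext()` follow Becket's naming; `SyncToAsyncAdapter` is a hypothetical name, and I am assuming that `pollNext()` returns a `CompletableFuture<T>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simple blocking reader, easy for connector authors to implement.
interface SyncReader<T> {
    T takeNext() throws InterruptedException;
}

// Non-blocking reader that the engine can drive from a single thread.
interface AsyncReader<T> {
    CompletableFuture<T> pollNext();
}

// Hypothetical adapter: runs the blocking calls on one dedicated thread.
class SyncToAsyncAdapter<T> implements AsyncReader<T>, AutoCloseable {
    private final SyncReader<T> delegate;

    // A single thread keeps the reads in submission order.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread thread = new Thread(r, "sync-reader-io");
        thread.setDaemon(true);
        return thread;
    });

    SyncToAsyncAdapter(SyncReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public CompletableFuture<T> pollNext() {
        CompletableFuture<T> result = new CompletableFuture<>();
        ioThread.execute(() -> {
            try {
                result.complete(delegate.takeNext());
            } catch (Throwable error) {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    @Override
    public void close() {
        ioThread.shutdownNow();
    }
}
```

This still costs one thread per wrapped reader, so it only helps connectors that cannot do better. Connectors that already have their own threads or true async IO would implement `AsyncReader` directly and skip that layer.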

Piotrek

> On 21 Nov 2018, at 14:49, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> Good point on the potential optimization in the source. One thing to
> clarify, by "adding a minimumTimestamp()/maximumTimestamp() method pair to
> the split interface", did you mean "split reader interface"? If so, what
> should the readers do if they do not have such additional information? I am
> wondering if it is possible to leave such optimization to the source
> internal implementation.
> 
> @all
> After reading all the feedback, Biao and I talked a little bit offline. We
> would like to share some new thoughts with you and see what you think.
> 
> When looking at the Source API, we were trying to answer two questions.
> First of all, how would Flink use this API if someone else implemented it?
> Secondly, how would the connector contributors implement the interface, and
> how difficult would the implementation be?
> 
> KafkaConsumer is a typical example of a thread-less reader. The idea was to
> allow different threading models on top of it. It could be a single global
> thread that handles record fetching and processing in an event loop pattern;
> it could also be one dedicated fetcher thread for each consumer and a
> separate thread pool for record processing. The API gives users the freedom
> to pick the threading model. To answer the first question, I would love to
> have such a source reader API so Flink can choose whatever threading model
> it wants. However, implementing such an interface could be pretty
> challenging and error prone.
> 
> On the other hand, having a source reader with a naive blocking socket is
> probably simple enough in most cases (actually sometimes this might even be
> the most efficient way). But it does not leave much option to Flink other
> than creating one thread per reader.
> 
> Given the above thoughts, it might be reasonable to separate the
> SplitReader API into two: SyncReader and AsyncReader. The sync reader just
> has a simple blocking takeNext() API. And the AsyncReader just has a
> pollNext(Callback) or Future<?> pollNext(). All the other methods are
> shared by both readers and could be put into a package private parent
> interface like BaseSplitReader.
> 
> Having these two readers allows both complicated and simple implementations,
> depending on the SplitReader writers. From Flink's perspective, it will
> choose a more efficient threading model if the SplitReader is an
> AsyncReader. Otherwise, it may have to use the one-thread-per-reader model
> if the reader is a SyncReader. Users can also choose to implement both
> interfaces; in that case, it is up to Flink to choose which interface to use.
> 
> Admittedly, this solution does have one more interface, but it still seems
> worthwhile. Any thoughts?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> On Sun, Nov 18, 2018 at 11:33 PM Biao Liu <mmyy1...@gmail.com> wrote:
> 
>> Hi community,
>> 
>> Thank you guys for sharing ideas.
>> 
>> The thing I am really concerned about is the thread mode.
>> Actually, at Alibaba we implemented our own "split reader" based source
>> two years ago. It is based on "SourceFunction"; it is just an extension, not
>> a refactoring. It is almost the same as the version Thomas and Jamie
>> described in the Google Doc. It really helps in many scenarios.
>> 
>> However I don't like the thread mode which starts a thread for each split.
>> Starting extra thread in operator is not an ideal way IMO. Especially
>> thread count is decided by split count. So I was wondering if there is a
>> more elegant way. Do we really want these threads in Flink core?
>> 
>> I agree that a blocking interface is easier to implement. Could we at
>> least separate the split reader and the source function into different
>> interfaces? Not all sources would like to read all splits concurrently. In
>> the batch scenario, reading splits one by one is more general. And also, not
>> all sources are partitioned, right?
>> I would prefer a new source interface with "pull mode" only and no splits.
>> A splittable source would extend it, and one implementation would start a
>> thread for each split, reading all splits concurrently.
>> 
>> 
>> On Sun, Nov 18, 2018 at 3:18 AM Thomas Weise <t...@apache.org> wrote:
>> 
>>> @Aljoscha to address your question first: In the case of the Kinesis
>>> consumer (with current Kinesis consumer API), there would also be N+1
>>> threads. I have implemented a prototype similar to what is shown in
>>> Jamie's document, where the thread ownership is similar to what you have
>>> done for Kafka.
>>> 
>>> The equivalent of split reader manages its own thread and the "source
>>> main thread" is responsible for emitting the data. The interface between
>>> the N reader threads and the 1 emitter is a blocking queue per consumer
>>> thread. The emitter can now control which queue to consume from based on
>>> the event time progress.
>>> 
>>> This is akin to a "non-blocking" interface *between emitter and split
>>> reader*. Emitter uses poll to retrieve records from the N queues (which
>>> requires non-blocking interaction). The emitter is independent of the
>>> split reader implementation, that part could live in Flink.
>>> 
>>> Regarding whether or not to assume that split readers always need a
>>> thread and in addition that these reader threads should be managed by
>>> Flink: It depends on the API of respective external systems and I would
>>> not bake that assumption into Flink. Some client libraries manage their
>>> own threads (see push based API like JMS and as I understand it may also
>>> apply to the new fan-out Kinesis API:
>>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>>> ).
>>> In such cases it would not make sense to layer another reader thread on
>>> top. It may instead be better if Flink provides to the split reader the
>>> queue/buffer to push records to.
>>> 
>>> The discussion so far has largely ignored the discovery aspect. There are
>>> some important considerations such as ordering dependency of splits and
>>> work rebalancing that may affect the split reader interface. Should we
>>> fork this into a separate thread?
>>> 
>>> Thanks,
>>> Thomas
>>> 
>>> 
>>> On Fri, Nov 16, 2018 at 8:09 AM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> 
>>>> Hi Jamie,
>>>> 
>>>> As it was already covered with my discussion with Becket, there is an
>>>> easy way to provide blocking API on top of non-blocking API. And yes we
>>>> both agreed that blocking API is easier to implement by users.
>>>> 
>>>> I also do not agree with respect to usefulness of non blocking API.
>>>> Actually Kafka connector is the one that could be more efficient thanks
>>>> to the removal of the one layer of threading.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 16 Nov 2018, at 02:21, Jamie Grier <jgr...@lyft.com.INVALID>
>> wrote:
>>>>> 
>>>>> Thanks Aljoscha for getting this effort going!
>>>>> 
>>>>> There's been plenty of discussion here already and I'll add my big +1
>>> to
>>>>> making this interface very simple to implement for a new
>>>>> Source/SplitReader.  Writing a new production quality connector for
>>> Flink
>>>>> is very difficult today and requires a lot of detailed knowledge
>> about
>>>>> Flink, event time progress, watermarking, idle shard detection, etc
>> and
>>>> it
>>>>> would be good to move almost all of this type of code into Flink
>> itself
>>>> and
>>>>> out of source implementations.  I also think this is totally doable
>> and
>>>> I'm
>>>>> really excited to see this happening.
>>>>> 
>>>>> I do have a couple of thoughts about the API and the implementation.
>>>>> 
>>>>> In a perfect world there would be a single thread per Flink source
>>>> sub-task
>>>>> and no additional threads for SplitReaders -- but this assumes a
>> world
>>>>> where you have true async IO APIs for the upstream systems (like
>> Kafka
>>>> and
>>>>> Kinesis, S3, HDFS, etc).  If that world did exist the single thread
>>> could
>>>>> just sit in an efficient select() call waiting for new data to arrive
>>> on
>>>>> any Split.  That'd be awesome..
>>>>> 
>>>>> But, that world doesn't exist and given that practical consideration
>> I
>>>>> would think the next best implementation is going to be, in practice,
>>>>> probably a thread per SplitReader that does nothing but call the
>> source
>>>> API
>>>>> and drop whatever it reads into a (blocking) queue -- as Aljoscha
>>>> mentioned
>>>>> (calling it N+1) and as we started to describe here:
>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa
>>>>> 
>>>>> I guess my point is that I think we should strive to move as much of
>>>>> something like the diagram referenced in the above doc into Flink
>>> itself
>>>>> and out of sources and simplify the SplitReader API as much as
>> possible
>>>> as
>>>>> well.
>>>>> 
>>>>> With the above in mind and with regard to the discussion about blocking,
>>>>> etc., I'm not sure I agree with some of the discussion so far with regard
>>>>> to this API design. The calls to the upstream systems (kafka/kinesis) are
>>>>> in fact going to be blocking calls. So a simple API without the constraint
>>>>> that the methods must be implemented in a non-blocking way seems better to
>>>>> me from the point of view of somebody writing a new source implementation.
>>>>> My concern is that if you force the implementer of the SplitReader
>>>>> interface to do so in a non-blocking way you're just going to make it
>>>>> harder to write those implementations. Those calls to read the next bit of
>>>>> data are going to be blocking calls with most known important sources --
>>>>> at least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with
>>>>> that head on and work around it at a higher level so the SplitReader
>>>>> interface stays super simple to implement. This means we manage all the
>>>>> threading in Flink core, the API stays pull-based, and the implementer is
>>>>> allowed to simply block until they have data to return.
>>>>> 
>>>>> I maybe would change my mind about this if truly asynchronous APIs to
>>> the
>>>>> upstream source systems were likely to be available in the near
>> future
>>> or
>>>>> are now and I'm just ignorant of it.  But even then the supporting
>> code
>>>> in
>>>>> Flink to drive async and sync sources would be different and in fact
>>> they
>>>>> might just have different APIs altogether -- SplitReader vs
>>>>> AsyncSplitReader maybe.
>>>>> 
>>>>> In the end I think playing with the implementation, across more than
>>> one
>>>>> source, and moving as much common code into Flink itself will reveal
>>> the
>>>>> best API of course.
>>>>> 
>>>>> One other interesting note is that you need to preserve per-partition
>>>>> ordering so you have to take care with the implementation if it were
>> to
>>>> be
>>>>> based on a thread pool and futures so as not to reorder the reads.
>>>>> 
>>>>> Anyway, I'm thrilled to see this starting to move forward and I'd
>> very
>>>> much
>>>>> like to help with the implementation wherever I can.  We're doing a
>>>>> simplified internal version of some of this at Lyft for just Kinesis
>>>>> because we need a solution for event time alignment in the very short
>>>> term
>>>>> but we'd like to immediately start helping to do this properly in
>> Flink
>>>>> after that.  One of the end goals for us is event time alignment
>> across
>>>>> heterogeneous sources.  Another is making it possible for non-expert
>>>> users
>>>>> to have a high probability of being able to write their own, correct,
>>>>> connectors.
>>>>> 
>>>>> -Jamie
>>>>> 
>>>>> On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek <
>> aljos...@apache.org>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I thought I had sent this mail a while ago but I must have forgotten
>>> to
>>>>>> send it.
>>>>>> 
>>>>>> There is another thing we should consider for splits: the range of
>>>>>> timestamps that it can contain. For example, the splits of a file
>>> source
>>>>>> would know what the minimum and maximum timestamp in the splits is,
>>>>>> roughly. For infinite splits, such as Kafka partitions, the minimum
>>>> would
>>>>>> be meaningful but the maximum would be +Inf. If the splits expose
>> the
>>>>>> interval of time that they contain the readers, or the component
>> that
>>>>>> manages the readers can make decisions about which splits to forward
>>> and
>>>>>> read first. And it can also influence the minimum watermark that a
>>>> reader
>>>>>> forwards: it should never emit a watermark if it knows there are
>>> splits
>>>> to
>>>>>> read that have a lower minimum timestamp. I think it should be as
>> easy
>>>> as
>>>>>> adding a minimumTimestamp()/maximumTimestamp() method pair to the
>>> split
>>>>>> interface.
>>>>>> 
>>>>>> Another thing we need to resolve is the actual reader interface. I see
>>>>>> there has been some good discussion but I don't know if we have
>>>>>> consensus. We should try and see how specific sources could be
>>>>>> implemented with the new interface. For example, for Kafka I think we
>>>>>> need to have N+1 threads per task (where N is the number of splits that
>>>>>> a task is reading from). One thread is responsible for reading from the
>>>>>> splits. And each split has its own (internal) thread for reading from
>>>>>> Kafka and putting messages in an internal queue to pull from. This is
>>>>>> similar to how the current Kafka source is implemented, which has a
>>>>>> separate fetcher thread. The reason for this split is that we always
>>>>>> need to try reading from Kafka to keep the throughput up. In the current
>>>>>> implementation the internal queue (or handover) limits the read rate of
>>>>>> the reader threads.
>>>>>> 
>>>>>> @Thomas, what do you think this would look like for Kinesis?
>>>>>> 
>>>>>> Best,
>>>>>> Aljoscha
>>>>>> 
>>>>>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com>
>> wrote:
>>>>>>> 
>>>>>>> Hi Piotrek,
>>>>>>> 
>>>>>>> Thanks a lot for the detailed reply. All makes sense to me.
>>>>>>> 
>>>>>>> WRT the confusion between advance() / getCurrent(), do you think it
>>>> would
>>>>>>> help if we combine them and have something like:
>>>>>>> 
>>>>>>> CompletableFuture<T> getNext();
>>>>>>> long getWatermark();
>>>>>>> long getCurrentTimestamp();
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski <
>>>> pi...@data-artisans.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Thanks again for the detailed answer :) Sorry for responding with
>> a
>>>>>> delay.
>>>>>>>> 
>>>>>>>>> Completely agree that in pattern 2, having a callback is
>> necessary
>>>> for
>>>>>>>> that
>>>>>>>>> single thread outside of the connectors. And the connectors MUST
>>> have
>>>>>>>>> internal threads.
>>>>>>>> 
>>>>>>>> Yes, this thread will have to exist somewhere. In pattern 2 it exists
>>>>>>>> in the connector (at least from the perspective of the Flink execution
>>>>>>>> engine). In pattern 1 it exists inside the Flink execution engine.
>>>>>>>> With completely blocking connectors, like simple reading from files,
>>>>>>>> both of those approaches are basically the same. The difference is
>>>>>>>> when the user implementing a Flink source is already working with
>>>>>>>> non-blocking code with some internal threads. In this case, pattern 1
>>>>>>>> would result in "double thread wrapping", while pattern 2 would allow
>>>>>>>> skipping one layer of indirection.
>>>>>>>> 
>>>>>>>>> If we go that way, we should have something like "void
>>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10
>>>>>> completable
>>>>>>>>> futures, will there be 10 additional threads (so 20 threads in
>>> total)
>>>>>>>>> blocking waiting on them? Or will there be a single thread busy
>>> loop
>>>>>>>>> checking around?
>>>>>>>> 
>>>>>>>> To be honest, I haven’t thought this completely through and I haven’t
>>>>>>>> tested/POC’ed it. Having said that, I can think of at least a couple
>>>>>>>> of solutions. The first is something like this:
>>>>>>>> 
>>>>>>>> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507
>>>>>>>> 
>>>>>>>> Line:
>>>>>>>> 
>>>>>>>>                              `blocked = split.process();`
>>>>>>>> 
>>>>>>>> Is where the execution goes into the task/sources. This is where the
>>>>>>>> returned future is handled:
>>>>>>>> 
>>>>>>>>                              blocked.addListener(() -> {
>>>>>>>>                                  blockedSplits.remove(split);
>>>>>>>>                                  // reset the level priority to prevent previously-blocked splits from starving existing splits
>>>>>>>>                                  split.resetLevelPriority();
>>>>>>>>                                  waitingSplits.offer(split);
>>>>>>>>                              }, executor);
>>>>>>>> 
>>>>>>>> Fundamentally, callbacks and Futures are more or less interchangeable.
>>>>>>>> You can always wrap one into another (creating a callback that
>>>>>>>> completes a future and attach a callback once future completes). In
>>>>>>>> this case the difference for me is mostly:
>>>>>>>> - api with passing callback allows the callback to be fired multiple
>>>>>>>> times and to fire it even if the connector is not blocked. This is
>>>>>>>> what I meant by saying that api `CompletableFuture<?> isBlocked()` is
>>>>>>>> a bit simpler. Connector can only return either “I’m not blocked” or
>>>>>>>> “I’m blocked and I will tell you only once when I’m not blocked
>>>>>>>> anymore”.
>>>>>>>> 
>>>>>>>> But this is not the most important thing for me here. For me the
>>>>>>>> important thing is to try our best to make Flink task’s control and
>>>>>>>> execution single threaded. For that both callback and future APIs
>>>>>>>> should work the same.
>>>>>>>> 
>>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The
>>>> good
>>>>>>>>> thing is that a blocking read API is usually simpler to
>> implement.
>>>>>>>> 
>>>>>>>> Yes, they are easier to implement (especially if you are not the one
>>>>>>>> that has to deal with the additional threading required around them
>>>>>>>> ;) ). But to answer this issue, if we choose pattern 2, we can always
>>>>>>>> provide a proxy/wrapper that would use an internal thread to implement
>>>>>>>> the non-blocking API while exposing a blocking API to the user. It
>>>>>>>> would implement pattern 2 for the user, exposing pattern 1 to them. In
>>>>>>>> other words, implementing pattern 1 in the pattern 2 paradigm, while
>>>>>>>> making it possible to implement pure pattern 2 connectors.
>>>>>>>> 
>>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to
>>> perform
>>>> IO
>>>>>>>> in
>>>>>>>>> a method like "isBlocked()". If the method is expected to fetch
>>>> records
>>>>>>>>> (even if not returning them), naming it something more explicit
>>> would
>>>>>>>> help
>>>>>>>>> avoid confusion.
>>>>>>>> 
>>>>>>>> If we choose so, we could rework it into something like:
>>>>>>>> 
>>>>>>>> CompletableFuture<?> advance();
>>>>>>>> T getCurrent();
>>>>>>>> Watermark getCurrentWatermark();
>>>>>>>> 
>>>>>>>> But as I wrote before, this is more confusing to me for the exact
>>>>>>>> reasons you mentioned :) I would be confused what should be done in
>>>>>>>> `advance()` and what in `getCurrent()`. However, again this naming
>>>>>>>> issue is not that important to me and probably is a matter of
>>>>>>>> taste/personal preferences.
>>>>>>>> 
>>>>>>>> Piotrek
>>>>>>>> 
>>>>>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com>
>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Piotrek,
>>>>>>>>> 
>>>>>>>>> Thanks for the explanation. We are probably talking about the
>> same
>>>>>> thing
>>>>>>>>> but in different ways. To clarify a little bit, I think there are
>>> two
>>>>>>>>> patterns to read from a connector.
>>>>>>>>> 
>>>>>>>>> Pattern 1: Thread-less connector with a blocking read API.
>> Outside
>>> of
>>>>>> the
>>>>>>>>> connector, there is one IO thread per reader, doing blocking
>> read.
>>> An
>>>>>>>>> additional thread will interact with all the IO threads.
>>>>>>>>> Pattern 2: Connector with internal thread(s) and non-blocking
>> API.
>>>>>>>> Outside
>>>>>>>>> of the connector, there is one thread for ALL readers, doing IO
>>>> relying
>>>>>>>> on
>>>>>>>>> notification callbacks in the reader.
>>>>>>>>> 
>>>>>>>>> In both patterns, there must be at least one thread per
>> connector,
>>>>>> either
>>>>>>>>> inside (created by connector writers) or outside (created by
>> Flink)
>>>> of
>>>>>>>> the
>>>>>>>>> connector. Ideally there are NUM_CONNECTORS + 1 threads in total,
>>> to
>>>>>> make
>>>>>>>>> sure that 1 thread is fully non-blocking.
>>>>>>>>> 
>>>>>>>>>> Btw, I don’t know if you understand my point. Having only `poll()`
>>>>>>>>>> and `take()` is not enough for single threaded task. If our source
>>>>>>>>>> interface doesn’t provide `notify()` callback nor
>>>>>>>>>> `CompletableFuture<?> isBlocked(),`, there is no way to implement
>>>>>>>>>> single threaded task that both reads the data from the source
>>>>>>>>>> connector and can also react to system events. Ok, non blocking
>>>>>>>>>> `poll()` would allow that, but with busy looping.
>>>>>>>>> 
>>>>>>>>> Completely agree that in pattern 2, having a callback is
>> necessary
>>>> for
>>>>>>>> that
>>>>>>>>> single thread outside of the connectors. And the connectors MUST
>>> have
>>>>>>>>> internal threads. If we go that way, we should have something
>> like
>>>>>> "void
>>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would
>>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10
>>>>>> completable
>>>>>>>>> futures, will there be 10 additional threads (so 20 threads in
>>> total)
>>>>>>>>> blocking waiting on them? Or will there be a single thread busy
>>> loop
>>>>>>>>> checking around?
>>>>>>>>> 
>>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The
>>>> good
>>>>>>>>> thing is that a blocking read API is usually simpler to
>> implement.
>>> An
>>>>>>>>> additional non-blocking "T poll()" method here is indeed optional
>>> and
>>>>>>>> could
>>>>>>>>> be used in cases like Flink does not want the thread to block
>>>> forever.
>>>>>>>> They
>>>>>>>>> can also be combined to have a "T poll(Timeout)", which is
>> exactly
>>>> what
>>>>>>>>> KafkaConsumer did.
>>>>>>>>> 
>>>>>>>>> It sounds that you are proposing pattern 2 with something similar
>>> to
>>>>>> NIO2
>>>>>>>>> AsynchronousByteChannel[1]. That API would work, except that the
>>>>>>>> signature
>>>>>>>>> returning future seems not necessary. If that is the case, a
>> minor
>>>>>> change
>>>>>>>>> on the current FLIP proposal to have "void advance(callback)"
>>> should
>>>>>>>> work.
>>>>>>>>> And this means the connectors MUST have their internal threads.
>>>>>>>>> 
>>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to
>>> perform
>>>> IO
>>>>>>>> in
>>>>>>>>> a method like "isBlocked()". If the method is expected to fetch
>>>> records
>>>>>>>>> (even if not returning them), naming it something more explicit
>>> would
>>>>>>>> help
>>>>>>>>> avoid confusion.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> [1]
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>> 
>>> 
>> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html
>>>>>>>>> 
>>>>>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <
>>>>>> pi...@data-artisans.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi
>>>>>>>>>> 
>>>>>>>>>> Good point with select/epoll, however I do not see how they
>>> couldn’t
>>>>>> be
>>>>>>>>>> with Flink if we would like single task in Flink to be
>>>> single-threaded
>>>>>>>> (and
>>>>>>>>>> I believe we should pursue this goal). If your connector blocks
>> on
>>>>>>>>>> `select`, then it can not process/handle control messages from
>>>> Flink,
>>>>>>>> like
>>>>>>>>>> checkpoints, releasing resources and potentially output flushes.
>>>> This
>>>>>>>> would
>>>>>>>>>> require tight integration between connector and Flink’s main
>> event
>>>>>>>>>> loop/selects/etc.
>>>>>>>>>> 
>>>>>>>>>> Looking at it from other perspective. Let’s assume that we have a
>>>>>>>>>> connector implemented on top of `select`/`epoll`. In order to
>>>>>>>>>> integrate it with Flink’s checkpointing/flushes/resource releasing it
>>>>>>>>>> will have to be executed in separate thread one way or another. At
>>>>>>>>>> least if our API will enforce/encourage non blocking implementations
>>>>>>>>>> with some kind of notifications (`isBlocked()` or `notify()`
>>>>>>>>>> callback), some connectors might skip one layer of wrapping threads.
>>>>>>>>>> 
>>>>>>>>>> Btw, I don’t know if you understand my point. Having only
>> `poll()`
>>>> and
>>>>>>>>>> `take()` is not enough for single threaded task. If our source
>>>>>> interface
>>>>>>>>>> doesn’t provide `notify()` callback nor `CompletableFuture<?>
>>>>>>>>>> isBlocked(),`, there is no way to implement single threaded task
>>>> that
>>>>>>>> both
>>>>>>>>>> reads the data from the source connector and can also react to
>>>> system
>>>>>>>>>> events. Ok, non blocking `poll()` would allow that, but with
>> busy
>>>>>>>> looping.
>>>>>>>>>> 
>>>>>>>>>> Piotrek
>>>>>>>>>> 
>>>>>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com>
>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Piotrek,
>>>>>>>>>>> 
>>>>>>>>>>>> But I don’t see a reason why we should expose both blocking
>>>> `take()`
>>>>>>>> and
>>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone
>> (Flink
>>>>>>>> engine
>>>>>>>>>> or
>>>>>>>>>>> connector) would have to do the same busy
>>>>>>>>>>>> looping anyway and I think it would be better to have a
>> simpler
>>>>>>>>>> connector
>>>>>>>>>>> API (that would solve our problems) and force connectors to
>>> comply
>>>>>> one
>>>>>>>>>> way
>>>>>>>>>>> or another.
>>>>>>>>>>> 
>>>>>>>>>>> If we let the block happen inside the connector, the blocking
>>> does
>>>>>> not
>>>>>>>>>> have
>>>>>>>>>>> to be a busy loop. For example, to do the block waiting
>>>> efficiently,
>>>>>>>> the
>>>>>>>>>>> connector can use java NIO selector().select which relies on OS
>>>>>> syscall
>>>>>>>>>>> like epoll[1] instead of busy looping. But if Flink engine
>> blocks
>>>>>>>> outside
>>>>>>>>>>> the connector, it pretty much has to do the busy loop. So if
>>> there
>>>> is
>>>>>>>>>> only
>>>>>>>>>>> one API to get the element, a blocking getNextElement() makes
>>> more
>>>>>>>> sense.
>>>>>>>>>>> In any case, we should avoid ambiguity. It has to be crystal
>>> clear
>>>>>>>> about
>>>>>>>>>>> whether a method is expected to be blocking or non-blocking.
>>>>>> Otherwise
>>>>>>>> it
>>>>>>>>>>> would be very difficult for Flink engine to do the right thing
>>> with
>>>>>> the
>>>>>>>>>>> connectors. At the first glance at getCurrent(), the expected
>>>>>> behavior
>>>>>>>> is
>>>>>>>>>>> not quite clear.
>>>>>>>>>>> 
>>>>>>>>>>> That said, I do agree that functionality wise, poll() and
>> take()
>>>> kind
>>>>>>>> of
>>>>>>>>>>> overlap. But they are actually not quite different from
>>>>>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the
>> only
>>>>>>>>>>> difference is that poll() also returns the next record if it is
>>>>>>>>>> available.
>>>>>>>>>>> But I agree that the isBlocked() + getNextElement() is more
>>>> flexible
>>>>>> as
>>>>>>>>>>> users can just check the record availability, but not fetch the
>>>> next
>>>>>>>>>>> element.
>>>>>>>>>>> 
>>>>>>>>>>>> In case of thread-less readers with only non-blocking
>>>> `queue.poll()`
>>>>>>>> (is
>>>>>>>>>>> that really a thing? I can not think about a real
>> implementation
>>>> that
>>>>>>>>>>> enforces such constraints)
>>>>>>>>>>> Right, it is pretty much a syntax sugar to allow user combine
>> the
>>>>>>>>>>> check-and-take into one method. It could be achieved with
>>>>>> isBlocked() +
>>>>>>>>>>> getNextElement().
>>>>>>>>>>> 
>>>>>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>> 
>>>>>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <
>>>>>>>> pi...@data-artisans.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>> 
>>>>>>>>>>>> With my proposal, both of your examples would have to be
>> solved
>>> by
>>>>>> the
>>>>>>>>>>>> connector and solution to both problems would be the same:
>>>>>>>>>>>> 
>>>>>>>>>>>> Pretend that connector is never blocked (`isBlocked() { return
>>>>>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking
>>>>>> fashion
>>>>>>>>>> (or
>>>>>>>>>>>> semi blocking with return of control from time to time to
>> allow
>>>> for
>>>>>>>>>>>> checkpointing, network flushing and other resource management
>>>> things
>>>>>>>> to
>>>>>>>>>>>> happen in the same main thread). In other words, exactly how
>> you
>>>>>> would
>>>>>>>>>>>> implement `take()` method or how the same source connector
>> would
>>>> be
>>>>>>>>>>>> implemented NOW with current source interface. The difference
>>> with
>>>>>>>>>> current
>>>>>>>>>>>> interface would be only that main loop would be outside of the
>>>>>>>>>> connector,
>>>>>>>>>>>> and instead of periodically releasing checkpointing lock,
>>>>>> periodically
>>>>>>>>>>>> `return null;` or `return Optional.empty();` from
>>>>>> `getNextElement()`.
>>>>>>>>>>>> 
>>>>>>>>>>>> In case of thread-less readers with only non-blocking `queue.poll()`
>>>>>>>>>>>> (is that really a thing? I can not think about a real implementation
>>>>>>>>>>>> that enforces such constraints), we could provide a wrapper that
>>>>>>>>>>>> hides the busy looping. The same applies to forever-blocking readers:
>>>>>>>>>>>> we could provide another wrapper running the connector in a separate
>>>>>>>>>>>> thread.
>>>>>>>>>>>> 
>>>>>>>>>>>> But I don’t see a reason why we should expose both blocking
>>>> `take()`
>>>>>>>> and
>>>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone
>>> (Flink
>>>>>>>>>> engine or
>>>>>>>>>>>> connector) would have to do the same busy looping anyway and I
>>>> think
>>>>>>>> it
>>>>>>>>>>>> would be better to have a simpler connector API (that would
>>> solve
>>>>>> our
>>>>>>>>>>>> problems) and force connectors to comply one way or another.
>>>>>>>>>>>> 
>>>>>>>>>>>> Piotrek
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com>
>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Piotr,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I might have misunderstood your proposal. But let me try to
>>>> explain
>>>>>> my
>>>>>>>>>>>>> concern. I am thinking about the following case:
>>>>>>>>>>>>> 1. a reader has the following two interfaces,
>>>>>>>>>>>>> boolean isBlocked()
>>>>>>>>>>>>> T getNextElement()
>>>>>>>>>>>>> 2. the implementation of getNextElement() is non-blocking.
>>>>>>>>>>>>> 3. The reader is thread-less, i.e. it does not have any
>>> internal
>>>>>>>>>> thread.
>>>>>>>>>>>>> For example, it might just delegate the getNextElement() to a
>>>>>>>>>>>> queue.poll(),
>>>>>>>>>>>>> and isBlocked() is just queue.isEmpty().
>>>>>>>>>>>>> 
>>>>>>>>>>>>> How can Flink efficiently implement a blocking reading
>> behavior
>>>>>> with
>>>>>>>>>> this
>>>>>>>>>>>>> reader? Either a tight loop or a backoff interval is needed.
>>>>>> Neither
>>>>>>>> of
>>>>>>>>>>>>> them is ideal.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Now let's say in the reader mentioned above implements a
>>> blocking
>>>>>>>>>>>>> getNextElement() method. Because there is no internal thread
>> in
>>>> the
>>>>>>>>>>>> reader,
>>>>>>>>>>>>> after isBlocked() returns false. Flink will still have to
>> loop
>>> on
>>>>>>>>>>>>> isBlocked() to check whether the next record is available. If
>>> the
>>>>>>>> next
>>>>>>>>>>>>> record reaches after 10 min, it is a tight loop for 10 min.
>> You
>>>>>> have
>>>>>>>>>>>>> probably noticed that in this case, even isBlocked() returns
>> a
>>>>>>>> future,
>>>>>>>>>>>> that
>>>>>>>>>>>>> future() will not be completed if Flink does not call some
>>> method
>>>>>>>> from
>>>>>>>>>>>> the
>>>>>>>>>>>>> reader, because the reader has no internal thread to complete
>>>> that
>>>>>>>>>> future
>>>>>>>>>>>>> by itself.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Due to the above reasons, a blocking take() API would allow
>>> Flink
>>>>>> to
>>>>>>>>>> have
>>>>>>>>>>>>> an efficient way to read from a reader. There are many ways
>> to
>>>> wake
>>>>>>>> up
>>>>>>>>>>>> the
>>>>>>>>>>>>> blocking thread when checkpointing is needed depending on the
>>>>>>>>>>>>> implementation. But I think the poll()/take() API would also
>>> work
>>>>>> in
>>>>>>>>>> that
>>>>>>>>>>>>> case.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <
>>>>>>>> pi...@data-artisans.com
>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> a)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> BTW, regarding the isBlocked() method, I have a few more
>>>> questions.
>>>>>>>> 21,
>>>>>>>>>>>> Is
>>>>>>>>>>>>>> a method isReady() with boolean as a return value
>>>>>>>>>>>>>>> equivalent? Personally I found it is a little bit confusing
>>> in
>>>>>> what
>>>>>>>>>> is
>>>>>>>>>>>>>> supposed to be returned when the future is completed. 22. if
>>>>>>>>>>>>>>> the implementation of isBlocked() is optional, how do the
>>>> callers
>>>>>>>>>> know
>>>>>>>>>>>>>> whether the method is properly implemented or not?
>>>>>>>>>>>>>>> Does not implemented mean it always return a completed
>>> future?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> `CompletableFuture<?> isBlocked()` is more or less an
>>> equivalent
>>>>>> to
>>>>>>>>>>>>>> `boolean hasNext()` which in case of “false” provides some
>>> kind
>>>>>> of a
>>>>>>>>>>>>>> listener/callback that notifies about presence of next
>>> element.
>>>>>>>> There
>>>>>>>>>>>> are
>>>>>>>>>>>>>> some minor details, like `CompletableFuture<?>` has a
>> minimal
>>>> two
>>>>>>>>>> state
>>>>>>>>>>>>>> logic:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 1. Future is completed - we have more data
>>>>>>>>>>>>>> 2. Future not yet completed - we don’t have data now, but we
>>>>>>>> might/we
>>>>>>>>>>>> will
>>>>>>>>>>>>>> have in the future
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> While `boolean hasNext()` and `notify()` callback are a bit
>>> more
>>>>>>>>>>>>>> complicated/dispersed and can lead/encourage `notify()`
>> spam.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> b)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method
>>> like
>>>>>>>>>>>> `getNext`
>>>>>>>>>>>>>> the `getNext` would need return a
>>>>>>>>>>>>>>> `ElementWithTimestamp` because some sources want to add
>>>> timestamp
>>>>>>>> to
>>>>>>>>>>>>>> every element. IMO, this is not so memory friendly
>>>>>>>>>>>>>>> so I prefer this design.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Guowei I don’t quite understand this. Could you elaborate
>> why
>>>>>>>> having a
>>>>>>>>>>>>>> separate `advance()` help?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> c)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Regarding advance/poll/take. What’s the value of having two
>>>>>> separate
>>>>>>>>>>>>>> methods: poll and take? Which one of them should be called
>> and
>>>>>> which
>>>>>>>>>>>>>> implemented? What’s the benefit of having those methods
>>> compared
>>>>>> to
>>>>>>>>>>>> having
>>>>>>>>>>>>>> a one single method `getNextElement()` (or `pollElement() or
>>>>>>>> whatever
>>>>>>>>>> we
>>>>>>>>>>>>>> name it) with following contract:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> CompletableFuture<?> isBlocked();
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>> Return next element - will be called only if `isBlocked()`
>> is
>>>>>>>>>> completed.
>>>>>>>>>>>>>> Try to implement it in non blocking fashion, but if that’s
>>>>>>>> impossible
>>>>>>>>>> or
>>>>>>>>>>>>>> you just don’t need the effort, you can block in this
>> method.
>>>>>>>>>>>>>> */
>>>>>>>>>>>>>> T getNextElement();
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I mean, if the connector is implemented non-blockingly,
>> Flink
>>>>>> should
>>>>>>>>>> use
>>>>>>>>>>>>>> it that way. If it’s not, then `poll()` will `throw new
>>>>>>>>>>>>>> NotImplementedException()`. Implementing both of them and
>>>>>> providing
>>>>>>>>>>>> both of
>>>>>>>>>>>>>> them to Flink wouldn’t make a sense, thus why not merge them
>>>> into
>>>>>> a
>>>>>>>>>>>> single
>>>>>>>>>>>>>> method call that should preferably (but not necessarily need
>>> to)
>>>>>> be
>>>>>>>>>>>>>> non-blocking? It’s not like we are implementing general
>>> purpose
>>>>>>>>>> `Queue`,
>>>>>>>>>>>>>> which users might want to call either of `poll` or `take`.
>> We
>>>>>> would
>>>>>>>>>>>> always
>>>>>>>>>>>>>> prefer to call `poll`, but if it’s blocking, then still we
>>> have
>>>> no
>>>>>>>>>>>> choice,
>>>>>>>>>>>>>> but to call it and block on it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> d)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking
>> source
>>>> is
>>>>>>>> very
>>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may be
>>>> another
>>>>>>>> way
>>>>>>>>>>>> to
>>>>>>>>>>>>>>> achieve this. I think it may be not very memory friendly if
>>>> every
>>>>>>>>>>>> advance
>>>>>>>>>>>>>>> call return a Future.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I didn’t want to mention this, to not clog my initial
>>> proposal,
>>>>>> but
>>>>>>>>>>>> there
>>>>>>>>>>>>>> is a simple solution for the problem:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> (…)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> CompletableFuture<?> NOT_BLOCKED =
>>>>>>>>>>>>>> CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>> * Returns a future that will be completed when the page
>> source
>>>>>>>>>>>> becomes
>>>>>>>>>>>>>> * unblocked.  If the page source is not blocked, this method
>>>>>>>> should
>>>>>>>>>>>>>> return
>>>>>>>>>>>>>> * {@code NOT_BLOCKED}.
>>>>>>>>>>>>>> */
>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked()
>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>   return NOT_BLOCKED;
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> If we are blocked and we are waiting for the IO, then
>>> creating a
>>>>>> new
>>>>>>>>>>>>>> Future is non-issue. Under full throttle/throughput and not
>>>>>> blocked
>>>>>>>>>>>> sources
>>>>>>>>>>>>>> returning a static `NOT_BLOCKED` constant  should also solve
>>> the
>>>>>>>>>>>> problem.
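
To illustrate the point about allocation, here is a minimal sketch of a buffer-backed reader that returns the shared `NOT_BLOCKED` constant while data is available and creates a future only when it is actually blocked. The names `SketchSplitReader`, `BufferedSketchReader` and `offer()` are hypothetical. The sketch assumes some producer (for example an internal IO thread) calls `offer()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical reader shape following the isBlocked()/getNextElement() contract. */
interface SketchSplitReader<T> {
    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** Naive connectors keep this default and simply block in getNextElement(). */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }

    T getNextElement();
}

/** Buffer-backed reader; a producer hands over records via offer(). */
final class BufferedSketchReader<T> implements SketchSplitReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private CompletableFuture<Void> pending; // non-null only while the reader is blocked

    @Override
    public synchronized CompletableFuture<?> isBlocked() {
        if (!buffer.isEmpty()) {
            return NOT_BLOCKED; // hot path: no allocation
        }
        if (pending == null) {
            pending = new CompletableFuture<>(); // allocated only when idle
        }
        return pending;
    }

    @Override
    public synchronized T getNextElement() {
        return buffer.poll();
    }

    /** Called by the producer; completes the pending future outside the lock. */
    public void offer(T element) {
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            buffer.add(element);
            toComplete = pending;
            pending = null;
        }
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

Under full throughput the buffer is rarely empty, so `isBlocked()` keeps returning the same constant and no per-call future is created.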
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> One more remark, non-blocking sources might be a necessity
>> in
>>> a
>>>>>>>> single
>>>>>>>>>>>>>> threaded model without a checkpointing lock. (Currently when
>>>>>> sources
>>>>>>>>>> are
>>>>>>>>>>>>>> blocked, they can release checkpointing lock and re-acquire
>> it
>>>>>> again
>>>>>>>>>>>>>> later). Non-blocking `poll`/`getNext()` would allow for
>>>>>> checkpoints
>>>>>>>> to
>>>>>>>>>>>>>> happen when source is idling. In that case either `notify()`
>>> or
>>>> my
>>>>>>>>>>>> proposed
>>>>>>>>>>>>>> `isBlocked()` would allow to avoid busy-looping.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com>
>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> The iterator-like API was also the first thing that came to
>>> me.
>>>>>> But
>>>>>>>>>> it
>>>>>>>>>>>>>>> seems a little confusing that hasNext() does not mean "the
>>>> stream
>>>>>>>> has
>>>>>>>>>>>> not
>>>>>>>>>>>>>>> ended", but means "the next record is ready", which is
>>>>>> repurposing
>>>>>>>>>> the
>>>>>>>>>>>>>> well
>>>>>>>>>>>>>>> known meaning of hasNext(). If we follow the
>> hasNext()/next()
>>>>>>>>>> pattern,
>>>>>>>>>>>> an
>>>>>>>>>>>>>>> additional isNextReady() method to indicate whether the
>> next
>>>>>> record
>>>>>>>>>> is
>>>>>>>>>>>>>>> ready seems more intuitive to me.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Similarly, in poll()/take() pattern, another method of
>>> isDone()
>>>>>> is
>>>>>>>>>>>> needed
>>>>>>>>>>>>>>> to indicate whether the stream has ended or not.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Compared with hasNext()/next()/isNextReady() pattern,
>>>>>>>>>>>>>>> isDone()/poll()/take() seems more flexible for the reader
>>>>>>>>>>>> implementation.
>>>>>>>>>>>>>>> When I am implementing a reader, I could have a couple of
>>>>>> choices:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - A thread-less reader that does not have any internal
>>> thread.
>>>>>>>>>>>>>>> - When poll() is called, the same calling thread will
>>> perform a
>>>>>>>> bunch
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>> IO asynchronously.
>>>>>>>>>>>>>>> - When take() is called, the same calling thread will
>>> perform a
>>>>>>>>>>>>>> bunch
>>>>>>>>>>>>>>> of IO and wait until the record is ready.
>>>>>>>>>>>>>>> - A reader with internal threads performing network IO and
>>> put
>>>>>>>>>> records
>>>>>>>>>>>>>>> into a buffer.
>>>>>>>>>>>>>>> - When poll() is called, the calling thread simply reads
>> from
>>>>>> the
>>>>>>>>>>>>>>> buffer and return empty result immediately if there is no
>>>>>> record.
>>>>>>>>>>>>>>> - When take() is called, the calling thread reads from the
>>>>>> buffer
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> block waiting if the buffer is empty.
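
The second choice in the list above (internal threads filling a buffer) could look roughly like the following sketch. The class name `BufferedPollTakeReader` and the `put()`/`finish()` hooks are hypothetical. The sketch assumes an internal IO thread calls them, while the caller uses `poll()`, `take()` and `isDone()`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical reader with an internal buffer exposing isDone()/poll()/take(). */
final class BufferedPollTakeReader<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
    private volatile boolean finished;

    /** Called by the internal IO thread for every fetched record. */
    void put(T record) {
        buffer.add(record);
    }

    /** Called by the internal IO thread once the split is exhausted. */
    void finish() {
        finished = true;
    }

    /** Non-blocking: returns null immediately if no record is buffered. */
    T poll() {
        return buffer.poll();
    }

    /** Blocking: waits until a record is buffered. */
    T take() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a record", e);
        }
    }

    /** True only when the stream has ended and the buffer is drained. */
    boolean isDone() {
        return finished && buffer.isEmpty();
    }
}
```

Here `isDone()` carries the "stream has ended" meaning, while an empty `poll()` result only means that no record is ready yet.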
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On the other hand, with the hasNext()/next()/isNextReady()
>>> API,
>>>>>> it
>>>>>>>> is
>>>>>>>>>>>>>> less
>>>>>>>>>>>>>>> intuitive for the reader developers to write the
>> thread-less
>>>>>>>> pattern.
>>>>>>>>>>>>>>> Although technically speaking one can still do the
>>> asynchronous
>>>>>> IO
>>>>>>>> to
>>>>>>>>>>>>>>> prepare the record in isNextReady(). But it is inexplicit
>> and
>>>>>> seems
>>>>>>>>>>>>>>> somewhat hacky.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <
>> t...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Couple more points regarding discovery:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> The proposal mentions that discovery could be outside the
>>>>>>>> execution
>>>>>>>>>>>>>> graph.
>>>>>>>>>>>>>>>> Today, discovered partitions/shards are checkpointed. I
>>>> believe
>>>>>>>> that
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>> also need to be the case in the future, even when
>> discovery
>>>> and
>>>>>>>>>>>> reading
>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> split between different tasks.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> For cases such as resharding of a Kinesis stream, the
>>>>>> relationship
>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>> splits needs to be considered. Splits cannot be randomly
>>>>>>>> distributed
>>>>>>>>>>>>>> over
>>>>>>>>>>>>>>>> readers in certain situations. An example was mentioned
>>> here:
>>>>>>>>>>>>>>>> 
>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <
>> t...@apache.org
>>>> 
>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Can the number of splits decrease? Yes, splits can be
>>> closed
>>>>>> and
>>>>>>>> go
>>>>>>>>>>>>>> away.
>>>>>>>>>>>>>>>>> An example would be a shard merge in Kinesis (2 existing
>>>> shards
>>>>>>>>>> will
>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> closed and replaced with a new shard).
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regarding advance/poll/take: IMO the least restrictive
>>>> approach
>>>>>>>>>> would
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>> the thread-less IO model (pull based, non-blocking,
>> caller
>>>>>>>>>> retrieves
>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> records when available). The current Kinesis API requires
>>> the
>>>>>> use
>>>>>>>>>> of
>>>>>>>>>>>>>>>>> threads. But that can be internal to the split reader and
>>>> does
>>>>>>>> not
>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> be a source API concern. In fact, that's what we are
>>> working
>>>> on
>>>>>>>>>> right
>>>>>>>>>>>>>> now
>>>>>>>>>>>>>>>>> as improvement to the existing consumer: Each shard
>>> consumer
>>>>>>>> thread
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> push to a queue, the consumer main thread will poll the
>>>>>> queue(s).
>>>>>>>>>> It
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> The proposed SplitReader interface would fit the
>>> thread-less
>>>> IO
>>>>>>>>>>>> model.
>>>>>>>>>>>>>>>>> Similar to an iterator, we find out if there is a new
>>> element
>>>>>>>>>>>> (hasNext)
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> if so, move to it (next()). Separate calls deliver the
>> meta
>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>> (timestamp, watermark). Perhaps advance call could offer
>> a
>>>>>>>> timeout
>>>>>>>>>>>>>>>> option,
>>>>>>>>>>>>>>>>> so that the caller does not end up in a busy wait. On the
>>>> other
>>>>>>>>>>>> hand, a
>>>>>>>>>>>>>>>>> caller processing multiple splits may want to cycle
>> through
>>>>>> fast,
>>>>>>>>>> to
>>>>>>>>>>>>>>>>> process elements of other splits as soon as they become
>>>>>>>> available.
>>>>>>>>>>>> The
>>>>>>>>>>>>>>>> nice
>>>>>>>>>>>>>>>>> thing is that this "split merge" logic can now live in
>>> Flink
>>>>>> and
>>>>>>>> be
>>>>>>>>>>>>>>>>> optimized and shared between different sources.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <
>>>> guowei....@gmail.com
>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking
>>>> source
>>>>>> is
>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>> important. But in addition to `Future/poll`, there may
>> be
>>>>>>>> another
>>>>>>>>>>>> way
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> achieve this. I think it may be not very memory friendly
>>> if
>>>>>>>> every
>>>>>>>>>>>>>>>> advance
>>>>>>>>>>>>>>>>>> call return a Future.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> public interface Listener {
>>>>>>>>>>>>>>>>>> public void notify();
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> public interface SplitReader {
>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>> * When there is no element temporarily, this will return
>>>>>>>> false.
>>>>>>>>>>>>>>>>>> * When elements is available again splitReader can call
>>>>>>>>>>>>>>>>>> listener.notify()
>>>>>>>>>>>>>>>>>> * In addition the frame would check `advance`
>>> periodically .
>>>>>>>>>>>>>>>>>> * Of course advance can always return true and ignore
>> the
>>>>>>>>>>>>>> listener
>>>>>>>>>>>>>>>>>> argument for simplicity.
>>>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>>>> public boolean advance(Listener listener);
>>>>>>>>>>>>>>>>>> }
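
A minimal sketch of how the listener-based `advance` above might behave, under stated assumptions. The callback is named `onAvailable()` here because a Java interface cannot declare `void notify()` (it clashes with the final `Object.notify()`). `ListenerSplitReader`, `offer()` and `getCurrent()` are hypothetical names, and the sketch assumes a producer thread calls `offer()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical callback, invoked when elements become available again. */
interface AvailabilityListener {
    void onAvailable();
}

/** Hypothetical reader whose advance() registers a listener when no element is ready. */
final class ListenerSplitReader<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private AvailabilityListener waiting; // set only while a caller waits for data
    private T current;

    /** Returns true and moves to the next element, or registers the listener and returns false. */
    synchronized boolean advance(AvailabilityListener listener) {
        current = buffer.poll();
        if (current != null) {
            return true;
        }
        waiting = listener;
        return false;
    }

    synchronized T getCurrent() {
        return current;
    }

    /** Called by the producer; notifies a waiting listener at most once, outside the lock. */
    void offer(T element) {
        AvailabilityListener toNotify;
        synchronized (this) {
            buffer.add(element);
            toNotify = waiting;
            waiting = null;
        }
        if (toNotify != null) {
            toNotify.onAvailable();
        }
    }
}
```

No future is allocated per call in this variant; the cost is that the reader has to track the registered listener itself.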
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 2.  The FLIP tells us very clearly that how to create
>> all
>>>>>> Splits
>>>>>>>>>> and
>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>> to create a SplitReader from a Split. But there is no
>>>> strategy
>>>>>>>> for
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>> to choose how to assign the splits to the tasks. I think
>>> we
>>>>>>>> could
>>>>>>>>>>>> add
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> Enum to let user to choose.
>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>> public enum SplitsAssignmentPolicy {
>>>>>>>>>>>>>>>>>> Location,
>>>>>>>>>>>>>>>>>> Workload,
>>>>>>>>>>>>>>>>>> Random,
>>>>>>>>>>>>>>>>>> Average
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one
>> method
>>>> like
>>>>>>>>>>>>>> `getNext`
>>>>>>>>>>>>>>>>>> the `getNext` would need return a `ElementWithTimestamp`
>>>>>> because
>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>> sources want to add timestamp to every element. IMO,
>> this
>>> is
>>>>>> not
>>>>>>>>>> so
>>>>>>>>>>>>>>>> memory
>>>>>>>>>>>>>>>>>> friendly so I prefer this design.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite
>> a
>>>> lot
>>>>>> of
>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>> possible improvements. I have one proposal. Instead of
>>>>>> having a
>>>>>>>>>>>>>> method:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I would replace it with
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> /*
>>>>>>>>>>>>>>>>>>> * Return a future, which when completed means that
>> source
>>>> has
>>>>>>>>>> more
>>>>>>>>>>>>>>>> data
>>>>>>>>>>>>>>>>>>> and getNext() will not block.
>>>>>>>>>>>>>>>>>>> * If you wish to use benefits of non blocking
>> connectors,
>>>>>>>> please
>>>>>>>>>>>>>>>>>>> implement this method appropriately.
>>>>>>>>>>>>>>>>>>> */
>>>>>>>>>>>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>>>>>>>>>>>  return CompletableFuture.completedFuture(null);
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Couple of arguments:
>>>>>>>>>>>>>>>>>>> 1. I don’t understand the division of work between
>>>>>> `advance()`
>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> `getCurrent()`. What should be done in which,
>> especially
>>>> for
>>>>>>>>>>>>>> connectors
>>>>>>>>>>>>>>>>>>> that handle records in batches (like Kafka) and when
>>> should
>>>>>> you
>>>>>>>>>>>> call
>>>>>>>>>>>>>>>>>>> `advance` and when `getCurrent()`.
>>>>>>>>>>>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will
>>>> allow
>>>>>>>> us
>>>>>>>>>> in
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> future to have asynchronous/non blocking connectors and
>>>> more
>>>>>>>>>>>>>>>> efficiently
>>>>>>>>>>>>>>>>>>> handle large number of blocked threads, without busy
>>>> waiting.
>>>>>>>>>> While
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> same time it doesn’t add much complexity, since naive
>>>>>> connector
>>>>>>>>>>>>>>>>>>> implementations can be always blocking.
>>>>>>>>>>>>>>>>>>> 3. This also would allow us to use a fixed size thread
>>> pool
>>>>>> of
>>>>>>>>>> task
>>>>>>>>>>>>>>>>>>> executors, instead of one thread per task.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Piotrek
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <
>>>>>>>> aljos...@apache.org
>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> In order to finally get the ball rolling on the new
>>> source
>>>>>>>>>>>> interface
>>>>>>>>>>>>>>>>>>> that we have discussed for so long I finally created a
>>>> FLIP:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing
>>>>>>>> work/discussion
>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>> adding per-partition watermark support to the Kinesis
>>>> source
>>>>>>>> and
>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>> this would enable generic implementation of event-time
>>>>>>>> alignment
>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>> sources. Maybe we need another FLIP for the event-time
>>>>>>>> alignment
>>>>>>>>>>>>>> part,
>>>>>>>>>>>>>>>>>>> especially the part about information sharing between
>>>>>>>> operations
>>>>>>>>>>>> (I'm
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>> calling it state sharing because state has a special
>>>> meaning
>>>>>> in
>>>>>>>>>>>>>> Flink).
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Please discuss away!
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>> 
