Hi again,

> However I don't like the thread mode which starts a thread for each split.
> Starting extra thread in operator is not an ideal way IMO. Especially
> thread count is decided by split count. So I was wondering if there is a
> more elegant way. Do we really want these threads in Flink core?
Biao, you have raised an important issue. Indeed, it seems like the current proposal is missing something: I would guess that we need a mechanism for adding new splits to an already existing SplitReader, and some logic to determine whether the current instance can accept more splits or not. For example:

void SplitReader#addSplit(Split)
boolean SplitReader#doesWantMoreSplits()

Flink could randomly/round-robin assign new splits to the SplitReaders that return true from `doesWantMoreSplits()`. Batch file readers might implement some custom logic in `doesWantMoreSplits()`, e.g. one SplitReader can have at most N enqueued splits.

Also, what about Kafka? Isn’t it the case that one KafkaConsumer can read from multiple splits? If so, Kafka’s SplitReader should always return true from `doesWantMoreSplits()`. What do you think?

Re: Becket, I’m +1 for Sync and AsyncSplitReader.

Piotrek

> On 21 Nov 2018, at 14:49, Becket Qin <becket....@gmail.com> wrote: > > Hi Aljoscha, > > Good point on the potential optimization in the source. One thing to > clarify, by "adding a minimumTimestamp()/maximumTimestamp() method pair to > the split interface", did you mean "split reader interface"? If so, what > should the readers do if they do not have such additional information? I am > wondering if it is possible to leave such optimization to the source > internal implementation. > > @all > After reading all the feedback, Biao and I talked a little bit offline. We > would like to share some new thoughts with you and see what do you think. > > When looking at the Source API, we were trying to answer two questions. > First of all, how would Flink use this API if someone else implemented it. > Secondly, how would the connector contributors implement the interface? How > difficult is the implementation. > > KafkaConsumer is a typical example of a thread-less reader. The idea was to > allow different threading model on top of it.
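To make the idea above a bit more concrete, here is a rough, non-authoritative sketch of how `addSplit()` / `doesWantMoreSplits()` and a round-robin assignment could fit together. All class names (`Split`, `BoundedFileSplitReader`, the `assign()` helper) are illustrative assumptions, not an existing Flink API:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class SplitAssignmentSketch {

    static class Split {
        final String id;
        Split(String id) { this.id = id; }
    }

    interface SplitReader {
        void addSplit(Split split);
        boolean doesWantMoreSplits();
    }

    // A batch-file style reader that caps its backlog at N enqueued splits.
    static class BoundedFileSplitReader implements SplitReader {
        private final Queue<Split> enqueued = new ArrayDeque<>();
        private final int maxEnqueued;
        BoundedFileSplitReader(int maxEnqueued) { this.maxEnqueued = maxEnqueued; }
        public void addSplit(Split split) { enqueued.add(split); }
        public boolean doesWantMoreSplits() { return enqueued.size() < maxEnqueued; }
    }

    // Round-robin assignment to whichever readers still want splits.
    // In a real assigner, unassignable splits would be buffered; here they
    // are silently skipped for brevity.
    static void assign(List<Split> splits, List<SplitReader> readers) {
        int i = 0;
        for (Split split : splits) {
            for (int tried = 0; tried < readers.size(); tried++) {
                SplitReader r = readers.get(i % readers.size());
                i++;
                if (r.doesWantMoreSplits()) {
                    r.addSplit(split);
                    break;
                }
            }
        }
    }

    public static void main(String[] args) {
        BoundedFileSplitReader r1 = new BoundedFileSplitReader(1);
        BoundedFileSplitReader r2 = new BoundedFileSplitReader(1);
        assign(List.of(new Split("s1"), new Split("s2")), List.of(r1, r2));
        // each reader received one split, so neither wants more
        if (r1.doesWantMoreSplits() || r2.doesWantMoreSplits()) {
            throw new AssertionError();
        }
        System.out.println("assigned");
    }
}
```

A Kafka-style reader in this scheme would simply always return true from `doesWantMoreSplits()`, since one KafkaConsumer can subscribe to additional partitions.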
It could be a global single > thread handles record fetching and processing in an event loop pattern; it > could also be one dedicated fetcher thread for each consumer and a separate > thread pool for record processing. The API gives the freedom of picking up > threading model to the users. To answer the first question, I would love to > have such a source reader API so Flink can choose whatever threading model > it wants. However, implementing such an interface could be pretty > challenging and error prone. > > On the other hand, having a source reader with a naive blocking socket is > probably simple enough in most cases (actually sometimes this might even be > the most efficient way). But it does not leave much option to Flink other > than creating one thread per reader. > > Given the above thoughts, it might be reasonable to separate the > SplitReader API into two: SyncReader and AsyncReader. The sync reader just > has a simple blocking takeNext() API. And the AsyncReader just has a > pollNext(Callback) or Future<?> pollNext(). All the other methods are > shared by both readers and could be put into a package private parent > interface like BaseSplitReader. > > Having these two readers allows both complicated and simple implementation, > depending on the SplitReader writers. From Flink's perspective, it will > choose a more efficient threading model if the SplitReader is an > AsyncReader. Otherwise, it may have to use the one thread per reader model > if the reader is a SyncReader. Users can also choose to implement both > interface, in that case, it is up to Flink to choose which interface to use. > > Admittedly, this solution does have one more interface, but still seems > rewarding. Any thoughts? > > Thanks, > > Jiangjie (Becket) Qin > > > On Sun, Nov 18, 2018 at 11:33 PM Biao Liu <mmyy1...@gmail.com> wrote: > >> Hi community, >> >> Thank you guys for sharing ideas. >> >> The thing I really concern is about the thread mode. 
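A minimal sketch of the SyncReader/AsyncReader split that Becket proposes in the quoted mail above might look as follows. The interface and method names follow the mail; everything else (the `fetch()` helper, the stub reader) is an illustrative assumption, not actual Flink code:

```java
import java.util.concurrent.CompletableFuture;

public class ReaderHierarchySketch {

    // Methods shared by both reader flavors (open, close, state snapshot)
    // would live in this package-private parent interface.
    interface BaseSplitReader<T> extends AutoCloseable {
        @Override
        void close();
    }

    // Easy to implement: Flink falls back to one thread per reader.
    interface SyncSplitReader<T> extends BaseSplitReader<T> {
        T takeNext() throws InterruptedException; // blocks until data arrives
    }

    // Harder to implement, but lets Flink drive many readers from one thread.
    interface AsyncSplitReader<T> extends BaseSplitReader<T> {
        CompletableFuture<T> pollNext(); // completes when a record is ready
    }

    // Flink can pick the more efficient threading model when available.
    static <T> T fetch(BaseSplitReader<T> reader) throws Exception {
        if (reader instanceof AsyncSplitReader) {
            return ((AsyncSplitReader<T>) reader).pollNext().get();
        }
        return ((SyncSplitReader<T>) reader).takeNext();
    }

    public static void main(String[] args) throws Exception {
        AsyncSplitReader<String> async = new AsyncSplitReader<String>() {
            public CompletableFuture<String> pollNext() {
                return CompletableFuture.completedFuture("rec");
            }
            public void close() {}
        };
        if (!"rec".equals(fetch(async))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

A connector implementing both interfaces would let Flink choose; implementing only `SyncSplitReader` would imply the one-thread-per-reader model.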
>> Actually in Alibaba, we have implemented our "split reader" based source >> two years ago. That's based on "SourceFunction", it's just an extension not >> a refactoring. It's almost same with the version Thomas and Jamie described >> in Google Doc. It really helps in many scenarios. >> >> However I don't like the thread mode which starts a thread for each split. >> Starting extra thread in operator is not an ideal way IMO. Especially >> thread count is decided by split count. So I was wondering if there is a >> more elegant way. Do we really want these threads in Flink core? >> >> I agree that blocking interface is more easy to implement. Could we at >> least separate the split reader with source function into different >> interfaces? Not all sources would like to read all splits concurrently. In >> batch scenario, reading splits one by one is more general. And also not all >> sources are partitioned, right? >> I prefer there is a new source interface with "pull mode" only, no split. >> There is a splittable source extended it. And there is one implementation >> that starting threads for each split, reading all splits concurrently. >> >> >> On Sun, Nov 18, 2018 at 3:18 AM, Thomas Weise <t...@apache.org> wrote: >> >>> @Aljoscha to address your question first: In the case of the Kinesis >>> consumer (with current Kinesis consumer API), there would also be N+1 >>> threads. I have implemented a prototype similar to what is shown in >> Jamie's >>> document, where the thread ownership is similar to what you have done for >>> Kafka. >>> >>> The equivalent of split reader manages its own thread and the "source >> main >>> thread" is responsible for emitting the data. The interface between the N >>> reader threads and the 1 emitter is a blocking queue per consumer thread. >>> The emitter can now control which queue to consume from based on the >> event >>> time progress. >>> >>> This is akin to a "non-blocking" interface *between emitter and split >>> reader*. 
Emitter uses poll to retrieve records from the N queues (which >>> requires non-blocking interaction). The emitter is independent of the >> split >>> reader implementation, that part could live in Flink. >>> >>> Regarding whether or not to assume that split readers always need a >> thread >>> and in addition that these reader threads should be managed by Flink: It >>> depends on the API of respective external systems and I would not bake >> that >>> assumption into Flink. Some client libraries manage their own threads >> (see >>> push based API like JMS and as I understand it may also apply to the new >>> fan-out Kinesis API: >>> >>> >> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html >>> ). >>> In such cases it would not make sense to layer another reader thread on >>> top. It may instead be better if Flink provides to the split reader the >>> queue/buffer to push records to. >>> >>> The discussion so far has largely ignored the discovery aspect. There are >>> some important considerations such as ordering dependency of splits and >>> work rebalancing that may affect the split reader interface. Should we >> fork >>> this into a separate thread? >>> >>> Thanks, >>> Thomas >>> >>> >>> On Fri, Nov 16, 2018 at 8:09 AM Piotr Nowojski <pi...@data-artisans.com> >>> wrote: >>> >>>> Hi Jamie, >>>> >>>> As it was already covered with my discussion with Becket, there is an >>> easy >>>> way to provide blocking API on top of non-blocking API. And yes we both >>>> agreed that blocking API is easier to implement by users. >>>> >>>> I also do not agree with respect to usefulness of non blocking API. >>>> Actually Kafka connector is the one that could be more efficient thanks >>> to >>>> the removal of the one layer of threading. >>>> >>>> Piotrek >>>> >>>>> On 16 Nov 2018, at 02:21, Jamie Grier <jgr...@lyft.com.INVALID> >> wrote: >>>>> >>>>> Thanks Aljoscha for getting this effort going! 
>>>>> >>>>> There's been plenty of discussion here already and I'll add my big +1 >>> to >>>>> making this interface very simple to implement for a new >>>>> Source/SplitReader. Writing a new production quality connector for >>> Flink >>>>> is very difficult today and requires a lot of detailed knowledge >> about >>>>> Flink, event time progress, watermarking, idle shard detection, etc >> and >>>> it >>>>> would be good to move almost all of this type of code into Flink >> itself >>>> and >>>>> out of source implementations. I also think this is totally doable >> and >>>> I'm >>>>> really excited to see this happening. >>>>> >>>>> I do have a couple of thoughts about the API and the implementation.. >>>>> >>>>> In a perfect world there would be a single thread per Flink source >>>> sub-task >>>>> and no additional threads for SplitReaders -- but this assumes a >> world >>>>> where you have true async IO APIs for the upstream systems (like >> Kafka >>>> and >>>>> Kinesis, S3, HDFS, etc). If that world did exist the single thread >>> could >>>>> just sit in an efficient select() call waiting for new data to arrive >>> on >>>>> any Split. That'd be awesome.. >>>>> >>>>> But, that world doesn't exist and given that practical consideration >> I >>>>> would think the next best implementation is going to be, in practice, >>>>> probably a thread per SplitReader that does nothing but call the >> source >>>> API >>>>> and drop whatever it reads into a (blocking) queue -- as Aljoscha >>>> mentioned >>>>> (calling it N+1) and as we started to describe here: >>>>> >>>> >>> >> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit#heading=h.lfi2u4fv9cxa >>>>> >>>>> I guess my point is that I think we should strive to move as much of >>>>> something like the diagram referenced in the above doc into Flink >>> itself >>>>> and out of sources and simplify the SplitReader API as much as >> possible >>>> as >>>>> well. 
>>>>> >>>>> With the above in mind and with regard to the discussion about >>> blocking, >>>>> etc.. I'm not sure I agree with some of the discussion so far with >>>> regard >>>>> to this API design. The calls to the upstream systems >> (kafka/kinesis) >>>> are >>>>> in fact going to be blocking calls. So a simple API without the >>>> constraint >>>>> that the methods must be implemented in a non-blocking way seems >> better >>>> to >>>>> me from the point of view of somebody writing a new source >>>> implementation. >>>>> My concern is that if you force the implementer of the SplitReader >>>>> interface to do so in a non-blocking way you're just going to make it >>>>> harder to write those implementations. Those calls to read the next >>> bit >>>> of >>>>> data are going to be blocking calls with most known important sources >>> -- >>>> at >>>>> least Kafka/Kinesis/HDFS -- so I think maybe we should just deal with >>>> that >>>>> head on and work around it a higher level so the SplitReader >> interface >>>>> stays super simple to implement. This means we manage all the >>> threading >>>> in >>>>> Flink core, the API stays pull-based, and the implementer is allowed >> to >>>>> simply block until they have data to return. >>>>> >>>>> I maybe would change my mind about this if truly asynchronous APIs to >>> the >>>>> upstream source systems were likely to be available in the near >> future >>> or >>>>> are now and I'm just ignorant of it. But even then the supporting >> code >>>> in >>>>> Flink to drive async and sync sources would be different and in fact >>> they >>>>> might just have different APIs altogether -- SplitReader vs >>>>> AsyncSplitReader maybe. >>>>> >>>>> In the end I think playing with the implementation, across more than >>> one >>>>> source, and moving as much common code into Flink itself will reveal >>> the >>>>> best API of course. 
>>>>> >>>>> One other interesting note is that you need to preserve per-partition >>>>> ordering so you have to take care with the implementation if it were >> to >>>> be >>>>> based on a thread pool and futures so as not to reorder the reads. >>>>> >>>>> Anyway, I'm thrilled to see this starting to move forward and I'd >> very >>>> much >>>>> like to help with the implementation wherever I can. We're doing a >>>>> simplified internal version of some of this at Lyft for just Kinesis >>>>> because we need a solution for event time alignment in the very short >>>> term >>>>> but we'd like to immediately start helping to do this properly in >> Flink >>>>> after that. One of the end goals for us is event time alignment >> across >>>>> heterogeneous sources. Another is making it possible for non-expert >>>> users >>>>> to have a high probability of being able to write their own, correct, >>>>> connectors. >>>>> >>>>> -Jamie >>>>> >>>>> On Thu, Nov 15, 2018 at 3:43 AM Aljoscha Krettek < >> aljos...@apache.org> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I thought I had sent this mail a while ago but I must have forgotten >>> to >>>>>> send it. >>>>>> >>>>>> There is another thing we should consider for splits: the range of >>>>>> timestamps that it can contain. For example, the splits of a file >>> source >>>>>> would know what the minimum and maximum timestamp in the splits is, >>>>>> roughly. For infinite splits, such as Kafka partitions, the minimum >>>> would >>>>>> be meaningful but the maximum would be +Inf. If the splits expose >> the >>>>>> interval of time that they contain the readers, or the component >> that >>>>>> manages the readers can make decisions about which splits to forward >>> and >>>>>> read first. And it can also influence the minimum watermark that a >>>> reader >>>>>> forwards: it should never emit a watermark if it knows there are >>> splits >>>> to >>>>>> read that have a lower minimum timestamp. 
I think it should be as >> easy >>>> as >>>>>> adding a minimumTimestamp()/maximumTimestamp() method pair to the >>> split >>>>>> interface. >>>>>> >>>>>> Another thing we need to resolve is the actual reader interface. I >> see >>>>>> there has been some good discussion but I don't know if we have >>>> consensus. >>>>>> We should try and see how specific sources could be implemented with >>> the >>>>>> new interface. For example, for Kafka I think we need to have N+1 >>>> threads >>>>>> per task (where N is the number of splits that a task is reading >>> from). >>>> On >>>>>> thread is responsible for reading from the splits. And each split >> has >>>> its >>>>>> own (internal) thread for reading from Kafka and putting messages in >>> an >>>>>> internal queue to pull from. This is similar to how the current >> Kafka >>>>>> source is implemented, which has a separate fetcher thread. The >> reason >>>> for >>>>>> this split is that we always need to try reading from Kafka to keep >>> the >>>>>> throughput up. In the current implementation the internal queue (or >>>>>> handover) limits the read rate of the reader threads. >>>>>> >>>>>> @Thomas, what do you think this would look like for Kinesis? >>>>>> >>>>>> Best, >>>>>> Aljoscha >>>>>> >>>>>>> On 15. Nov 2018, at 03:56, Becket Qin <becket....@gmail.com> >> wrote: >>>>>>> >>>>>>> Hi Piotrek, >>>>>>> >>>>>>> Thanks a lot for the detailed reply. All makes sense to me. >>>>>>> >>>>>>> WRT the confusion between advance() / getCurrent(), do you think it >>>> would >>>>>>> help if we combine them and have something like: >>>>>>> >>>>>>> CompletableFuture<T> getNext(); >>>>>>> long getWatermark(); >>>>>>> long getCurrentTimestamp(); >>>>>>> >>>>>>> Cheers, >>>>>>> >>>>>>> Jiangjie (Becket) Qin >>>>>>> >>>>>>> On Tue, Nov 13, 2018 at 9:56 PM Piotr Nowojski < >>>> pi...@data-artisans.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> Thanks again for the detailed answer :) Sorry for responding with >> a >>>>>> delay. 
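As a rough illustration of the minimumTimestamp()/maximumTimestamp() pair that Aljoscha suggests above, here is a sketch under the assumption that splits expose their time interval and that the component managing the readers caps its watermark accordingly. All names are hypothetical:

```java
import java.util.List;

public class SplitTimestampSketch {

    interface SplitWithTimestamps {
        long minimumTimestamp();
        // Long.MAX_VALUE for infinite splits such as Kafka partitions
        long maximumTimestamp();
    }

    static class FileSplit implements SplitWithTimestamps {
        private final long min;
        private final long max;
        FileSplit(long min, long max) { this.min = min; this.max = max; }
        public long minimumTimestamp() { return min; }
        public long maximumTimestamp() { return max; }
    }

    // Never emit a watermark beyond the smallest minimum timestamp of
    // splits that are still waiting to be read.
    static long watermarkCap(List<SplitWithTimestamps> unreadSplits) {
        long cap = Long.MAX_VALUE;
        for (SplitWithTimestamps s : unreadSplits) {
            cap = Math.min(cap, s.minimumTimestamp());
        }
        return cap;
    }

    public static void main(String[] args) {
        List<SplitWithTimestamps> unread =
                List.of(new FileSplit(100, 200), new FileSplit(50, 150));
        if (watermarkCap(unread) != 50) throw new AssertionError();
        System.out.println(watermarkCap(unread));
    }
}
```

The same interval information could also drive split ordering, reading the split with the lowest minimum timestamp first.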
>>>>>>>> >>>>>>>>> Completely agree that in pattern 2, having a callback is >> necessary >>>> for >>>>>>>> that >>>>>>>>> single thread outside of the connectors. And the connectors MUST >>> have >>>>>>>>> internal threads. >>>>>>>> >>>>>>>> Yes, this thread will have to exists somewhere. In pattern 2 it >>> exists >>>>>> in >>>>>>>> the connector (at least from the perspective of the Flink >> execution >>>>>>>> engine). In pattern 1 it exists inside the Flink execution engine. >>>> With >>>>>>>> completely blocking connectors, like simple reading from files, >> both >>>> of >>>>>>>> those approaches are basically the same. The difference is when >> user >>>>>>>> implementing Flink source is already working with a non blocking >>> code >>>>>> with >>>>>>>> some internal threads. In this case, pattern 1 would result in >>> "double >>>>>>>> thread wrapping”, while pattern 2 would allow to skip one layer of >>>>>>>> indirection. >>>>>>>> >>>>>>>>> If we go that way, we should have something like "void >>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would >>>>>>>>> CompletableFuture work here, though. If 10 readers returns 10 >>>>>> completable >>>>>>>>> futures, will there be 10 additional threads (so 20 threads in >>> total) >>>>>>>>> blocking waiting on them? Or will there be a single thread busy >>> loop >>>>>>>>> checking around? >>>>>>>> >>>>>>>> To be honest, I haven’t thought this completely through and I >>> haven’t >>>>>>>> tested/POC’ed it. Having said that, I can think of at least couple >>> of >>>>>>>> solutions. 
First is something like this: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>> >>>> >>> >> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507 >>>>>>>> < >>>>>>>> >>>>>> >>>> >>> >> https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/execution/executor/TaskExecutor.java#L481-L507 >>>>>>>>> >>>>>>>> >>>>>>>> Line: >>>>>>>> >>>>>>>> `blocked = split.process();` >>>>>>>> >>>>>>>> Is where the execution goes into to the task/sources. This is >> where >>>> the >>>>>>>> returned future is handled: >>>>>>>> >>>>>>>> blocked.addListener(() -> { >>>>>>>> blockedSplits.remove(split); >>>>>>>> // reset the level priority to >>>>>> prevent >>>>>>>> previously-blocked splits from starving existing splits >>>>>>>> split.resetLevelPriority(); >>>>>>>> waitingSplits.offer(split); >>>>>>>> }, executor); >>>>>>>> >>>>>>>> Fundamentally callbacks and Futures are more or less >> interchangeable >>>> You >>>>>>>> can always wrap one into another (creating a callback that >>> completes a >>>>>>>> future and attach a callback once future completes). In this case >>> the >>>>>>>> difference for me is mostly: >>>>>>>> - api with passing callback allows the callback to be fired >> multiple >>>>>> times >>>>>>>> and to fire it even if the connector is not blocked. This is what >> I >>>>>> meant >>>>>>>> by saying that api `CompletableFuture<?> isBlocked()` is a bit >>>> simpler. >>>>>>>> Connector can only return either “I’m not blocked” or “I’m blocked >>>> and I >>>>>>>> will tell you only once when I’m not blocked anymore”. >>>>>>>> >>>>>>>> But this is not the most important thing for me here. For me >>> important >>>>>>>> thing is to try our best to make Flink task’s control and >> execution >>>>>> single >>>>>>>> threaded. For that both callback and future APIs should work the >>> same. >>>>>>>> >>>>>>>>> WRT pattern 1, a single blocking take() API should just work. 
The >>>> good >>>>>>>>> thing is that a blocking read API is usually simpler to >> implement. >>>>>>>> >>>>>>>> Yes, they are easier to implement (especially if you are not the >> one >>>>>> that >>>>>>>> have to deal with the additional threading required around them ;) >>> ). >>>>>> But >>>>>>>> to answer this issue, if we choose pattern 2, we can always >> provide >>> a >>>>>>>> proxy/wrapper that would using the internal thread implement the >>>>>>>> non-blocking API while exposing blocking API to the user. It would >>>>>>>> implement pattern 2 for the user exposing to him pattern 1. In >> other >>>>>> words >>>>>>>> implementing pattern 1 in pattern 2 paradigm, while making it >>> possible >>>>>> to >>>>>>>> implement pure pattern 2 connectors. >>>>>>>> >>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to >>> perform >>>> IO >>>>>>>> in >>>>>>>>> a method like "isBlocked()". If the method is expected to fetch >>>> records >>>>>>>>> (even if not returning them), naming it something more explicit >>> would >>>>>>>> help >>>>>>>>> avoid confusion. >>>>>>>> >>>>>>>> If we choose so, we could rework it into something like: >>>>>>>> >>>>>>>> CompletableFuture<?> advance() >>>>>>>> T getCurrent(); >>>>>>>> Watermark getCurrentWatermark() >>>>>>>> >>>>>>>> But as I wrote before, this is more confusing to me for the exact >>>>>> reasons >>>>>>>> you mentioned :) I would be confused what should be done in >>>> `adanvce()` >>>>>> and >>>>>>>> what in `getCurrent()`. However, again this naming issue is not >> that >>>>>>>> important to me and probably is matter of taste/personal >>> preferences. >>>>>>>> >>>>>>>> Piotrek >>>>>>>> >>>>>>>>> On 9 Nov 2018, at 18:37, Becket Qin <becket....@gmail.com> >> wrote: >>>>>>>>> >>>>>>>>> Hi Piotrek, >>>>>>>>> >>>>>>>>> Thanks for the explanation. We are probably talking about the >> same >>>>>> thing >>>>>>>>> but in different ways. 
To clarify a little bit, I think there are >>> two >>>>>>>>> patterns to read from a connector. >>>>>>>>> >>>>>>>>> Pattern 1: Thread-less connector with a blocking read API. >> Outside >>> of >>>>>> the >>>>>>>>> connector, there is one IO thread per reader, doing blocking >> read. >>> An >>>>>>>>> additional thread will interact with all the IO threads. >>>>>>>>> Pattern 2: Connector with internal thread(s) and non-blocking >> API. >>>>>>>> Outside >>>>>>>>> of the connector, there is one thread for ALL readers, doing IO >>>> relying >>>>>>>> on >>>>>>>>> notification callbacks in the reader. >>>>>>>>> >>>>>>>>> In both patterns, there must be at least one thread per >> connector, >>>>>> either >>>>>>>>> inside (created by connector writers) or outside (created by >> Flink) >>>> of >>>>>>>> the >>>>>>>>> connector. Ideally there are NUM_CONNECTORS + 1 threads in total, >>> to >>>>>> make >>>>>>>>> sure that 1 thread is fully non-blocking. >>>>>>>>> >>>>>>>>>> Btw, I don’t know if you understand my point. Having only >> `poll()` >>>> and >>>>>>>>> `take()` is not enough for single threaded task. If our source >>>>>> interface >>>>>>>>> doesn’t provide `notify()` callback nor >`CompletableFuture<?> >>>>>>>>> isBlocked(),`, there is no way to implement single threaded task >>> that >>>>>>>> both >>>>>>>>> reads the data from the source connector and can also react to >>> system >>>>>>>>> events. Ok, non >blocking `poll()` would allow that, but with >> busy >>>>>>>> looping. >>>>>>>>> >>>>>>>>> Completely agree that in pattern 2, having a callback is >> necessary >>>> for >>>>>>>> that >>>>>>>>> single thread outside of the connectors. And the connectors MUST >>> have >>>>>>>>> internal threads. If we go that way, we should have something >> like >>>>>> "void >>>>>>>>> poll(Callback) / void advance(callback)". I am curious how would >>>>>>>>> CompletableFuture work here, though. 
If 10 readers returns 10 >>>>>> completable >>>>>>>>> futures, will there be 10 additional threads (so 20 threads in >>> total) >>>>>>>>> blocking waiting on them? Or will there be a single thread busy >>> loop >>>>>>>>> checking around? >>>>>>>>> >>>>>>>>> WRT pattern 1, a single blocking take() API should just work. The >>>> good >>>>>>>>> thing is that a blocking read API is usually simpler to >> implement. >>> An >>>>>>>>> additional non-blocking "T poll()" method here is indeed optional >>> and >>>>>>>> could >>>>>>>>> be used in cases like Flink does not want the thread to block >>>> forever. >>>>>>>> They >>>>>>>>> can also be combined to have a "T poll(Timeout)", which is >> exactly >>>> what >>>>>>>>> KafkaConsumer did. >>>>>>>>> >>>>>>>>> It sounds that you are proposing pattern 2 with something similar >>> to >>>>>> NIO2 >>>>>>>>> AsynchronousByteChannel[1]. That API would work, except that the >>>>>>>> signature >>>>>>>>> returning future seems not necessary. If that is the case, a >> minor >>>>>> change >>>>>>>>> on the current FLIP proposal to have "void advance(callback)" >>> should >>>>>>>> work. >>>>>>>>> And this means the connectors MUST have their internal threads. >>>>>>>>> >>>>>>>>> BTW, one thing I am also trying to avoid is pushing users to >>> perform >>>> IO >>>>>>>> in >>>>>>>>> a method like "isBlocked()". If the method is expected to fetch >>>> records >>>>>>>>> (even if not returning them), naming it something more explicit >>> would >>>>>>>> help >>>>>>>>> avoid confusion. 
>>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> >>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> >>>>>>>> >>>>>> >>>> >>> >> https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html >>>>>>>>> >>>>>>>>> On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski < >>>>>> pi...@data-artisans.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi >>>>>>>>>> >>>>>>>>>> Good point with select/epoll, however I do not see how they >>> couldn’t >>>>>> be >>>>>>>>>> with Flink if we would like single task in Flink to be >>>> single-threaded >>>>>>>> (and >>>>>>>>>> I believe we should pursue this goal). If your connector blocks >> on >>>>>>>>>> `select`, then it can not process/handle control messages from >>>> Flink, >>>>>>>> like >>>>>>>>>> checkpoints, releasing resources and potentially output flushes. >>>> This >>>>>>>> would >>>>>>>>>> require tight integration between connector and Flink’s main >> event >>>>>>>>>> loop/selects/etc. >>>>>>>>>> >>>>>>>>>> Looking at it from other perspective. Let’s assume that we have >> a >>>>>>>>>> connector implemented on top of `select`/`epoll`. In order to >>>>>> integrate >>>>>>>> it >>>>>>>>>> with Flink’s checkpointing/flushes/resource releasing it will >> have >>>> to >>>>>> be >>>>>>>>>> executed in separate thread one way or another. At least if our >>> API >>>>>> will >>>>>>>>>> enforce/encourage non blocking implementations with some kind of >>>>>>>>>> notifications (`isBlocked()` or `notify()` callback), some >>>> connectors >>>>>>>> might >>>>>>>>>> skip one layer of wapping threads. >>>>>>>>>> >>>>>>>>>> Btw, I don’t know if you understand my point. Having only >> `poll()` >>>> and >>>>>>>>>> `take()` is not enough for single threaded task. 
If our source >>>>>> interface >>>>>>>>>> doesn’t provide `notify()` callback nor `CompletableFuture<?> >>>>>>>>>> isBlocked(),`, there is no way to implement single threaded task >>>> that >>>>>>>> both >>>>>>>>>> reads the data from the source connector and can also react to >>>> system >>>>>>>>>> events. Ok, non blocking `poll()` would allow that, but with >> busy >>>>>>>> looping. >>>>>>>>>> >>>>>>>>>> Piotrek >>>>>>>>>> >>>>>>>>>>> On 8 Nov 2018, at 06:56, Becket Qin <becket....@gmail.com> >>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi Piotrek, >>>>>>>>>>> >>>>>>>>>>>> But I don’t see a reason why we should expose both blocking >>>> `take()` >>>>>>>> and >>>>>>>>>>> non-blocking `poll()` methods to the Flink engine. Someone >> (Flink >>>>>>>> engine >>>>>>>>>> or >>>>>>>>>>> connector) would have to do the same busy >>>>>>>>>>>> looping anyway and I think it would be better to have a >> simpler >>>>>>>>>> connector >>>>>>>>>>> API (that would solve our problems) and force connectors to >>> comply >>>>>> one >>>>>>>>>> way >>>>>>>>>>> or another. >>>>>>>>>>> >>>>>>>>>>> If we let the block happen inside the connector, the blocking >>> does >>>>>> not >>>>>>>>>> have >>>>>>>>>>> to be a busy loop. For example, to do the block waiting >>>> efficiently, >>>>>>>> the >>>>>>>>>>> connector can use java NIO selector().select which relies on OS >>>>>> syscall >>>>>>>>>>> like epoll[1] instead of busy looping. But if Flink engine >> blocks >>>>>>>> outside >>>>>>>>>>> the connector, it pretty much has to do the busy loop. So if >>> there >>>> is >>>>>>>>>> only >>>>>>>>>>> one API to get the element, a blocking getNextElement() makes >>> more >>>>>>>> sense. >>>>>>>>>>> In any case, we should avoid ambiguity. It has to be crystal >>> clear >>>>>>>> about >>>>>>>>>>> whether a method is expected to be blocking or non-blocking. >>>>>> Otherwise >>>>>>>> it >>>>>>>>>>> would be very difficult for Flink engine to do the right thing >>> with >>>>>> the >>>>>>>>>>> connectors. 
At the first glance at getCurrent(), the expected >>>>>> behavior >>>>>>>> is >>>>>>>>>>> not quite clear. >>>>>>>>>>> >>>>>>>>>>> That said, I do agree that functionality wise, poll() and >> take() >>>> kind >>>>>>>> of >>>>>>>>>>> overlap. But they are actually not quite different from >>>>>>>>>>> isBlocked()/getNextElement(). Compared with isBlocked(), the >> only >>>>>>>>>>> difference is that poll() also returns the next record if it is >>>>>>>>>> available. >>>>>>>>>>> But I agree that the isBlocked() + getNextElement() is more >>>> flexible >>>>>> as >>>>>>>>>>> users can just check the record availability, but not fetch the >>>> next >>>>>>>>>>> element. >>>>>>>>>>> >>>>>>>>>>>> In case of thread-less readers with only non-blocking >>>> `queue.poll()` >>>>>>>> (is >>>>>>>>>>> that really a thing? I can not think about a real >> implementation >>>> that >>>>>>>>>>> enforces such constraints) >>>>>>>>>>> Right, it is pretty much a syntax sugar to allow user combine >> the >>>>>>>>>>> check-and-take into one method. It could be achieved with >>>>>> isBlocked() + >>>>>>>>>>> getNextElement(). 
>>>>>>>>>>> >>>>>>>>>>> [1] http://man7.org/linux/man-pages/man7/epoll.7.html >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> >>>>>>>>>>> Jiangjie (Becket) Qin >>>>>>>>>>> >>>>>>>>>>> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski < >>>>>>>> pi...@data-artisans.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Becket, >>>>>>>>>>>> >>>>>>>>>>>> With my proposal, both of your examples would have to be >> solved >>> by >>>>>> the >>>>>>>>>>>> connector and solution to both problems would be the same: >>>>>>>>>>>> >>>>>>>>>>>> Pretend that connector is never blocked (`isBlocked() { return >>>>>>>>>>>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking >>>>>> fashion >>>>>>>>>> (or >>>>>>>>>>>> semi blocking with return of control from time to time to >> allow >>>> for >>>>>>>>>>>> checkpointing, network flushing and other resource management >>>> things >>>>>>>> to >>>>>>>>>>>> happen in the same main thread). In other words, exactly how >> you >>>>>> would >>>>>>>>>>>> implement `take()` method or how the same source connector >> would >>>> be >>>>>>>>>>>> implemented NOW with current source interface. The difference >>> with >>>>>>>>>> current >>>>>>>>>>>> interface would be only that main loop would be outside of the >>>>>>>>>> connector, >>>>>>>>>>>> and instead of periodically releasing checkpointing lock, >>>>>> periodically >>>>>>>>>>>> `return null;` or `return Optional.empty();` from >>>>>> `getNextElement()`. >>>>>>>>>>>> >>>>>>>>>>>> In case of thread-less readers with only non-blocking >>>> `queue.poll()` >>>>>>>> (is >>>>>>>>>>>> that really a thing? I can not think about a real >> implementation >>>>>> that >>>>>>>>>>>> enforces such constraints), we could provide a wrapper that >>> hides >>>>>> the >>>>>>>>>> busy >>>>>>>>>>>> looping. The same applies how to solve forever blocking >> readers >>> - >>>> we >>>>>>>>>> could >>>>>>>>>>>> provider another wrapper running the connector in separate >>> thread. 
But I don't see a reason why we should expose both a blocking `take()` and a non-blocking `poll()` method to the Flink engine. Someone (the Flink engine or the connector) would have to do the same busy looping anyway, and I think it would be better to have a simpler connector API (that would solve our problems) and force connectors to comply one way or another.

Piotrek

On 7 Nov 2018, at 10:55, Becket Qin <becket....@gmail.com> wrote:

Hi Piotr,

I might have misunderstood your proposal. But let me try to explain my concern. I am thinking about the following case:
1. A reader has the following two interfaces:
   boolean isBlocked()
   T getNextElement()
2. The implementation of getNextElement() is non-blocking.
3. The reader is thread-less, i.e. it does not have any internal thread. For example, it might just delegate getNextElement() to a queue.poll(), and isBlocked() is just queue.isEmpty().

How can Flink efficiently implement a blocking reading behavior with this reader? Either a tight loop or a backoff interval is needed. Neither of them is ideal.

Now let's say the reader mentioned above implements a blocking getNextElement() method. Because there is no internal thread in the reader, after isBlocked() returns false, Flink will still have to loop on isBlocked() to check whether the next record is available. If the next record arrives after 10 min, it is a tight loop for 10 min. You have probably noticed that in this case, even if isBlocked() returns a future, that future will not be completed unless Flink calls some method on the reader, because the reader has no internal thread to complete that future by itself.

Due to the above reasons, a blocking take() API would allow Flink to have an efficient way to read from a reader. There are many ways to wake up the blocking thread when checkpointing is needed, depending on the implementation. But I think the poll()/take() API would also work in that case.

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

a)

> BTW, regarding the isBlock() method, I have a few more questions. 21. Is a method isReady() with boolean as a return value equivalent? Personally I found it a little bit confusing what is supposed to be returned when the future is completed. 22. If the implementation of isBlocked() is optional, how do the callers know whether the method is properly implemented or not? Does "not implemented" mean it always returns a completed future?
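[Editor's note: the thread-less reader Becket describes above can be reduced to a few lines. This is an illustrative sketch, not a proposed API; it shows why nothing inside such a reader can ever complete a returned future - there is no internal thread, so any readiness signal has to come from the caller polling again.]

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Illustrative thread-less reader: no internal thread, purely pull-based. */
public class ThreadlessReader<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    /** Filled by some external party (e.g. a test, or another component). */
    public void enqueue(T element) {
        queue.add(element);
    }

    /** Just delegates to queue.poll(): returns null when nothing is available. */
    public T getNextElement() {
        return queue.poll();
    }

    /** Just queue.isEmpty(): a future-returning variant would have nobody
     *  to complete the future once data arrives. */
    public boolean isBlocked() {
        return queue.isEmpty();
    }
}
```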
`CompletableFuture<?> isBlocked()` is more or less an equivalent of `boolean hasNext()` which, in the "false" case, also provides some kind of listener/callback that notifies about the presence of the next element. There are some minor details, like `CompletableFuture<?>` having a minimal two-state logic:

1. Future is completed - we have more data
2. Future not yet completed - we don't have data now, but we might/will have in the future

While `boolean hasNext()` plus a `notify()` callback is a bit more complicated/dispersed and can lead to/encourage `notify()` spam.

b)

> 3. If we merge `advance` and `getCurrent` into one method like `getNext`, the `getNext` would need to return an `ElementWithTimestamp` because some sources want to add a timestamp to every element. IMO, this is not so memory friendly, so I prefer this design.

Guowei, I don't quite understand this. Could you elaborate why having a separate `advance()` helps?

c)

Regarding advance/poll/take: what's the value of having two separate methods, poll and take? Which one of them should be called and which implemented? What's the benefit of having those methods compared to having one single method `getNextElement()` (or `pollElement()`, or whatever we name it) with the following contract:

CompletableFuture<?> isBlocked();

/**
 * Return the next element - will be called only if `isBlocked()` is completed.
 * Try to implement it in a non-blocking fashion, but if that's impossible or
 * you just don't need the effort, you can block in this method.
 */
T getNextElement();

I mean, if the connector is implemented non-blockingly, Flink should use it that way. If it's not, then `poll()` will `throw new NotImplementedException()`. Implementing both of them and providing both of them to Flink wouldn't make sense, so why not merge them into a single method call that should preferably (but does not necessarily need to) be non-blocking? It's not like we are implementing a general-purpose `Queue`, where users might want to call either `poll` or `take`. We would always prefer to call `poll`, but if it's blocking, then we still have no choice but to call it and block on it.

d)

> 1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may be not very memory friendly if every advance call returns a Future.

I didn't want to mention this, to not clog my initial proposal, but there is a simple solution for that problem:

public interface SplitReader {

    (…)

    CompletableFuture<?> NOT_BLOCKED =
        CompletableFuture.completedFuture(null);

    /**
     * Returns a future that will be completed when the page source becomes
     * unblocked. If the page source is not blocked, this method should return
     * {@code NOT_BLOCKED}.
     */
    default CompletableFuture<?> isBlocked() {
        return NOT_BLOCKED;
    }
}

If we are blocked and waiting for IO, then creating a new Future is a non-issue. Under full throttle/throughput, sources that are not blocked returning the static `NOT_BLOCKED` constant should also solve the problem.

One more remark: non-blocking sources might be a necessity in a single-threaded model without a checkpointing lock. (Currently, when sources are blocked, they can release the checkpointing lock and re-acquire it later.) A non-blocking `poll`/`getNext()` would allow checkpoints to happen while the source is idling. In that case either `notify()` or my proposed `isBlocked()` would allow us to avoid busy-looping.
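[Editor's note: a minimal illustration of how a caller could combine the proposed `isBlocked()`/`getNextElement()` contract without busy-looping. The class and method names (`IsBlockedSketch`, `nextElement`) are invented for the sketch and are not the FLIP's actual runtime code; the caller parks on the future instead of spinning on a boolean.]

```java
import java.util.concurrent.CompletableFuture;

public class IsBlockedSketch {

    /** Sketch of the proposed reader contract with the static NOT_BLOCKED constant. */
    public interface SketchSplitReader<T> {
        CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

        /** Completed future = data (possibly) available; default: never blocked. */
        default CompletableFuture<?> isBlocked() {
            return NOT_BLOCKED;
        }

        /** Called only once isBlocked() has completed; may return null if raced. */
        T getNextElement();
    }

    /** Caller-side loop: waits on the future instead of busy-looping. */
    public static <T> T nextElement(SketchSplitReader<T> reader) throws Exception {
        while (true) {
            reader.isBlocked().get();      // no-op for NOT_BLOCKED, parks otherwise
            T element = reader.getNextElement();
            if (element != null) {
                return element;            // null means "raced, re-check readiness"
            }
        }
    }
}
```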
Piotrek

On 5 Nov 2018, at 03:59, Becket Qin <becket....@gmail.com> wrote:

Hi Thomas,

The iterator-like API was also the first thing that came to me. But it seems a little confusing that hasNext() does not mean "the stream has not ended" but "the next record is ready", which repurposes the well-known meaning of hasNext(). If we follow the hasNext()/next() pattern, an additional isNextReady() method to indicate whether the next record is ready seems more intuitive to me.

Similarly, in the poll()/take() pattern, another method isDone() is needed to indicate whether the stream has ended or not.

Compared with the hasNext()/next()/isNextReady() pattern, isDone()/poll()/take() seems more flexible for the reader implementation. When I am implementing a reader, I could have a couple of choices:

- A thread-less reader that does not have any internal thread.
  - When poll() is called, the same calling thread will perform a bunch of IO asynchronously.
  - When take() is called, the same calling thread will perform a bunch of IO and wait until the record is ready.
- A reader with internal threads performing network IO and putting records into a buffer.
  - When poll() is called, the calling thread simply reads from the buffer and returns an empty result immediately if there is no record.
  - When take() is called, the calling thread reads from the buffer and blocks waiting if the buffer is empty.

On the other hand, with the hasNext()/next()/isNextReady() API, it is less intuitive for reader developers to write the thread-less pattern. Technically speaking, one can still do the asynchronous IO to prepare the record in isNextReady(), but it is inexplicit and seems somewhat hacky.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <t...@apache.org> wrote:

Couple more points regarding discovery:

The proposal mentions that discovery could be outside the execution graph. Today, discovered partitions/shards are checkpointed. I believe that will also need to be the case in the future, even when discovery and reading are split between different tasks.

For cases such as resharding of a Kinesis stream, the relationship between splits needs to be considered. Splits cannot be randomly distributed over readers in certain situations. An example was mentioned here:
https://github.com/apache/flink/pull/6980#issuecomment-435202809

Thomas

On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <t...@apache.org> wrote:

Thanks for getting the ball rolling on this!

Can the number of splits decrease? Yes, splits can be closed and go away. An example would be a shard merge in Kinesis (2 existing shards will be closed and replaced with a new shard).

Regarding advance/poll/take: IMO the least restrictive approach would be the thread-less IO model (pull based, non-blocking, caller retrieves new records when available). The current Kinesis API requires the use of threads. But that can be internal to the split reader and does not need to be a source API concern. In fact, that's what we are working on right now as an improvement to the existing consumer: each shard consumer thread will push to a queue, and the consumer main thread will poll the queue(s). It is essentially a mapping from threaded IO to non-blocking.

The proposed SplitReader interface would fit the thread-less IO model. Similar to an iterator, we find out if there is a new element (hasNext) and if so, move to it (next()). Separate calls deliver the meta information (timestamp, watermark). Perhaps the advance call could offer a timeout option, so that the caller does not end up in a busy wait. On the other hand, a caller processing multiple splits may want to cycle through fast, to process elements of other splits as soon as they become available. The nice thing is that this "split merge" logic can now live in Flink and be optimized and shared between different sources.

Thanks,
Thomas

On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi,
Thanks Aljoscha for this FLIP.

1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may be not very memory friendly if every advance call returns a Future.

public interface Listener {
    public void notify();
}

public interface SplitReader {
    /**
     * When there is no element temporarily, this will return false.
     * When elements are available again, the splitReader can call
     * listener.notify().
     * In addition, the framework would check `advance` periodically.
     * Of course advance can always return true and ignore the listener
     * argument for simplicity.
     */
    public boolean advance(Listener listener);
}

2. The FLIP tells us very clearly how to create all Splits and how to create a SplitReader from a Split. But there is no strategy for the user to choose how to assign the splits to the tasks. I think we could add an enum to let the user choose.
/**
public enum SplitsAssignmentPolicy {
    Location,
    Workload,
    Random,
    Average
}
*/

3. If we merge `advance` and `getCurrent` into one method like `getNext`, the `getNext` would need to return an `ElementWithTimestamp` because some sources want to add a timestamp to every element. IMO, this is not so memory friendly, so I prefer this design.

Thanks

On Thu, 1 Nov 2018 at 18:08, Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Thanks Aljoscha for starting this, it's blocking quite a lot of other possible improvements. I have one proposal. Instead of having a method:

boolean advance() throws IOException;

I would replace it with

/*
 * Return a future which, when completed, means that the source has more data
 * and getNext() will not block.
 * If you wish to use the benefits of non-blocking connectors, please
 * implement this method appropriately.
 */
default CompletableFuture<?> isBlocked() {
    return CompletableFuture.completedFuture(null);
}

And rename `getCurrent()` to `getNext()`.

A couple of arguments:
1. I don't understand the division of work between `advance()` and `getCurrent()`. What should be done in which, especially for connectors that handle records in batches (like Kafka), and when should you call `advance` and when `getCurrent()`?
2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the future to have asynchronous/non-blocking connectors and to more efficiently handle a large number of blocked threads, without busy waiting. At the same time it doesn't add much complexity, since naive connector implementations can always be blocking.
3. This would also allow us to use a fixed-size thread pool of task executors, instead of one thread per task.
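[Editor's note: argument 3 could look roughly like the sketch below, which is purely illustrative - the class and method names (`FixedPoolDriver`, `drive`) are invented. A small fixed-size executor re-schedules a task only when its source's `isBlocked()` future completes, so idle sources hold no thread.]

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Illustrative event loop: many sources driven by a fixed-size pool via isBlocked(). */
public class FixedPoolDriver {

    public interface Source<T> {
        CompletableFuture<?> isBlocked();
        T getNext(); // may return null if nothing is ready after all
    }

    private final ExecutorService pool;

    public FixedPoolDriver(int threads) {
        pool = Executors.newFixedThreadPool(threads);
    }

    /** Re-schedules the source on the pool each time it becomes unblocked. */
    public <T> void drive(Source<T> source, Consumer<T> out) {
        source.isBlocked().thenRunAsync(() -> {
            T element = source.getNext();
            if (element != null) {
                out.accept(element);
            }
            drive(source, out); // await the next unblocking; no thread held while idle
        }, pool);
    }

    public void shutdown() {
        pool.shutdown();
    }
}
```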
Piotrek

On 31 Oct 2018, at 17:22, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi All,

In order to finally get the ball rolling on the new source interface that we have discussed for so long, I finally created a FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

I cc'ed Thomas and Jamie because of the ongoing work/discussion about adding per-partition watermark support to the Kinesis source, and because this would enable a generic implementation of event-time alignment for all sources. Maybe we need another FLIP for the event-time alignment part, especially the part about information sharing between operations (I'm not calling it state sharing because state has a special meaning in Flink).

Please discuss away!

Aljoscha