Hi Biao!

This discussion stalled because of the preparations for open-sourcing and 
merging Blink. I think that before creating the tickets we should split this 
discussion into the topics/areas outlined by Stephan and create FLIPs for them.

I think there is no chance of completing this in the remaining couple of 
weeks/1 month before the 1.8 feature freeze; however, it would be good to aim 
for 1.9 with those changes.

Piotrek 

> On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> 
> Hi community,
> Stephan's summary makes a lot of sense to me. It is indeed much clearer
> after splitting the complex topic into smaller ones.
> I was wondering whether there is a detailed plan for the next step. If not,
> I would like to push this forward by creating some JIRA issues.
> Another question: should version 1.8 include these features?
> 
> Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
> 
>> Thanks everyone for the lively discussion. Let me try to summarize where I
>> see convergence in the discussion and open issues.
>> I'll try to group this by design aspect of the source. Please let me know
>> if I got things wrong or missed something crucial here.
>> 
>> For issues 1-3, if the below reflects the state of the discussion, I would
>> try and update the FLIP in the next days.
>> For the remaining ones we need more discussion.
>> 
>> I would suggest forking each of these aspects into a separate mail thread,
>> or we will lose sight of the individual aspects.
>> 
>> *(1) Separation of Split Enumerator and Split Reader*
>> 
>>  - All seem to agree this is a good thing
>>  - Split Enumerator could in the end live on JobManager (and assign splits
>> via RPC) or in a task (and assign splits via data streams)
>>  - this discussion is orthogonal and should come later, when the interface
>> is agreed upon.
>> 
>> *(2) Split Readers for one or more splits*
>> 
>>  - Discussion seems to agree that we need to support one reader that
>> possibly handles multiple splits concurrently.
>>  - The requirement comes from sources where one poll()-style call fetches
>> data from different splits / partitions
>>    --> examples of sources that require this are Kafka, Pravega, and
>> Pulsar
>> 
>>  - Could have one split reader per source, or multiple split readers that
>> share the "poll()" function
>>  - To not make it too complicated, we can start with thinking about one
>> split reader for all splits initially and see if that covers all
>> requirements
>> 
>> *(3) Threading model of the Split Reader*
>> 
>>  - Most active part of the discussion ;-)
>> 
>>  - A non-blocking way for Flink's task code to interact with the source is
>> needed in order to build the task runtime code on a
>> single-threaded/actor-style task design
>>    --> I personally am a big proponent of that, it will help with
>> well-behaved checkpoints, efficiency, and simpler yet more robust runtime
>> code
>> 
>>  - Users care about simple abstraction, so as a subclass of SplitReader
>> (non-blocking / async) we need to have a BlockingSplitReader which will
>> form the basis of most source implementations. BlockingSplitReader lets
>> users do blocking simple poll() calls.
>>  - The BlockingSplitReader would spawn a thread (or more) and the
>> thread(s) can make blocking calls and hand over data buffers via a blocking
>> queue
>>  - This should allow us to cover both, a fully async runtime, and a simple
>> blocking interface for users.
>>  - This is actually very similar to how the Kafka connectors work: Kafka
>> 0.9+ with one thread, Kafka 0.8 with multiple threads
>> 
>>  - On the base SplitReader (the async one), the non-blocking method that
>> gets the next chunk of data would signal data availability via a
>> CompletableFuture, because that gives the best flexibility (can await
>> completion or register notification handlers).
>>  - The source task would register a "thenHandle()" (or similar) on the
>> future to put a "take next data" task into the actor-style mailbox
>> 
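A rough sketch of the hand-over described in (3), in case it helps the discussion. Only the names SplitReader and BlockingSplitReader come from the summary above; every signature here (available(), pollNext(), the Supplier-based constructor, null as end-of-input) is an assumption made for illustration, not a proposed or existing Flink API. A fetcher thread makes the blocking calls and puts records into a bounded queue, and the single-threaded "mailbox" is modelled with a plain single-thread executor.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class AsyncReaderSketch {

    /** Hypothetical async base interface; not an agreed-upon Flink API. */
    interface SplitReader<T> {
        CompletableFuture<Void> available(); // completes once data may be taken
        T pollNext();                        // never blocks; null if nothing is buffered
    }

    /** Wraps a blocking poll() in a fetcher thread; records are handed over via a queue. */
    static final class BlockingSplitReader<T> implements SplitReader<T> {
        private final BlockingQueue<T> handover;
        private final Object lock = new Object();
        private CompletableFuture<Void> availability = new CompletableFuture<>();

        BlockingSplitReader(Supplier<T> blockingPoll, int capacity) {
            handover = new ArrayBlockingQueue<>(capacity);
            Thread fetcher = new Thread(() -> {
                try {
                    T record;
                    // Assumption: the user function returns null at end of input.
                    while ((record = blockingPoll.get()) != null) {
                        handover.put(record); // blocks while the queue is full
                        synchronized (lock) {
                            availability.complete(null); // wake up the task
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "fetcher");
            fetcher.setDaemon(true);
            fetcher.start();
        }

        @Override
        public CompletableFuture<Void> available() {
            synchronized (lock) {
                return availability;
            }
        }

        @Override
        public T pollNext() {
            synchronized (lock) {
                T next = handover.poll();
                if (handover.isEmpty() && availability.isDone()) {
                    availability = new CompletableFuture<>(); // re-arm for the next record
                }
                return next;
            }
        }
    }

    /** Each completed future enqueues a "take next data" task into the mailbox. */
    static <T> void scheduleTake(SplitReader<T> reader, ExecutorService mailbox,
                                 List<T> out, int expected, CompletableFuture<List<T>> done) {
        reader.available().thenRunAsync(() -> {
            T next;
            while ((next = reader.pollNext()) != null) {
                out.add(next);
            }
            if (out.size() >= expected) {
                done.complete(out);
            } else {
                scheduleTake(reader, mailbox, out, expected, done);
            }
        }, mailbox);
    }

    static List<String> demo() {
        Iterator<String> source = List.of("a", "b", "c").iterator();
        SplitReader<String> reader = new BlockingSplitReader<String>(
                () -> source.hasNext() ? source.next() : null, 2);
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        CompletableFuture<List<String>> done = new CompletableFuture<>();
        scheduleTake(reader, mailbox, new ArrayList<>(), 3, done);
        try {
            return done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            mailbox.shutdown();
        }
    }
}
```

In this sketch the task thread never blocks: it only runs when a future completes, and a spurious wake-up simply yields a null from pollNext() and re-registers on the next future.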
>> *(4) Split Enumeration and Assignment*
>> 
>>  - Splits may be generated lazily, both in cases where the number of splits
>> is limited (but very large) and where splits are discovered over time
>>  - Assignment should also be lazy, to get better load balancing
>>  - Assignment needs to support locality preferences
>> 
>>  - Possible design based on discussion so far:
>> 
>>    --> SplitReader has a method "addSplits(SplitT...)" to add one or more
>> splits. Some split readers might assume they only ever have one split at a
>> time, others assume multiple concurrent splits. (Note: the idea behind
>> being able to add multiple splits at the same time is to ease startup,
>> where multiple splits may be assigned instantly.)
>>    --> SplitReader has a context object on which it can indicate when
>> splits are completed. The enumerator gets that notification and can use it
>> to decide when to assign new splits. This should help both in cases of
>> sources that take splits lazily (file readers) and in case the source needs
>> to preserve a partial order between splits (Kinesis, Pravega, Pulsar may
>> need that).
>>    --> SplitEnumerator gets a notification when SplitReaders start and when
>> they finish splits. It can decide at that moment to push more splits to
>> that reader
>>    --> The SplitEnumerator should probably be aware of the source
>> parallelism, to build its initial distribution.
>> 
>>  - Open question: Should the source expose something like "host
>> preferences", so that yarn/mesos/k8s can take this into account when
>> selecting a node to start a TM on?
>> 
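To make the lazy assignment in (4) a bit more concrete, here is a minimal single-threaded sketch. Only "addSplits(...)" is taken from the summary above; readerStarted(), splitFinished(), the String split type and the integer reader ids are assumptions for illustration. The enumerator hands out one split when a reader starts and one more each time that reader reports a finished split, so faster readers naturally get more work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LazyAssignmentSketch {

    /** Hypothetical reader-side interface; only addSplits(...) is named in the thread. */
    interface Reader {
        void addSplits(String... splits);
    }

    /** Hands out splits one at a time, driven by reader notifications. */
    static final class Enumerator {
        private final Deque<String> pending = new ArrayDeque<>();
        private final Map<Integer, Reader> readers = new HashMap<>();

        Enumerator(List<String> splits) {
            pending.addAll(splits);
        }

        void readerStarted(int readerId, Reader reader) {
            readers.put(readerId, reader);
            assignNext(readerId);
        }

        void splitFinished(int readerId, String split) {
            assignNext(readerId);
        }

        private void assignNext(int readerId) {
            String next = pending.poll(); // null once all splits are handed out
            if (next != null) {
                readers.get(readerId).addSplits(next);
            }
        }
    }

    /** Test double that only records what it was assigned. */
    static final class RecordingReader implements Reader {
        final List<String> assigned = new ArrayList<>();

        @Override
        public void addSplits(String... splits) {
            assigned.addAll(Arrays.asList(splits));
        }
    }

    static List<List<String>> demo() {
        Enumerator enumerator = new Enumerator(List.of("s0", "s1", "s2", "s3", "s4"));
        RecordingReader fast = new RecordingReader();
        RecordingReader slow = new RecordingReader();
        enumerator.readerStarted(0, fast);   // fast gets s0
        enumerator.readerStarted(1, slow);   // slow gets s1
        enumerator.splitFinished(0, "s0");   // fast gets s2
        enumerator.splitFinished(0, "s2");   // fast gets s3
        enumerator.splitFinished(1, "s1");   // slow gets s4
        enumerator.splitFinished(0, "s3");   // nothing left to assign
        return List.of(fast.assigned, slow.assigned);
    }
}
```

Locality preferences and the initial distribution based on source parallelism are deliberately left out here; they would change how assignNext() picks a split, not the notification flow.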
>> *(5) Watermarks and event time alignment*
>> 
>>  - Watermark generation, as well as idleness, needs to be per split (like
>> currently in the Kafka Source, per partition)
>>  - It is desirable to support optional event-time-alignment, meaning that
>> splits that are ahead are back-pressured or temporarily unsubscribed
>> 
>>  - I think it would be desirable to encapsulate watermark generation logic
>> in watermark generators, for a separation of concerns. The watermark
>> generators should run per split.
>>  - Using watermark generators would also help with another problem of the
>> suggested interface, namely supporting non-periodic watermarks efficiently.
>> 
>>  - Need a way to "dispatch" next record to different watermark generators
>>  - Need a way to tell SplitReader to "suspend" a split until a certain
>> watermark is reached (event time backpressure)
>>  - This would in fact not be needed (and thus simpler) if we had a
>> SplitReader per split and may be a reason to re-open that discussion
>> 
>> *(6) Watermarks across splits and in the Split Enumerator*
>> 
>>  - The split enumerator may need some watermark awareness, which should be
>> purely based on split metadata (like the creation timestamp of file splits)
>>  - If there are still more splits with overlapping event time range for a
>> split reader, then that split reader should not advance the watermark
>> within the split beyond the overlap boundary. Otherwise future splits will
>> produce late data.
>> 
>>  - One way to approach this could be that the split enumerator may send
>> watermarks to the readers, and the readers cannot emit watermarks beyond
>> that received watermark.
>>  - Many split enumerators would simply immediately send Long.MAX_VALUE out
>> and leave the progress purely to the split readers.
>> 
>>  - For event-time alignment / split back pressure, this raises the question
>> of how we can avoid deadlocks that may arise when splits are suspended for
>> event time back pressure.
>> 
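One possible reading of (5) and (6) combined, as a small sketch: the reader tracks one watermark per split, takes the minimum across its splits, and never emits beyond the bound last received from the enumerator. The class and method names are hypothetical, and the rule that the emitted watermark never goes backwards is an assumption on my side, in line with how watermarks behave elsewhere in Flink.

```java
import java.util.HashMap;
import java.util.Map;

final class WatermarkSketch {

    /** Combines per-split watermarks and caps them by an enumerator-provided bound. */
    static final class CappedWatermarks {
        private final Map<String, Long> perSplit = new HashMap<>();
        private long enumeratorBound = Long.MAX_VALUE; // "no cap" until told otherwise
        private long lastEmitted = Long.MIN_VALUE;

        void addSplit(String split) {
            perSplit.put(split, Long.MIN_VALUE);
        }

        void onSplitWatermark(String split, long watermark) {
            perSplit.merge(split, watermark, Long::max); // per-split watermarks only advance
        }

        void onEnumeratorWatermark(long bound) {
            enumeratorBound = bound;
        }

        long currentWatermark() {
            if (perSplit.isEmpty()) {
                return lastEmitted;
            }
            long min = Long.MAX_VALUE;
            for (long watermark : perSplit.values()) {
                min = Math.min(min, watermark);
            }
            // Never emit beyond the enumerator's bound, and never go backwards.
            lastEmitted = Math.max(lastEmitted, Math.min(min, enumeratorBound));
            return lastEmitted;
        }
    }

    static long[] demo() {
        CappedWatermarks watermarks = new CappedWatermarks();
        watermarks.addSplit("A");
        watermarks.addSplit("B");
        watermarks.onEnumeratorWatermark(100L);
        long[] observed = new long[4];

        watermarks.onSplitWatermark("A", 50L);
        watermarks.onSplitWatermark("B", 30L);
        observed[0] = watermarks.currentWatermark(); // slowest split decides

        watermarks.onSplitWatermark("B", 120L);
        observed[1] = watermarks.currentWatermark(); // now split A is the slowest

        watermarks.onSplitWatermark("A", 150L);
        observed[2] = watermarks.currentWatermark(); // capped by the enumerator bound

        watermarks.onEnumeratorWatermark(Long.MAX_VALUE);
        observed[3] = watermarks.currentWatermark(); // cap lifted
        return observed;
    }
}
```

An enumerator that does not care about overlap would send Long.MAX_VALUE right away, which in this sketch is the same as never calling onEnumeratorWatermark() at all.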
>> *(7) Batch and streaming Unification*
>> 
>>  - Functionality-wise, the above design should support both
>>  - Batch often (mostly) does not care about reading "in order" and
>> generating watermarks
>>    --> Might use different enumerator logic that is more locality aware
>> and ignores event time order
>>    --> Does not generate watermarks
>>  - Would be great if bounded sources could be identified at compile time,
>> so that "env.addBoundedSource(...)" is type safe and can return a
>> "BoundedDataStream".
>>  - Possible to defer this discussion until later
>> 
>> *Miscellaneous Comments*
>> 
>>  - Should the source have a TypeInformation for the produced type, instead
>> of a serializer? We need a type information in the stream anyways, and can
>> derive the serializer from that. Plus, creating the serializer should
>> respect the ExecutionConfig.
>> 
>>  - The TypeSerializer interface is very powerful but also not easy to
>> implement. Its purpose is to handle data super efficiently, support
>> flexible ways of evolution, etc.
>>  For metadata I would suggest looking at the SimpleVersionedSerializer
>> instead, which is used for example for checkpoint master hooks, or for the
>> streaming file sink. I think that it is a good match for cases where we do
>> not need more than ser/deser (no copy, etc.) and don't need to push
>> versioning out of the serialization paths for best performance (as in the
>> TypeSerializer)
>> 
>> 
>> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>>> Hi Biao,
>>> 
>>> Thanks for the answer!
>>> 
>>> So given the multi-threaded readers, now we have as open questions:
>>> 
>>> 1) How do we let the checkpoints pass through our multi-threaded reader
>>> operator?
>>> 
>>> 2) Do we have separate reader and source operators or not? In the
>>> strategy that has a separate source, the source operator has a
>>> parallelism of 1 and is responsible for split recovery only.
>>> 
>>> For the first one, given also the constraints (blocking, finite queues,
>>> etc), I do not have an answer yet.
>>> 
>>> For the 2nd, I think that we should go with separate operators for the
>>> source and the readers, for the following reasons:
>>> 
>>> 1) This is more aligned with a potential future improvement where the
>>> split discovery becomes a responsibility of the JobManager and readers
>>> are pulling more work from the JM.
>>> 
>>> 2) The source is going to be the "single point of truth". It will know
>>> what has been processed and what has not. If the source and the readers
>>> are a single operator with parallelism > 1, or in general, if the split
>>> discovery is done by each task individually, then:
>>>   i) we have to have a deterministic scheme for each reader to assign
>>> splits to itself (e.g. mod subtaskId). This is not necessarily trivial
>>> for all sources.
>>>   ii) each reader would have to keep a copy of all its processed splits
>>>   iii) the state has to be a union state with a non-trivial merging logic
>>> in order to support rescaling.
>>> 
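For readers less familiar with the "mod subtaskId" scheme mentioned in i), a tiny sketch under the assumption that splits can be numbered 0..n-1 (which is exactly what is not trivial for every source). The method name ownedSplits is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ModAssignmentSketch {

    /** Returns the split indices a subtask would assign to itself (hypothetical helper). */
    static List<Integer> ownedSplits(int numSplits, int subtaskId, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int split = 0; split < numSplits; split++) {
            if (split % parallelism == subtaskId) {
                owned.add(split);
            }
        }
        return owned;
    }
}
```

With 6 splits, subtask 0 owns splits 0, 2 and 4 at parallelism 2, but only splits 0 and 3 at parallelism 3. Split 2 moves to subtask 2 after rescaling, so its "already processed" state has to follow it, which is where the union state and the merging logic in iii) come from.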
>>> Two additional points that you raised above:
>>> 
>>> i) The point that you raised that we need to keep all splits (processed
>>> and not-processed) is, I think, a bit of a strong requirement. This would
>>> imply that for infinite sources the state will grow indefinitely. This
>>> problem is even more pronounced if we do not have a single source that
>>> assigns splits to readers, as each reader will have its own copy of the
>>> state.
>>> 
>>> ii) It is true that for finite sources we need to somehow not close the
>>> readers when the source/split discoverer finishes. The
>>> ContinuousFileReaderOperator has a work-around for that. It is not
>>> elegant, and checkpoints are not emitted after closing the source, but
>>> this, I believe, is a bigger problem which requires more changes than
>>> just refactoring the source interface.
>>> 
>>> Cheers,
>>> Kostas
>>> 
>> 
