Just to add a comment from the Flink side and its
UnboundedSourceWrapper: in our experience, the only way to guarantee
deterministic splitting of the source was to generate the splits upon
creation of the source and then checkpoint the assignment during
runtime. When restoring from a checkpoint, the same reader
configuration is restored. It's not possible to change the splitting
after the initial splitting has taken place. However, Flink will soon
be able to repartition the operator state upon restart/rescaling of a
job.
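
A minimal sketch of that approach, in Python purely for illustration.
This is not the UnboundedSourceWrapper code; the function names, the
JSON snapshot and the round-robin assignment are assumptions made up
for the example:

```python
import json


def generate_initial_splits(partitions, num_readers):
    """Assign partitions to readers round-robin. Called once, at source creation."""
    assignment = {reader: [] for reader in range(num_readers)}
    for index, partition in enumerate(sorted(partitions)):
        assignment[index % num_readers].append(partition)
    return assignment


def checkpoint(assignment):
    """Serialize the assignment so that it survives a restart."""
    return json.dumps(assignment, sort_keys=True)


def restore(snapshot):
    """Rebuild the same reader configuration; no re-splitting happens here."""
    # JSON object keys are strings, so convert reader ids back to int.
    return {int(reader): parts for reader, parts in json.loads(snapshot).items()}


partitions = ["topic-0", "topic-1", "topic-2", "topic-3", "topic-4"]
assignment = generate_initial_splits(partitions, num_readers=2)
snapshot = checkpoint(assignment)
restored = restore(snapshot)
```

The point is that restore() never calls generate_initial_splits()
again, so the splitting cannot drift between the original run and the
restored one.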

Does Spark have a way to pass the state of the previous mini batch to
the current mini batch? If so, you could restore the last configuration
and continue reading from the checkpointed offset. You just have to
checkpoint before the mini batch ends.
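
Roughly what I have in mind, as a Python sketch. It does not use any
Spark API; read_micro_batch, the state dict and the in-memory log are
hypothetical stand-ins for a reader, its checkpointed offset and a
Kafka partition:

```python
def read_micro_batch(log, state, max_records):
    """Read up to max_records starting at the checkpointed offset.

    Returns the records plus the new state to hand to the next mini batch.
    """
    start = state["offset"]
    records = log[start:start + max_records]
    # Checkpoint before the mini batch ends: the new offset is part of the result.
    new_state = {"offset": start + len(records)}
    return records, new_state


log = ["a", "b", "c", "d", "e"]
state = {"offset": 0}  # starting point, stored even before anything is read
seen = []
for _ in range(3):
    records, state = read_micro_batch(log, state, max_records=2)
    seen.extend(records)
```

A mini batch that reads nothing still returns a state with the same
offset, so the starting point is kept even when no records arrive.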

-Max

On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <[email protected]> 
wrote:
> Hi Amit,
>
> thanks for the explanation.
>
> For 4, you are right, it's slightly different from DataXchange (related to
> the elements in the PCollection). I think storing the "starting point" for a
> reader makes sense.
>
> Regards
> JB
>
>
> On 10/10/2016 10:33 AM, Amit Sela wrote:
>>
>> Inline, thanks JB!
>>
>> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>>> Hi Amit,
>>>
>>> For 1., the runner is responsible for the checkpoint storage (associated
>>> with the source). It's the way for the runner to retry and know the
>>> failed bundles.
>>>
>> True, this was a recap/summary of another, not-so-clear, thread.
>>
>>>
>>> For 4, are you proposing that KafkaRecord store additional metadata for
>>> that? It sounds like what I proposed in the "Technical Vision" appendix
>>> document: there I proposed to introduce a DataXchange object that stores
>>> some additional metadata (like offset) used by the runner. It would be
>>> the same with SDF as the tracker state should be persistent as well.
>>>
>> I think I was more focused on persisting the "starting point" for a reader,
>> even if no records were read (yet), so that the next time the reader
>> attempts to read it will pick up from there. This has more to do with how
>> the CheckpointMark handles this.
>> I have to say that I'm not familiar with your DataXchange proposal, I will
>> take a look though.
>>
>>>
>>> Regards
>>> JB
>>>
>>> On 10/08/2016 01:55 AM, Amit Sela wrote:
>>>
>>>> I started a thread about (suggesting) UnboundedSource splitId's and it
>>>> turned into an UnboundedSource/KafkaIO discussion, and I think it's best
>>>> to start over in a clear [DISCUSS] thread.
>>>>
>>>> When working on UnboundedSource support for the Spark runner, I've
>>>> raised some questions, some were general-UnboundedSource, and others
>>>> Kafka-specific.
>>>>
>>>> I'd like to recap them here, and maybe have a more productive and
>>>> well-documented discussion for everyone.
>>>>
>>>>    1. UnboundedSource id's - I assume any runner persists the
>>>>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how
>>>>    it matches the appropriate split (of the UnboundedSource) to its
>>>>    previously persisted CheckpointMark in any specific worker?
>>>>    *Thomas Groh* mentioned that Source splits have to have an associated
>>>>    identifier, and so the runner gets to tag splits however it pleases,
>>>>    so long as those tags don't allow splits to bleed into each other.
>>>>    2. Consistent splitting - an UnboundedSource splitting seems to require
>>>>    consistent splitting if it were to "pick up where it left off",
>>>>    correct? This is not mentioned as a requirement or a recommendation in
>>>>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
>>>>    *Raghu Angadi* mentioned that Kafka already does so by applying
>>>>    partitions to readers in a round-robin manner.
>>>>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>>>>    require deterministic splitting (although it's recommended), a
>>>>    PipelineRunner should keep track of the initially generated splits.
>>>>    3. Support reading of Kafka partitions that were added to topic/s while
>>>>    a Pipeline reads from them - BEAM-727
>>>>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>>>>    4. Reading/persisting Kafka start offsets - since Spark works in
>>>>    micro-batches, if "latest" was applied on a fairly sparse topic each
>>>>    worker would actually begin reading only after it saw a message during
>>>>    the time window it had to read messages. This is because fetching the
>>>>    offsets is done by the worker running the Reader. This means that each
>>>>    Reader sees a different state of "latest" (for its partition/s), such
>>>>    that a failing Reader that hasn't read yet might fetch a different
>>>>    "latest" once it's recovered than what it originally fetched. While
>>>>    this may not be as painful for other runners, IMHO it lacks correctness
>>>>    and I'd suggest either reading Kafka metadata of the Kafka cluster once
>>>>    upon initial splitting, or adding some of it to the CheckpointMark.
>>>>    Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
>>>>
>>>> The original thread is called "Should UnboundedSource provide a split
>>>> identifier ?".
>>>>
>>>> While the only specific implementation of UnboundedSource discussed here
>>>> is Kafka, it is probably the most popular open-source UnboundedSource.
>>>> Having said that, I wonder where this meets PubSub? Or any other
>>>> UnboundedSource that those questions might affect.
>>>>
>>>> Thanks,
>>>> Amit
>>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
