Hi Max,

thanks for the explanation, it makes a lot of sense.

I'm not sure it will be so simple to store previous state from one micro-batch to the next. Let me take a look with Amit.

Regards
JB

On 10/10/2016 03:02 PM, Maximilian Michels wrote:
Just to add a comment from the Flink side and its
UnboundedSourceWrapper: we found that the only way to guarantee
deterministic splitting of the source was to generate the splits upon
creation of the source and then checkpoint the assignment during
runtime. When restoring from a checkpoint, the same reader
configuration is restored. It's not possible to change the splitting
after the initial splitting has taken place. However, Flink will soon
be able to repartition the operator state upon restart/rescaling of a
job.
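
Roughly, it looks like this (a minimal sketch in Java; "state" and the
variable names are illustrative, not the actual UnboundedSourceWrapper
code):

  // Splits are generated once, when the source is created:
  List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
      source.generateInitialSplits(parallelism, options);

  // On checkpoint, persist each split together with its reader's
  // current CheckpointMark, so the assignment is part of the snapshot:
  for (int i = 0; i < localReaders.size(); i++) {
    state.add(KV.of(assignedSplits.get(i),
        (CheckpointMarkT) localReaders.get(i).getCheckpointMark()));
  }

  // On restore, re-create readers from the checkpointed assignment
  // instead of re-splitting the source:
  for (KV<UnboundedSource<T, CheckpointMarkT>, CheckpointMarkT> kv
      : state.get()) {
    localReaders.add(kv.getKey().createReader(options, kv.getValue()));
  }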

Does Spark have a way to pass state of a previous mini batch to the
current mini batch? If so, you could restore the last configuration
and continue reading from the checkpointed offset. You just have to
checkpoint before the mini batch ends.
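
For reference, Spark Streaming's mapWithState can carry state across
micro-batches; a minimal sketch, assuming you key readers by split id
and keep the serialized CheckpointMark as the state (readMicroBatch
and latestCheckpointMarkBytes are hypothetical placeholders):

  import org.apache.spark.api.java.Optional;
  import org.apache.spark.api.java.function.Function3;
  import org.apache.spark.streaming.State;
  import org.apache.spark.streaming.StateSpec;

  // The mapping function receives the previous micro-batch's state
  // and updates it before the current micro-batch ends:
  Function3<Integer, Optional<byte[]>, State<byte[]>, List<byte[]>> fn =
      (splitId, ignored, state) -> {
        byte[] previousMark = state.exists() ? state.get() : null;
        // Resume reading from the checkpointed offset (hypothetical):
        List<byte[]> records = readMicroBatch(splitId, previousMark);
        // Checkpoint before the micro-batch ends (hypothetical):
        state.update(latestCheckpointMarkBytes(splitId));
        return records;
      };

  // splitsById is a JavaPairDStream<Integer, byte[]> keyed by split id:
  splitsById.mapWithState(StateSpec.function(fn));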

-Max

On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <[email protected]> 
wrote:
Hi Amit,

thanks for the explanation.

For 4, you are right, it's slightly different from DataXchange (related to
the elements in the PCollection). I think storing the "starting point" for a
reader makes sense.

Regards
JB


On 10/10/2016 10:33 AM, Amit Sela wrote:

Inline, thanks JB!

On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <[email protected]>
wrote:

Hi Amit,



For 1., the runner is responsible for the checkpoint storage
(associated with the source). It's the way for the runner to retry and
know the failed bundles.

True, this was a recap/summary of another, not-so-clear, thread.




For 4, are you proposing that KafkaRecord store additional metadata for
that? It sounds like what I proposed in the "Technical Vision" appendix
document: there I proposed to introduce a DataXchange object that
stores some additional metadata (like offset) used by the runner. It
would be the same with SDF, as the tracker state should be persistent
as well.

I think I was more focused on persisting the "starting point" for a
reader, even if no records were read (yet), so that the next time the
reader attempts to read it will pick up there. This has more to do
with how the CheckpointMark handles this.
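Something along these lines, say (a minimal sketch, not the actual
KafkaIO code; the class name is made up):

  import java.util.Map;
  import org.apache.beam.sdk.io.UnboundedSource;
  import org.apache.kafka.common.TopicPartition;

  // The mark records the next offset to read per partition, and is
  // produced even when no records were read yet, so a restored reader
  // starts exactly where the previous one would have started.
  class StartOffsetCheckpointMark
      implements UnboundedSource.CheckpointMark {
    private final Map<TopicPartition, Long> nextOffsets;

    StartOffsetCheckpointMark(Map<TopicPartition, Long> nextOffsets) {
      this.nextOffsets = nextOffsets;
    }

    Map<TopicPartition, Long> getNextOffsets() {
      return nextOffsets;
    }

    @Override
    public void finalizeCheckpoint() {
      // Nothing to finalize: offsets are replayable from Kafka itself.
    }
  }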
I have to say that I'm not familiar with your DataXchange proposal; I
will take a look though.




Regards
JB



On 10/08/2016 01:55 AM, Amit Sela wrote:

I started a thread about (suggesting) UnboundedSource splitId's and it
turned into an UnboundedSource/KafkaIO discussion, and I think it's
best to start over in a clear [DISCUSS] thread.

When working on UnboundedSource support for the Spark runner, I raised
some questions; some were general UnboundedSource questions, and
others Kafka-specific.

I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.




   1. UnboundedSource id's - I assume any runner persists the
   UnboundedSource's CheckpointMark for fault-tolerance, but I wonder
   how it matches the appropriate split (of the UnboundedSource) to its
   previously persisted CheckpointMark in any specific worker?
   *Thomas Groh* mentioned that Source splits have to have an
   associated identifier, and so the runner gets to tag splits however
   it pleases, so long as those tags don't allow splits to bleed into
   each other.


   2. Consistent splitting - an UnboundedSource's splitting seems to
   require consistency if it were to "pick up where it left off",
   correct? This is not mentioned as a requirement or a recommendation
   in UnboundedSource#generateInitialSplits(), so is this a Kafka-only
   issue?
   *Raghu Angadi* mentioned that Kafka already does so by applying
   partitions to readers in a round-robin manner.
   *Thomas Groh* also added that while the UnboundedSource API doesn't
   require deterministic splitting (although it's recommended), a
   PipelineRunner should keep track of the initially generated splits.


   3. Support reading of Kafka partitions that were added to topic/s
   while a Pipeline reads from them - BEAM-727
   <https://issues.apache.org/jira/browse/BEAM-727> was filed.


   4. Reading/persisting Kafka start offsets - since Spark works in
   micro-batches, if "latest" was applied on a fairly sparse topic,
   each worker would actually begin reading only after it saw a message
   during the time window it had to read messages. This is because
   fetching the offsets is done by the worker running the Reader. This
   means that each Reader sees a different state of "latest" (for its
   partition/s), such that a failing Reader that hasn't read yet might
   fetch a different "latest" once it's recovered than what it
   originally fetched. While this may not be as painful for other
   runners, IMHO it lacks correctness, and I'd suggest either reading
   the Kafka cluster's metadata once upon initial splitting (see the
   sketch after this list), or adding some of it to the CheckpointMark.
   Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
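
For the first suggestion in 4., something like this (a minimal sketch;
fetchLatestOffsets, partitionsForSplit and this KafkaSource constructor
are made-up names, not the actual KafkaIO code):

  // Resolve "latest" once, when the source is split, and bake the
  // resolved offsets into each split, so every Reader (including a
  // recovered one) starts from the same snapshot of "latest".
  @Override
  public List<KafkaSource> generateInitialSplits(
      int desiredNumSplits, PipelineOptions options) throws Exception {
    // One metadata fetch for the whole source, not one per Reader:
    Map<TopicPartition, Long> latest = fetchLatestOffsets(consumerConfig);
    List<KafkaSource> splits = new ArrayList<>();
    for (int i = 0; i < desiredNumSplits; i++) {
      // Round-robin partition assignment, each split carrying its
      // frozen start offsets:
      splits.add(new KafkaSource(
          partitionsForSplit(i, desiredNumSplits, latest), latest));
    }
    return splits;
  }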




The original thread is called "Should UnboundedSource provide a split
identifier ?".

While the only specific implementation of UnboundedSource discussed
here is Kafka, it is probably the most popular open-source
UnboundedSource. Having said that, I wonder where this meets PubSub?
Or any other UnboundedSource that those questions might affect.




Thanks,
Amit






--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
