Hi Amit,

thanks for the explanation.

For 4, you are right, it's slightly different from DataXchange (related to the elements in the PCollection). I think storing the "starting point" for a reader makes sense.

Regards
JB

On 10/10/2016 10:33 AM, Amit Sela wrote:
Inline, thanks JB!

On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <[email protected]>
wrote:

Hi Amit,



For 1., the runner is responsible of the checkpoint storage (associated

with the source). It's the way for the runner to retry and know the

failed bundles.

True, this was a recap/summary of another, not-so-clear, thread.




For 4, are you proposing that KafkaRecord store additional metadata for

that ? It sounds like what I proposed in the "Technical Vision" appendix

document: there I proposed to introduce a DataXchange object that store

some additional metadata (like offset) used by the runner. It would be

the same with SDF as the tracker state should be persistent as well.

I think I was more focused on persisting the "starting point" for a reader,
even if no records were read (yet), so that the next time the reader
attempts to read it will pick of there. This has more to do with how the
CheckpointMark handles this.
I have to say that I'm not familiar with your DataXchange proposal, I will
take a look though.




Regards

JB



On 10/08/2016 01:55 AM, Amit Sela wrote:

I started a thread about (suggesting) UnboundedSource splitId's and it

turned into an UnboundedSource/KafkaIO discussion, and I think it's best
to

start over in a clear [DISCUSS] thread.



When working on UnboundedSource support for the Spark runner, I've raised

some questions, some were general-UnboundedSource, and others

Kafka-specific.



I'd like to recap them here, and maybe have a more productive and

well-documented discussion for everyone.



   1. UnboundedSource id's - I assume any runner persists the

   UnboundedSources's CheckpointMark for fault-tolerance, but I wonder
how it

   matches the appropriate split (of the UnboundedSource) to it's
previously

   persisted CheckpointMark in any specific worker ?

   *Thomas Groh* mentioned that Source splits have to have an

associated identifier,

   and so the runner gets to tag splits however it pleases, so long as

   those tags don't allow splits to bleed into each other.

   2. Consistent splitting - an UnboundedSource splitting seems to
require

   consistent splitting if it were to "pick-up where it left", correct ?
this

   is not mentioned as a requirement or a recommendation in

   UnboundedSource#generateInitialSplits(), so is this a Kafka-only
issue ?

   *Raghu Angadi* mentioned that Kafka already does so by applying

   partitions to readers in a round-robin manner.

   *Thomas Groh* also added that while the UnboundedSource API doesn't

   require deterministic splitting (although it's recommended), a

   PipelineRunner

   should keep track of the initially generated splits.

   3. Support reading of Kafka partitions that were added to topic/s
while

   a Pipeline reads from them - BEAM-727

   <https://issues.apache.org/jira/browse/BEAM-727> was filed.

   4. Reading/persisting Kafka start offsets - since Spark works in

   micro-batches, if "latest" was applied on a fairly sparse topic each
worker

   would actually begin reading only after it saw a message during the
time

   window it had to read messages. This is because fetching the offsets
is

   done by the worker running the Reader. This means that each Reader
sees a

   different state of "latest" (for his partition/s), such that a failing

   Reader that hasn't read yet might fetch a different "latest" once it's

   recovered then what it originally fetched. While this may not be as
painful

   for other runners, IMHO it lacks correctness and I'd suggest either
reading

   Kafka metadata of the Kafka cluster once upon initial splitting, or
add

   some of it to the CheckpointMark. Filed BEAM-704

   <https://issues.apache.org/jira/browse/BEAM-704>.



The original thread is called "Should UnboundedSource provide a split

identifier ?".



While the only specific implementation of UnboundedSource discussed here
is

Kafka, it is probably the most popular open-source UnboundedSource.
Having

said that, I wonder where this meets PubSub ? or any other
UnboundedSource

that those questions might affect.



Thanks,

Amit





--

Jean-Baptiste Onofré

[email protected]

http://blog.nanthrax.net

Talend - http://www.talend.com




--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to