Hi Amit,

For 1., the runner is responsible for the checkpoint storage (associated with the source). That is how the runner is able to retry and to know which bundles failed.

For 4, are you proposing that KafkaRecord store additional metadata for that? It sounds like what I proposed in the "Technical Vision" appendix document: there I proposed introducing a DataXchange object that stores some additional metadata (like the offset) used by the runner. It would be the same with SDF, as the tracker state should be persistent as well.
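
To make the idea a bit more concrete, here is a minimal sketch in plain Java. It is only an illustration, not the DataXchange design and not Beam or KafkaIO API: RecordWithMetadata and OffsetTracker are hypothetical names, and I am assuming the runner would persist, per partition, the next offset to read.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper: a value plus the source metadata a runner may need.
final class RecordWithMetadata<T> {
    final String partition;
    final long offset;
    final T value;

    RecordWithMetadata(String partition, long offset, T value) {
        this.partition = partition;
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical runner-side state: the next offset to read for each partition.
final class OffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Remember the highest (offset + 1) seen so far for the record's partition.
    void observe(RecordWithMetadata<?> record) {
        nextOffsets.merge(record.partition, record.offset + 1, Math::max);
    }

    // Copy of the state, i.e. what would be written to checkpoint storage.
    Map<String, Long> snapshot() {
        return new HashMap<>(nextOffsets);
    }
}
```

With something like this, the state to persist is derived from metadata travelling with the records rather than from a lookup done again by the reader after a failure.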

Regards
JB

On 10/08/2016 01:55 AM, Amit Sela wrote:
I started a thread (suggesting) UnboundedSource split ids and it
turned into an UnboundedSource/KafkaIO discussion, so I think it's best to
start over in a clear [DISCUSS] thread.

When working on UnboundedSource support for the Spark runner, I raised
some questions; some were about UnboundedSource in general, and others were
Kafka-specific.

I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.

   1. UnboundedSource ids - I assume any runner persists the
   UnboundedSource's CheckpointMark for fault tolerance, but I wonder how it
   matches the appropriate split (of the UnboundedSource) to its previously
   persisted CheckpointMark in any specific worker?
   *Thomas Groh* mentioned that Source splits have to have an associated
   identifier, and so the runner gets to tag splits however it pleases, so
   long as those tags don't allow splits to bleed into each other.
   2. Consistent splitting - an UnboundedSource seems to require
   consistent splitting if it is to "pick up where it left off", correct? This
   is not mentioned as a requirement or a recommendation in
   UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
   *Raghu Angadi* mentioned that Kafka already does so by assigning
   partitions to readers in a round-robin manner.
   *Thomas Groh* also added that while the UnboundedSource API doesn't
   require deterministic splitting (although it's recommended), a
   PipelineRunner should keep track of the initially generated splits.
   3. Support reading of Kafka partitions that were added to topic/s while
   a Pipeline reads from them - BEAM-727
   <https://issues.apache.org/jira/browse/BEAM-727> was filed.
   4. Reading/persisting Kafka start offsets - since Spark works in
   micro-batches, if "latest" is applied to a fairly sparse topic, each worker
   actually begins reading only after it sees a message during the time
   window it has to read messages. This is because fetching the offsets is
   done by the worker running the Reader. This means that each Reader sees a
   different state of "latest" (for its partition/s), such that a failing
   Reader that hasn't read yet might fetch a different "latest" once it has
   recovered than the one it originally fetched. While this may not be as
   painful for other runners, IMHO it lacks correctness, and I'd suggest
   either reading the Kafka cluster's metadata once upon initial splitting or
   adding some of it to the CheckpointMark. Filed BEAM-704
   <https://issues.apache.org/jira/browse/BEAM-704>.
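
As a rough illustration of points 2 and 4, here is a minimal sketch in plain Java. It is not the Beam or KafkaIO implementation: SplitSketch, assignRoundRobin and startOffset are hypothetical names, sorting the partitions before the round-robin pass is my own assumption to make the assignment independent of input order, and latestAtSplitTime stands for a "latest" offset captured once during initial splitting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only.
final class SplitSketch {

    // Point 2: sort the partitions, then deal them out round-robin, so the
    // same set of partitions always produces the same splits.
    static List<List<String>> assignRoundRobin(List<String> partitions, int numSplits) {
        if (numSplits <= 0) {
            throw new IllegalArgumentException("numSplits must be positive");
        }
        List<String> sorted = new ArrayList<>(partitions);
        Collections.sort(sorted);
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < sorted.size(); i++) {
            splits.get(i % numSplits).add(sorted.get(i));
        }
        return splits;
    }

    // Point 4: prefer a checkpointed offset; otherwise fall back to the
    // "latest" captured once at split time rather than re-fetching it.
    static long startOffset(Long checkpointedOffset, long latestAtSplitTime) {
        return checkpointedOffset != null ? checkpointedOffset : latestAtSplitTime;
    }
}
```

With a fixed starting point like this, a Reader that fails before reading anything would resume from the same offset it was originally given, instead of whatever "latest" happens to be after recovery.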

The original thread is called "Should UnboundedSource provide a split
identifier?".

While the only specific implementation of UnboundedSource discussed here is
Kafka, it is probably the most popular open-source UnboundedSource. Having
said that, I wonder how this applies to PubSub, or to any other
UnboundedSource that these questions might affect.

Thanks,
Amit


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
