I started a thread suggesting split IDs for UnboundedSource, and it turned into a broader UnboundedSource/KafkaIO discussion, so I think it's best to start over in a clear [DISCUSS] thread.
When working on UnboundedSource support for the Spark runner, I raised some questions; some were about UnboundedSource in general, and others were Kafka-specific. I'd like to recap them here, and hopefully have a more productive and well-documented discussion for everyone.

1. UnboundedSource IDs - I assume any runner persists the UnboundedSource's CheckpointMark for fault tolerance, but I wonder how it matches each split (of the UnboundedSource) to its previously persisted CheckpointMark on any specific worker? *Thomas Groh* mentioned that Source splits have to have an associated identifier, and so the runner gets to tag splits however it pleases, so long as those tags don't allow splits to bleed into each other.

2. Consistent splitting - an UnboundedSource seems to require consistent splitting if it is to "pick up where it left off", correct? This is not mentioned as a requirement or a recommendation in UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue? *Raghu Angadi* mentioned that Kafka already does so by assigning partitions to readers in a round-robin manner. *Thomas Groh* also added that while the UnboundedSource API doesn't require deterministic splitting (although it's recommended), a PipelineRunner should keep track of the initially generated splits.

3. Support reading Kafka partitions that were added to the topic(s) while a Pipeline is reading from them - BEAM-727 <https://issues.apache.org/jira/browse/BEAM-727> was filed.

4. Reading/persisting Kafka start offsets - since Spark works in micro-batches, if "latest" is applied to a fairly sparse topic, each worker would actually begin reading only after it saw a message during the time window it had to read messages. This is because fetching the offsets is done by the worker running the Reader. This means that each Reader sees a different state of "latest" (for its partition(s)), such that a failing Reader that hasn't read anything yet might fetch a different "latest" once it has recovered than the one it originally fetched. While this may not be as painful for other runners, IMHO it lacks correctness, and I'd suggest either reading the Kafka cluster's metadata once upon initial splitting, or adding some of it to the CheckpointMark. Filed BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.

The original thread is called "Should UnboundedSource provide a split identifier ?".

While the only specific implementation of UnboundedSource discussed here is Kafka, it is probably the most popular open-source UnboundedSource. Having said that, I wonder where this meets PubSub, or any other UnboundedSource that these questions might affect?

Thanks,
Amit
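
P.S. To make point 2 more concrete, here is a rough sketch of what deterministic round-robin splitting could look like. This is only an illustration under my own assumptions, not KafkaIO's actual code: the class RoundRobinSplits, the method assign, and the use of plain strings to stand in for topic partitions are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Beam or KafkaIO.
class RoundRobinSplits {

    // Assigns partitions to numSplits splits in round-robin order.
    // Sorting first makes the result independent of the order in which
    // the partitions were discovered, so the same input set always
    // yields the same splits.
    public static List<List<String>> assign(List<String> partitions, int numSplits) {
        if (numSplits <= 0) {
            throw new IllegalArgumentException("numSplits must be positive");
        }
        // Any stable total order works; lexicographic order is used here
        // only to keep the sketch short.
        List<String> sorted = new ArrayList<>(partitions);
        Collections.sort(sorted);

        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < sorted.size(); i++) {
            // Partition i goes to split (i mod numSplits).
            splits.get(i % numSplits).add(sorted.get(i));
        }
        return splits;
    }

    public static void main(String[] args) {
        // The discovery order differs, but the resulting splits are the same.
        System.out.println(assign(Arrays.asList("topic-2", "topic-0", "topic-1"), 2));
        System.out.println(assign(Arrays.asList("topic-1", "topic-2", "topic-0"), 2));
    }
}
```

With something like this, the index of a split can act as its identifier: as long as the set of partitions and the number of splits stay the same, split i after a restart covers the same partitions as split i before it, so a runner could match it to the CheckpointMark it persisted for that index. It does not handle partitions being added later (point 3).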
