Thanks Amit! A little bit inline.
On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]> wrote:

> I started a thread about (suggesting) UnboundedSource splitIds and it
> turned into an UnboundedSource/KafkaIO discussion, and I think it's best
> to start over in a clear [DISCUSS] thread.
>
> When working on UnboundedSource support for the Spark runner, I raised
> some questions, some general to UnboundedSource and others
> Kafka-specific.
>
> I'd like to recap them here, and maybe have a more productive and
> well-documented discussion for everyone.
>
> 1. UnboundedSource ids - I assume any runner persists the
>    UnboundedSource's CheckpointMark for fault tolerance, but I wonder
>    how it matches the appropriate split (of the UnboundedSource) to its
>    previously persisted CheckpointMark in any specific worker?
>    *Thomas Groh* mentioned that Source splits have to have an associated
>    identifier, and so the runner gets to tag splits however it pleases,
>    so long as those tags don't allow splits to bleed into each other.
> 2. Consistent splitting - an UnboundedSource seems to require consistent
>    splitting if it is to "pick up where it left off", correct? This is
>    not mentioned as a requirement or a recommendation in
>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only
>    issue?
>    *Raghu Angadi* mentioned that Kafka already does so by assigning
>    partitions to readers in a round-robin manner.
>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>    require deterministic splitting (although it's recommended), a
>    PipelineRunner should keep track of the initially generated splits.

The usual approach (I think this answers both 1 & 2) is that a runner
persists the actual original source (which is required to be Serializable)
along with the checkpoints (which are required to be encodable with a
Coder). This way, when the runner wants to resume from a checkpoint, it can
use that same source, and thus splitting need not be deterministic.
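To make that concrete, here is a minimal sketch of the "persist the source
alongside its checkpoint" idea. This is not Beam's actual API; the class
names, the Java-serialization scheme, and the long offset standing in for a
CheckpointMark are all stand-ins for illustration.

```java
import java.io.*;

// Hypothetical sketch (not Beam API): a runner stores each split's
// serialized source together with that split's encoded checkpoint, so
// resuming never needs to re-run (or re-match) initial splitting.
public class SplitCheckpointStore {

    // Stand-in for an UnboundedSource split; Beam requires sources to be
    // Serializable, which is what makes this approach possible.
    static class Source implements Serializable {
        final String topic;
        final int partition;
        Source(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    // Persist the (source, checkpoint) pair together as one record.
    static byte[] persist(Source source, long checkpointOffset) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(source);         // the exact split that was running
            out.writeLong(checkpointOffset); // the checkpoint mark for that split
        }
        return bos.toByteArray();
    }

    // Resume: decode both halves. The restored source is the same object that
    // produced the checkpoint, so generateInitialSplits() need not be
    // deterministic for recovery to be correct.
    static Object[] resume(byte[] record) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(record))) {
            Source source = (Source) in.readObject();
            long offset = in.readLong();
            return new Object[] { source, offset };
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] record = persist(new Source("events", 3), 42L);
        Object[] restored = resume(record);
        Source s = (Source) restored[0];
        System.out.println(s.topic + "/" + s.partition + " @ " + restored[1]);
    }
}
```

The point of the pairing is that the split identity question (point 1)
disappears: the checkpoint never needs to be matched against a freshly
generated split, because it carries its own source with it.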
It sounded like "pick up where it left off" is reminiscent of Dataflow's
pipeline update. That indeed requires UnboundedSource#generateInitialSplits()
to be deterministic if sources have configuration information, because we
may try to link up new sources with old checkpoints and we need to do so
correctly. But of course this is not super-awesome, because it's hard to
write deterministic functions, especially if, e.g., the Kafka partitioning
has changed in the meantime.

> 3. Support reading of Kafka partitions that were added to topic/s while
>    a Pipeline reads from them - BEAM-727
>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.

See above :). Tough, but useful.

> 4. Reading/persisting Kafka start offsets - since Spark works in
>    micro-batches, if "latest" was applied on a fairly sparse topic, each
>    worker would actually begin reading only after it saw a message during
>    the time window it had to read messages. This is because fetching the
>    offsets is done by the worker running the Reader. This means that each
>    Reader sees a different state of "latest" (for its partition/s), such
>    that a failing Reader that hasn't read yet might fetch a different
>    "latest" once it's recovered than what it originally fetched. While
>    this may not be as painful for other runners, IMHO it lacks
>    correctness, and I'd suggest either reading Kafka metadata of the
>    Kafka cluster once upon initial splitting, or adding some of it to
>    the CheckpointMark. Filed BEAM-704
>    <https://issues.apache.org/jira/browse/BEAM-704>.

+1. This is a great point. The notion that a runner may stop a reader and
resume it from a checkpoint frequently is definitely part of the Beam
model; right now the Spark and Direct runners (at least) do it very often.
The current behavior is definitely, if not broken... unexpected.
Both proposed solutions make sense to me: either log the last offset for
all partitions during splitting, or simply log the previous offset in the
checkpoint mark when we start reading for the first time.

> The original thread is called "Should UnboundedSource provide a split
> identifier ?".
>
> While the only specific implementation of UnboundedSource discussed here
> is Kafka, it is probably the most popular open-source UnboundedSource.
> Having said that, I wonder where this meets Pub/Sub? Or any other
> UnboundedSource that those questions might affect.

FWIW, Google Cloud Pub/Sub sources do not have state. Rather than connect
to specific partitions, they just say "hey, give me some messages", so the
splitting is easy and deterministic for free. [link
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java#L1043>]

> Thanks,
> Amit
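P.S. A sketch of the second fix for point 4, in case it helps the
discussion. This is not KafkaIO's actual code; the class names, the
simulated broker metadata, and the partition-to-offset map are hypothetical
stand-ins. The idea is just to resolve "latest" exactly once and record it
in the checkpoint mark, so a recovered reader resumes from the recorded
offsets instead of re-fetching a possibly different "latest".

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not KafkaIO): snapshot "latest" into the checkpoint
// mark at first start, before any message is read.
public class KafkaStartOffsets {

    static class CheckpointMark implements Serializable {
        // partition -> next offset to read
        final Map<Integer, Long> nextOffsets = new HashMap<>();
    }

    // Simulated broker metadata: current end offsets per partition. In a
    // real reader this would come from the Kafka cluster.
    static Map<Integer, Long> fetchLatestOffsets() {
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 100L);
        latest.put(1, 250L);
        return latest;
    }

    // First start: resolve "latest" once and persist it immediately, even if
    // the topic is sparse and no message arrives during the micro-batch.
    static CheckpointMark initialCheckpoint() {
        CheckpointMark mark = new CheckpointMark();
        mark.nextOffsets.putAll(fetchLatestOffsets());
        return mark;
    }

    // Recovery: start exactly at the recorded offsets. A reader that failed
    // before reading anything no longer sees a moved "latest".
    static long resumeOffset(CheckpointMark mark, int partition) {
        return mark.nextOffsets.get(partition);
    }

    public static void main(String[] args) {
        CheckpointMark mark = initialCheckpoint();
        System.out.println("p0 resumes at " + resumeOffset(mark, 0));
        System.out.println("p1 resumes at " + resumeOffset(mark, 1));
    }
}
```

The first proposed fix (reading the metadata once at initial splitting)
would move the fetchLatestOffsets() call into split generation instead, with
the same effect: one consistent view of "latest" per pipeline, not one per
reader attempt.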
