I'm not sure if I've understood what the problem is - from what I can tell it's about associating UnboundedSource splits with Checkpoints in order to get consistent behavior from the sources. If I'm wrong, the following isn't really relevant to your problem - it's about the expected behavior of a runner interacting with any split of a Source.
In the absence of updates, the evaluation of a split of an UnboundedSource must obey the general contract for UnboundedSource: createReader(PipelineOptions, CheckpointMarkT) will only ever be called with a CheckpointMark that was generated by an UnboundedReader created from that same source. In other words, a Source creates Readers and is provided only checkpoints from the readers it creates. Each Source instance (split or top-level) should be independent of all other instances, and a split of a Source should generally be indistinguishable from a top-level source (it will just have slightly different configuration).

Generally this means that Source splits have to have an associated identifier, but these identifiers are arbitrary and not relevant to the actual evaluation of the Source. The runner gets to tag splits however it pleases, so long as those tags don't allow splits to bleed into each other.

Could you instead store the Source paired with some (arbitrary and unique) key and pull out the checkpoint using the key (or even just store the keys and store the source with the checkpoint)? That way you will always keep the same association between Source and Checkpoint. Flink does something like this: it stores the serialized source alongside the CheckpointMark so they're never separated ( https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164 and https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334 ).

On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela <[email protected]> wrote:

> If this issue doesn't make sense for "native" streaming systems, and it's
> only a Spark issue (and my implementation of Read.Unbounded) - I could keep
> doing what I do, use a running id.
> I was just wondering...
> ( hence the question mark in the title ;-) )
>
> On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <[email protected]> wrote:
>
> > Not sure how it works in Dataflow or Flink, but I'm working on an
> > implementation for Spark using the (almost) only stateful operator it has -
> > "mapWithState" - and the State needs to correspond to a key.
> > Each micro-batch, the Sources recreate the readers and "look-up" the
> > latest checkpoint.
> >
> > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi <[email protected]> wrote:
> >
> >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]> wrote:
> >>
> >> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm asking
> >> > if it should be abstracted in the Source.. Otherwise, runners will have to
> >> > *is instance of* on every Source, and write their own hash implementation.
> >> > Since splits contain the "splitted" Source, and it contains its own
> >> > CheckpointMark, and hashing would probably be tied to that CheckpointMark,
> >> > why not abstract it in the UnboundedSource ?
> >> >
> >>
> >> I don't quite follow how a runner should be concerned about hashing used by
> >> a Source for its own splits. Can you give a concrete example? To me it
> >> looks like source and checkpoint objects are completely opaque to the
> >> runners.
> >>
> >>
> >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]> wrote:
> >> >
> >> > > > If splits (UnboundedSources) had an identifier, this could be avoided,
> >> > > > and checkpoints could be persisted accordingly.
> >> > >
> >> > > The order of the splits that a source returns is preserved. So during an
> >> > > update, you can expect 5th split gets invoked with the same checkpoint mark
> >> > > that 5th split saved before upgrade. You can hash <topic, partition> to one
> >> > > of the indices.
> >> > >
> >> > > KafkaIO.java does not support change in partitions.
> >> > >
> >> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <[email protected]> wrote:
> >> > >
> >> > > > Oh! Okay, looks like this is a part of the code I was unfamiliar with.
> >> > > > I'd like to know the answer too, in this case.
> >> > > > +Daniel Mills <[email protected]> can you comment ?
> >> > > >
> >> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]> wrote:
> >> > > >
> >> > > > > That is correct, as long as none of the Kafka topics "grow" another
> >> > > > > partition (which it could).
> >> > > > > In that case, some bundle will have to start reading from this partition as
> >> > > > > well, and since all other bundles already have a "previous checkpoint" it
> >> > > > > matters which checkpoint to relate to. I don't know how it's implemented in
> >> > > > > Dataflow, but in Spark I'm testing using mapWithState to store the
> >> > > > > checkpoints per split.
> >> > > > > It also seems that order does matter to the KafkaIO:
> >> > > > >
> >> > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
> >> > > > >
> >> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> >> > > > > <[email protected]> wrote:
> >> > > > >
> >> > > > > > Hi Amit,
> >> > > > > > Could you explain more about why you're saying the order of splits matters?
> >> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of the splits in
> >> > > > > > parallel, checkpointing each of them independently", so their order
> >> > > > > > shouldn't matter.
> >> > > > > >
> >> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]> wrote:
> >> > > > > >
> >> > > > > > > UnboundedSources generate initial splits, which are basically splits of
> >> > > > > > > themselves - for example, if an UnboundedKafkaSource is set to read from
> >> > > > > > > topic T1 which is made of 2 partitions P1 and P2, it will (optimally) split
> >> > > > > > > into two UnboundedKafkaSources, one per partition.
> >> > > > > > > During the lifecycle of the "reader" bundles, CheckpointMarks are used to
> >> > > > > > > checkpoint "last-read" and so readers may restart/resume. I'm assuming this
> >> > > > > > > is how newly created partitions will automatically be added to readers.
> >> > > > > > >
> >> > > > > > > The problem is that it's all fine while there is only one topic (Kafka
> >> > > > > > > topic-partition pairs are ordered), but if reading from more than one topic
> >> > > > > > > this may break:
> >> > > > > > > T1,P1
> >> > > > > > > T1,P2
> >> > > > > > > T1,P3
> >> > > > > > > T2,P1
> >> > > > > > > The order is not maintained and T2,P1 is 4th now.
> >> > > > > > >
> >> > > > > > > If splits (UnboundedSources) had an identifier, this could be avoided, and
> >> > > > > > > checkpoints could be persisted accordingly.
> >> > > > > > > For example, an UnboundedKafkaSource could use the hash code of its
> >> > > > > > > assigned topic-partition pairs.
> >> > > > > > >
> >> > > > > > > I don't know how relevant this is to other Sources, but I guess it is as
> >> > > > > > > long as they may grow their partitions dynamically (though I might be
> >> > > > > > > completely wrong...) and I don't see much of a downside.
> >> > > > > > >
> >> > > > > > > Thoughts ?
> >> > > > > > >
> >> > > > > > > This is something that troubles me now while working on Read.Unbounded, and
> >> > > > > > > from a quick look I saw that the FlinkRunner expects "stable" splitting as
> >> > > > > > > well.. How does Dataflow handle this ?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Amit
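
To make the suggestion at the top of this message a bit more concrete, here is a minimal sketch of keeping each split and its latest checkpoint together under an arbitrary unique key. It is plain Java and deliberately does not use the Beam or Spark APIs: KeyedSplitStore and its methods (register, saveCheckpoint, sourceFor, checkpointFor) are hypothetical names, and the type parameters S and C only stand in for a split and its checkpoint type. Treat it as an illustration of the idea under those assumptions, not as how any runner actually does it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical runner-side store (not part of Beam): pairs each split with an
 * arbitrary unique key and keeps its latest checkpoint under that same key.
 */
final class KeyedSplitStore<S, C> {
  private final Map<String, S> sources = new HashMap<>();
  private final Map<String, C> checkpoints = new HashMap<>();

  /** Registers a split and returns the arbitrary key the runner will use for it. */
  String register(S source) {
    String key = UUID.randomUUID().toString();
    sources.put(key, source);
    return key;
  }

  /** Records the latest checkpoint produced by a reader of the split behind the key. */
  void saveCheckpoint(String key, C checkpoint) {
    requireKnown(key);
    checkpoints.put(key, checkpoint);
  }

  /** Returns the split that was registered under the key. */
  S sourceFor(String key) {
    requireKnown(key);
    return sources.get(key);
  }

  /** Returns the last saved checkpoint, or null if this split has not checkpointed yet. */
  C checkpointFor(String key) {
    requireKnown(key);
    return checkpoints.get(key);
  }

  // Rejects keys that were never handed out, so a lookup cannot silently miss.
  private void requireKnown(String key) {
    if (!sources.containsKey(key)) {
      throw new IllegalArgumentException("Unknown split key: " + key);
    }
  }
}
```

With something like this, the runner would look up sourceFor(key) and checkpointFor(key) together whenever it recreates a reader, so a checkpoint is only ever handed back to the split whose reader produced it, regardless of the order in which splits are listed.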
