If this issue doesn't make sense for "native" streaming systems, and it's only a Spark issue (and my implementation of Read.Unbounded) - I could keep doing what I do, use a running id. I was just wondering... ( hence the question mark in the title ;-) )
On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <[email protected]> wrote:

> Not sure how it works in Dataflow or Flink, but I'm working on an
> implementation for Spark using the (almost) only stateful operator it has -
> "mapWithState" - and the State needs to correspond to a key.
> Each micro-batch, the Sources recreate the readers and "look-up" the
> latest checkpoint.
>
> On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi <[email protected]>
> wrote:
>
>> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]> wrote:
>>
>> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm
>> > asking if it should be abstracted in the Source. Otherwise, runners
>> > will have to *is instance of* on every Source, and write their own
>> > hash implementation.
>> > Since splits contain the "splitted" Source, and it contains its own
>> > CheckpointMark, and hashing would probably be tied to that
>> > CheckpointMark, why not abstract it in the UnboundedSource ?
>>
>> I don't quite follow how a runner should be concerned about hashing used
>> by a Source for its own splits. Can you give a concrete example? To me it
>> looks like source and checkpoint objects are completely opaque to the
>> runners.
>>
>> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]>
>> > wrote:
>> >
>> > > > If splits (UnboundedSources) had an identifier, this could be
>> > > > avoided, and checkpoints could be persisted accordingly.
>> > >
>> > > The order of the splits that a source returns is preserved. So during
>> > > an update, you can expect 5th split gets invoked with the same
>> > > checkpoint mark that 5th split saved before upgrade. You can hash
>> > > <topic, partition> to one of the indices.
>> > >
>> > > KafkaIO.java does not support change in partitions.
>> > >
>> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <[email protected]>
>> > > wrote:
>> > >
>> > > > Oh! Okay, looks like this is a part of the code I was unfamiliar
>> > > > with. I'd like to know the answer too, in this case.
>> > > > +Daniel Mills <[email protected]> can you comment ?
>> > > >
>> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]>
>> > > > wrote:
>> > > >
>> > > > > That is correct, as long as none of the Kafka topics "grow"
>> > > > > another partition (which it could).
>> > > > > In that case, some bundle will have to start reading from this
>> > > > > partition as well, and since all other bundles already have a
>> > > > > "previous checkpoint" it matters which checkpoint to relate to.
>> > > > > I don't know how it's implemented in Dataflow, but in Spark I'm
>> > > > > testing using mapWithState to store the checkpoints per split.
>> > > > > It also seems that order does matter to the KafkaIO:
>> > > > >
>> > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
>> > > > >
>> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
>> > > > > <[email protected]> wrote:
>> > > > >
>> > > > > > Hi Amit,
>> > > > > > Could you explain more about why you're saying the order of
>> > > > > > splits matters?
>> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of the
>> > > > > > splits in parallel, checkpointing each of them independently",
>> > > > > > so their order shouldn't matter.
>> > > > > >
>> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > UnboundedSources generate initial splits, which are basically
>> > > > > > > splits of themselves - for example, if an
>> > > > > > > UnboundedKafkaSource is set to read from topic T1 which is
>> > > > > > > made of 2 partitions P1 and P2, it will (optimally) split
>> > > > > > > into two UnboundedKafkaSource, one per partition.
>> > > > > > > During the lifecycle of the "reader" bundles, CheckpointMarks
>> > > > > > > are used to checkpoint "last-read" and so readers may
>> > > > > > > restart/resume. I'm assuming this is how newly created
>> > > > > > > partitions will automatically be added to readers.
>> > > > > > >
>> > > > > > > The problem is that it's all fine while there is only one
>> > > > > > > topic (Kafka topic-partition pairs are ordered), but if
>> > > > > > > reading from more than one topic this may break:
>> > > > > > > T1,P1
>> > > > > > > T1,P2
>> > > > > > > T1,P3
>> > > > > > > T2,P1
>> > > > > > > The order is not maintained and T2,P1 is 4th now.
>> > > > > > >
>> > > > > > > If splits (UnboundedSources) had an identifier, this could be
>> > > > > > > avoided, and checkpoints could be persisted accordingly.
>> > > > > > > For example, an UnboundedKafkaSource could use the hash code
>> > > > > > > of its assigned topic-partition pairs.
>> > > > > > >
>> > > > > > > I don't know how relevant this is to other Sources, but I
>> > > > > > > guess it is as long as they may grow their partitions
>> > > > > > > dynamically (though I might be completely wrong...) and I
>> > > > > > > don't see much of a downside.
>> > > > > > >
>> > > > > > > Thoughts ?
>> > > > > > >
>> > > > > > > This is something that troubles me now while working on
>> > > > > > > Read.Unbounded, and from a quick look I saw that the
>> > > > > > > FlinkRunner expects "stable" splitting as well.. How does
>> > > > > > > Dataflow handle this ?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Amit
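
To make the T1/T2 example in the thread concrete, here is a minimal sketch of the difference between restoring checkpoints by position in the split list (the "running id") and restoring them by a stable per-split key. None of this is Beam, Spark, or Kafka API: the class `SplitKeySketch`, the helper names, and the "<topic>-<partition>" key format are all hypothetical, and a plain `Long` offset stands in for a CheckpointMark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Beam, Spark, or Kafka.
class SplitKeySketch {

    // Assumed key format for a split: "<topic>-<partition>".
    static String splitKey(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Persist offsets keyed by the split's position in the list (running id).
    static Map<Integer, Long> saveByIndex(List<String> splits, List<Long> offsets) {
        Map<Integer, Long> saved = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            saved.put(i, offsets.get(i));
        }
        return saved;
    }

    // Persist offsets keyed by the split's own identifier.
    static Map<String, Long> saveByKey(List<String> splits, List<Long> offsets) {
        Map<String, Long> saved = new HashMap<>();
        for (int i = 0; i < splits.size(); i++) {
            saved.put(splits.get(i), offsets.get(i));
        }
        return saved;
    }

    // Look up a checkpoint using the split's position in the *current* list.
    static Long restoreByIndex(Map<Integer, Long> saved, List<String> splits, String split) {
        return saved.get(splits.indexOf(split));
    }

    // Look up a checkpoint using the split's identifier; null means "no checkpoint yet".
    static Long restoreByKey(Map<String, Long> saved, String split) {
        return saved.get(split);
    }

    public static void main(String[] args) {
        // Before: T1 has two partitions, T2 has one.
        List<String> before = new ArrayList<>(Arrays.asList(
                splitKey("T1", 1), splitKey("T1", 2), splitKey("T2", 1)));
        List<Long> offsets = Arrays.asList(10L, 20L, 30L);
        Map<Integer, Long> byIndex = saveByIndex(before, offsets);
        Map<String, Long> byKey = saveByKey(before, offsets);

        // After: T1 grows a third partition, so T2,P1 moves from 3rd to 4th.
        List<String> after = Arrays.asList(
                splitKey("T1", 1), splitKey("T1", 2), splitKey("T1", 3), splitKey("T2", 1));

        // Positional lookup hands T2,P1's old offset to the new T1,P3 split.
        System.out.println("T1-3 by index: " + restoreByIndex(byIndex, after, "T1-3"));
        System.out.println("T2-1 by index: " + restoreByIndex(byIndex, after, "T2-1"));

        // Keyed lookup starts T1,P3 fresh and keeps T2,P1's offset.
        System.out.println("T1-3 by key:   " + restoreByKey(byKey, "T1-3"));
        System.out.println("T2-1 by key:   " + restoreByKey(byKey, "T2-1"));
    }
}
```

With positional lookup the new T1,P3 split inherits the offset saved for T2,P1, and T2,P1 finds nothing. With the keyed lookup T1,P3 has no checkpoint and T2,P1 keeps its own. Whether such a key belongs in the Source or stays a runner concern is exactly the open question in the thread.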
