On Tue, Sep 13, 2016 at 7:30 PM Thomas Groh wrote:
> Yes. We don't currently permit adding (or modifying) partitions in Kafka while a Pipeline is running (without updates). Our understanding was that this was a rare occurrence, but it's not impossible to support.

I don't know how rare it is, but I know that Kafka supports adding partitions, and Spark can handle this while reading from Kafka.

>
> For generateInitialSplits, the UnboundedSource API doesn't require deterministic splitting (although it's recommended), and a PipelineRunner should keep track of the initially generated splits.

If the splitting were consistent, so that newly added partitions were assigned a new "splitId" while existing ones kept the same (consistent) splitId, it could support newly added partitions, no?

>
> On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela wrote:
>
>> If I understand correctly, this will break https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857 in KafkaIO.
>>
>> So it's a KafkaIO limitation (for now?)?
>>
>> On Tue, Sep 13, 2016 at 11:31 AM Amit Sela wrote:
>>
>>> Thanks Thomas, you understood correctly.
>>> Doing this, or assigning a running id, is basically the same, as long as the generateInitialSplits implementation is deterministic (KafkaIO actually notes this).
>>>
>>> So what if partitions were added at runtime to one (or more) of the topics I'm consuming from?
>>>
>>> On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh wrote:
>>>
>>>> I'm not sure I've understood what the problem is. From what I can tell, it's about associating UnboundedSource splits with Checkpoints in order to get consistent behavior from the sources.
>>>> If I'm wrong, the following isn't really relevant to your problem; it's about the expected behavior of a runner interacting with any split of a Source.
>>>>
>>>> In the absence of updates, the evaluation of a split of an UnboundedSource must obey the general contract for UnboundedSource, which is that createReader(PipelineOptions, CheckpointMarkT) will only ever be called with a CheckpointMark that was generated by an UnboundedReader created from that source. That is, a Source creates Readers and is provided only checkpoints from the readers it creates. Each Source instance (split and top-level) should be independent of all other instances. A split of a Source should generally be indistinguishable from a top-level source (it will just have slightly different configuration).
>>>>
>>>> Generally this means that Source splits have to have an associated identifier, but these identifiers are arbitrary and not relevant to the actual evaluation of the Source, so the runner gets to tag splits however it pleases, as long as those tags don't allow splits to bleed into each other.
>>>>
>>>> Could you instead store the Source paired with some (arbitrary and unique) key and pull out the checkpoint using the key (or even just store the keys and store the source with the checkpoint)? That way you will always keep the same association between Source and Checkpoint.
>>>> Flink does something like this: it stores the serialized source alongside the CheckpointMark so they're never separated (see https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164 and https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334).
>>>>
>>>> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela wrote:
>>>>
>>>>> If this issue doesn't make sense for "native" streaming systems, and it's only a Spark issue (and my implementation of Read.Unbounded), I could keep doing what I do: use a running id.
>>>>> I was just wondering... (hence the question mark in the title ;-) )
>>>>>
>>>>> On Mon, Sep 12, 2016 at 9:31 PM Amit Sela wrote:
>>>>>
>>>>>> Not sure how it works in Dataflow or Flink, but I'm working on an implementation for Spark using the (almost) only stateful operator it has, "mapWithState", and the State needs to correspond to a key.
>>>>>> Each micro-batch, the Sources recreate the readers and "look up" the latest checkpoint.
>>>>>>
>>>>>> On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi wrote:
>>>>>>
>>>>>>> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela wrote:
>>>>>>>
>>>>>>>> @Raghu, hashing <topic, partition> is exactly what I mean, but I'm asking whether it should be abstracted in the Source. Otherwise, runners will have to do an *instanceof* check on every Source and write their own hash implementation.
>>>>>>>> Since splits contain the "split" Source, which contains its own CheckpointMark, and hashing would probably be tied to that CheckpointMark, why not abstract it in the UnboundedSource?
>>>>>>>
>>>>>>> I don't quite follow how a runner should be concerned about the hashing a Source uses for its own splits. Can you give a concrete example? To me it looks like source and checkpoint objects are completely opaque to the runners.
>>>>>>>
>>>>>>>> On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi wrote:
>>>>>>>>
>>>>>>>>>> If splits (UnboundedSources) had an identifier, this could be avoided, and checkpoints could be persisted accordingly.
>>>>>>>>>
>>>>>>>>> The order of the splits that a source returns is preserved. So during an update, you can expect the 5th split to be invoked with the same checkpoint mark that the 5th split saved before the upgrade. You can hash <topic, partition> to one of the indices.
>>>>>>>>>
>>>>>>>>> KafkaIO.java does not support changes in partitions.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov wrote:
>>>>>>>>>
>>>>>>>>>> Oh! Okay, looks like this is a part of the code I was unfamiliar with. I'd like to know the answer too, in this case.
>>>>>>>>>> +Daniel Mills, can you comment?
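Raghu's suggestion to hash <topic, partition> to one of the split indices can be sketched as follows. This is a minimal illustration under stated assumptions, not KafkaIO's actual assignment logic: `TopicPartition` is a hypothetical stand-in record (not Kafka's own class), and `HashToSplitIndex`, `splitIndex` and `assign` are made-up names. The idea it shows is that a pair's split index is computed from the pair itself, so it no longer depends on where the pair falls in a sorted list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical sketch: assign topic-partitions to a fixed number of split indices by hashing. */
class HashToSplitIndex {

  /** Hypothetical stand-in for a Kafka <topic, partition> pair (not Kafka's own class). */
  record TopicPartition(String topic, int partition) {}

  /**
   * Maps a pair to one of numSplits indices. String.hashCode and Integer.hashCode are
   * specified by the JDK, so the result is the same on every JVM; floorMod keeps it
   * non-negative.
   */
  static int splitIndex(TopicPartition tp, int numSplits) {
    return Math.floorMod(Objects.hash(tp.topic(), tp.partition()), numSplits);
  }

  /** Groups pairs by split index; which split a pair lands in does not depend on input order. */
  static Map<Integer, List<TopicPartition>> assign(List<TopicPartition> pairs, int numSplits) {
    Map<Integer, List<TopicPartition>> splits = new TreeMap<>();
    for (TopicPartition tp : pairs) {
      splits.computeIfAbsent(splitIndex(tp, numSplits), i -> new ArrayList<>()).add(tp);
    }
    return splits;
  }
}
```

For instance, `splitIndex(new TopicPartition("T2", 1), 3)` returns the same index whether or not T1,P3 exists. Two caveats under this sketch: the number of splits has to stay fixed for the mapping to be stable, and a newly added partition still lands in an existing split whose saved checkpoint does not mention it, so the source would need to start that partition from some default position.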
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 6, 2016 at 3:32 PM Amit Sela wrote:
>>>>>>>>>>
>>>>>>>>>>> That is correct, as long as none of the Kafka topics "grows" another partition (which it could).
>>>>>>>>>>> In that case, some bundle will have to start reading from this partition as well, and since all other bundles already have a "previous checkpoint", it matters which checkpoint to relate to. I don't know how it's implemented in Dataflow, but in Spark I'm testing using mapWithState to store the checkpoints per split.
>>>>>>>>>>> It also seems that order does matter to KafkaIO:
>>>>>>>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Amit,
>>>>>>>>>>>> Could you explain more about why you're saying the order of splits matters?
>>>>>>>>>>>> AFAIK the semantics of Read.Unbounded is "read from all of the splits in parallel, checkpointing each of them independently", so their order shouldn't matter.
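Thomas's suggestion of pairing each split with an arbitrary unique key and keeping its latest checkpoint next to it (as the Flink wrapper does), which is also what a per-split mapWithState key amounts to, can be sketched with plain Java collections. `SplitStateStore`, `Entry`, `registerInitialSplits` and `updateCheckpoint` are hypothetical names, not Beam or Spark APIs; the type parameters stand in for an UnboundedSource split and its CheckpointMark, and a real runner would persist this state (in Spark, for example, as mapWithState state) rather than hold it in an in-memory map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical runner-side store keeping each split and its latest checkpoint together. */
class SplitStateStore<S, C> {

  /** A split plus its most recent checkpoint (null until the first checkpoint is recorded). */
  record Entry<T, M>(T split, M checkpoint) {}

  private final Map<Long, Entry<S, C>> entries = new LinkedHashMap<>();
  private long nextKey = 0;

  /** Called once, with the initially generated splits; each gets a running id as its key. */
  void registerInitialSplits(List<S> splits) {
    for (S split : splits) {
      entries.put(nextKey++, new Entry<>(split, null));
    }
  }

  /** Stores a new checkpoint next to the split it came from. */
  void updateCheckpoint(long key, C checkpoint) {
    Entry<S, C> current = entries.get(key);
    if (current == null) {
      throw new IllegalArgumentException("unknown split key: " + key);
    }
    entries.put(key, new Entry<>(current.split(), checkpoint));
  }

  /** Returns the stored split and checkpoint, e.g. to recreate a reader in the next micro-batch. */
  Optional<Entry<S, C>> get(long key) {
    return Optional.ofNullable(entries.get(key));
  }
}
```

Because the split object itself is stored, a restart in this sketch reuses the stored splits instead of calling generateInitialSplits again and matching the results by position, which is why split order stops mattering. On its own it does not pick up partitions added later, which is the open question in this thread.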
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 6, 2016 at 3:17 PM Amit Sela wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> UnboundedSources generate initial splits, which are basically splits of themselves. For example, if an UnboundedKafkaSource is set to read from topic T1, which is made of 2 partitions P1 and P2, it will (optimally) split into two UnboundedKafkaSources, one per partition.
>>>>>>>>>>>>> During the lifecycle of the "reader" bundles, CheckpointMarks are used to checkpoint the "last read" position so that readers may restart/resume. I'm assuming this is how newly created partitions will automatically be added to readers.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The problem is that it's all fine while there is only one topic (Kafka topic-partition pairs are ordered), but when reading from more than one topic this may break. Suppose T1 grows a third partition, P3; the ordered list becomes:
>>>>>>>>>>>>> T1,P1
>>>>>>>>>>>>> T1,P2
>>>>>>>>>>>>> T1,P3
>>>>>>>>>>>>> T2,P1
>>>>>>>>>>>>> The order is not maintained: T2,P1, which used to be 3rd, is now 4th.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If splits (UnboundedSources) had an identifier, this could be avoided, and checkpoints could be persisted accordingly.
>>>>>>>>>>>>> For example, an UnboundedKafkaSource could use the hash code of its assigned topic-partition pairs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't know how relevant this is to other Sources, but I guess it is, as long as they may grow their partitions dynamically (though I might be completely wrong...), and I don't see much of a downside.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is something that troubles me now while working on Read.Unbounded, and from a quick look I saw that the FlinkRunner expects "stable" splitting as well. How does Dataflow handle this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Amit
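Amit's original proposal, giving each split an identifier derived from its assigned topic-partitions and persisting checkpoints under that identifier, can be contrasted with position-based lookup using the T1/T2 example from his message. In this sketch `SplitIdDemo`, `KafkaSplit`, `restoreByIndex` and `restoreById` are hypothetical names, checkpoints are plain strings standing in for CheckpointMarks, and the identifier is a canonical sorted string rather than a `hashCode()`, since two different partition sets could share a hash code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical sketch: restore checkpoints by split identifier instead of split position. */
class SplitIdDemo {

  /** Hypothetical stand-in for one Kafka split: the "topic-partition" names it reads. */
  record KafkaSplit(List<String> topicPartitions) {
    /** Order-independent identifier: the sorted names joined with commas. */
    String id() {
      return String.join(",", new TreeSet<>(topicPartitions));
    }
  }

  /** One split per topic-partition, in the given (sorted) order. */
  static List<KafkaSplit> splitsFor(List<String> sortedTopicPartitions) {
    List<KafkaSplit> splits = new ArrayList<>();
    for (String tp : sortedTopicPartitions) {
      splits.add(new KafkaSplit(List.of(tp)));
    }
    return splits;
  }

  /** Position-based restore: the i-th split gets the i-th saved checkpoint (null if none). */
  static Map<String, String> restoreByIndex(List<KafkaSplit> splits, List<String> savedByIndex) {
    Map<String, String> restored = new HashMap<>();
    for (int i = 0; i < splits.size(); i++) {
      restored.put(splits.get(i).id(), i < savedByIndex.size() ? savedByIndex.get(i) : null);
    }
    return restored;
  }

  /** Identifier-based restore: each split gets the checkpoint saved under its own id. */
  static Map<String, String> restoreById(List<KafkaSplit> splits, Map<String, String> savedById) {
    Map<String, String> restored = new HashMap<>();
    for (KafkaSplit split : splits) {
      restored.put(split.id(), savedById.get(split.id()));
    }
    return restored;
  }
}
```

With checkpoints `offset=100`, `offset=200`, `offset=300` saved for T1-P1, T1-P2, T2-P1 before T1,P3 appeared, `restoreByIndex` hands T2,P1's checkpoint to the new T1-P3 split and leaves T2-P1 with none, while `restoreById` gives T2-P1 its own checkpoint and leaves only the new T1-P3 without one.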
