Yes. We don't currently permit adding (or modifying) partitions in Kafka while a Pipeline is running (without updates). Our understanding was that this was a rare occurrence, but it's not impossible to support.
For generateInitialSplits, the UnboundedSource API doesn't require deterministic splitting (although it's recommended), and a PipelineRunner should keep track of the initially generated splits.

On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela <[email protected]> wrote:

> If I understand correctly this will break
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
> in KafkaIO.
>
> So it's a KafkaIO limitation (for now?)?
>
> On Tue, Sep 13, 2016 at 11:31 AM Amit Sela <[email protected]> wrote:
>
> > Thanks Thomas, you understood correctly.
> > Doing this, or assigning a running id, is basically the same, as long as
> > the generateInitialSplits implementation is deterministic (KafkaIO
> > actually notes this).
> >
> > So what if partitions were added at runtime to one (or more) of the
> > topics I'm consuming from?
> >
> > On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh <[email protected]>
> > wrote:
> >
> >> I'm not sure if I've understood what the problem is - from what I can
> >> tell it's about associating UnboundedSource splits with Checkpoints in
> >> order to get consistent behavior from the sources. If I'm wrong, the
> >> following isn't really relevant to your problem - it's about the
> >> expected behavior of a runner interacting with any split of a Source.
> >>
> >> In the absence of updates, the evaluation of a split of UnboundedSource
> >> must obey the general contract for UnboundedSource, which is that
> >> createReader(PipelineOptions, CheckpointMarkT) will only ever be called
> >> with a Checkpoint Mark that was generated by an UnboundedReader that was
> >> created from the source - i.e., a Source creates Readers and is provided
> >> only checkpoints from those readers it creates. Each Source instance
> >> (split and top-level) should be independent of all other instances.
> >> A split of a Source should generally be indistinguishable from a
> >> top-level source (it will just have slightly different configuration).
> >>
> >> Generally this means that Source splits have to have an associated
> >> identifier, but these identifiers are arbitrary and not relevant to the
> >> actual evaluation of the Source - so the runner gets to tag splits
> >> however it pleases, so long as those tags don't allow splits to bleed
> >> into each other.
> >>
> >> Could you instead store the Source paired with some (arbitrary and
> >> unique) key and pull out the checkpoint using the key (or even just
> >> store the keys and store the source with the checkpoint)? That way you
> >> always will keep the same association between Source and Checkpoint.
> >> Flink does something like this where they store the serialized source
> >> alongside the CheckpointMark so they're never separated (
> >> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
> >> and
> >> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
> >> )
> >>
> >> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela <[email protected]> wrote:
> >>
> >> > If this issue doesn't make sense for "native" streaming systems, and
> >> > it's only a Spark issue (and my implementation of Read.Unbounded) - I
> >> > could keep doing what I do, use a running id.
> >> > I was just wondering...
> >> > (hence the question mark in the title ;-) )
> >> >
> >> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <[email protected]> wrote:
> >> >
> >> > > Not sure how it works in Dataflow or Flink, but I'm working on an
> >> > > implementation for Spark using the (almost) only stateful operator
> >> > > it has - "mapWithState" - and the State needs to correspond to a key.
> >> > > Each micro-batch, the Sources recreate the readers and "look-up" the
> >> > > latest checkpoint.
> >> > >
> >> > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi <[email protected]>
> >> > > wrote:
> >> > >
> >> > >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]> wrote:
> >> > >>
> >> > >> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm
> >> > >> > asking if it should be abstracted in the Source.. Otherwise,
> >> > >> > runners will have to *is instance of* on every Source, and write
> >> > >> > their own hash implementation.
> >> > >> > Since splits contain the "splitted" Source, and it contains its
> >> > >> > own CheckpointMark, and hashing would probably be tied to that
> >> > >> > CheckpointMark, why not abstract it in the UnboundedSource?
> >> > >> >
> >> > >>
> >> > >> I don't quite follow how a runner should be concerned about hashing
> >> > >> used by a Source for its own splits. Can you give a concrete
> >> > >> example? To me it looks like source and checkpoint objects are
> >> > >> completely opaque to the runners.
> >> > >>
> >> > >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]>
> >> > >> > wrote:
> >> > >> >
> >> > >> > > > If splits (UnboundedSources) had an identifier, this could be
> >> > >> > > > avoided, and checkpoints could be persisted accordingly.
> >> > >> > >
> >> > >> > > The order of the splits that a source returns is preserved.
> >> > >> > > So during an update, you can expect the 5th split to be invoked
> >> > >> > > with the same checkpoint mark that the 5th split saved before
> >> > >> > > the upgrade. You can hash <topic, partition> to one of the
> >> > >> > > indices.
> >> > >> > >
> >> > >> > > KafkaIO.java does not support change in partitions.
> >> > >> > >
> >> > >> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov
> >> > >> > > <[email protected]> wrote:
> >> > >> > >
> >> > >> > > > Oh! Okay, looks like this is a part of the code I was
> >> > >> > > > unfamiliar with. I'd like to know the answer too, in this case.
> >> > >> > > > +Daniel Mills <[email protected]> can you comment?
> >> > >> > > >
> >> > >> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]>
> >> > >> > > > wrote:
> >> > >> > > >
> >> > >> > > > > That is correct, as long as none of the Kafka topics "grow"
> >> > >> > > > > another partition (which they could).
> >> > >> > > > > In that case, some bundle will have to start reading from
> >> > >> > > > > this partition as well, and since all other bundles already
> >> > >> > > > > have a "previous checkpoint" it matters which checkpoint to
> >> > >> > > > > relate to. I don't know how it's implemented in Dataflow,
> >> > >> > > > > but in Spark I'm testing using mapWithState to store the
> >> > >> > > > > checkpoints per split.
> >> > >> > > > > It also seems that order does matter to KafkaIO:
> >> > >> > > > >
> >> > >> > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
> >> > >> > > > >
> >> > >> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> >> > >> > > > > <[email protected]> wrote:
> >> > >> > > > >
> >> > >> > > > > > Hi Amit,
> >> > >> > > > > > Could you explain more about why you're saying the order
> >> > >> > > > > > of splits matters?
> >> > >> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of
> >> > >> > > > > > the splits in parallel, checkpointing each of them
> >> > >> > > > > > independently", so their order shouldn't matter.
> >> > >> > > > > >
> >> > >> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela
> >> > >> > > > > > <[email protected]> wrote:
> >> > >> > > > > >
> >> > >> > > > > > > UnboundedSources generate initial splits, which are
> >> > >> > > > > > > basically splits of themselves - for example, if an
> >> > >> > > > > > > UnboundedKafkaSource is set to read from topic T1 which
> >> > >> > > > > > > is made of 2 partitions P1 and P2, it will (optimally)
> >> > >> > > > > > > split into two UnboundedKafkaSources, one per partition.
> >> > >> > > > > > > During the lifecycle of the "reader" bundles,
> >> > >> > > > > > > CheckpointMarks are used to checkpoint "last-read" and
> >> > >> > > > > > > so readers may restart/resume. I'm assuming this is how
> >> > >> > > > > > > newly created partitions will automatically be added to
> >> > >> > > > > > > readers.
> >> > >> > > > > > >
> >> > >> > > > > > > The problem is that it's all fine while there is only
> >> > >> > > > > > > one topic (Kafka topic-partition pairs are ordered), but
> >> > >> > > > > > > if reading from more than one topic this may break:
> >> > >> > > > > > > T1,P1
> >> > >> > > > > > > T1,P2
> >> > >> > > > > > > T1,P3
> >> > >> > > > > > > T2,P1
> >> > >> > > > > > > The order is not maintained and T2,P1 is 4th now.
> >> > >> > > > > > >
> >> > >> > > > > > > If splits (UnboundedSources) had an identifier, this
> >> > >> > > > > > > could be avoided, and checkpoints could be persisted
> >> > >> > > > > > > accordingly.
> >> > >> > > > > > > For example, an UnboundedKafkaSource could use the hash
> >> > >> > > > > > > code of its assigned topic-partition pairs.
> >> > >> > > > > > >
> >> > >> > > > > > > I don't know how relevant this is to other Sources, but
> >> > >> > > > > > > I guess it is as long as they may grow their partitions
> >> > >> > > > > > > dynamically (though I might be completely wrong...) and
> >> > >> > > > > > > I don't see much of a downside.
> >> > >> > > > > > >
> >> > >> > > > > > > Thoughts?
> >> > >> > > > > > >
> >> > >> > > > > > > This is something that troubles me now while working on
> >> > >> > > > > > > Read.Unbounded, and from a quick look I saw that the
> >> > >> > > > > > > FlinkRunner expects "stable" splitting as well.. How
> >> > >> > > > > > > does Dataflow handle this?
> >> > >> > > > > > >
> >> > >> > > > > > > Thanks,
> >> > >> > > > > > > Amit
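
To make the "runner keeps track of the initially generated splits" idea from this thread concrete, here is a minimal sketch in plain Java. It is not Beam code: SplitRegistry, Entry, and SplitRegistryDemo are hypothetical names, S stands in for a split of an UnboundedSource, and C stands in for its CheckpointMark. The only point it illustrates is that the splitter is consulted once, each split gets an arbitrary unique key, and the checkpoint is stored next to the split it came from, so a later non-deterministic or reordered split result cannot mix them up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

/** Hypothetical runner-side registry; not part of the Beam SDK. */
final class SplitRegistry<S, C> {

    /** A split stored together with its latest checkpoint (null until one is saved). */
    static final class Entry<S, C> {
        final S split;
        C checkpoint;

        Entry(S split) {
            this.split = split;
        }
    }

    private final Map<String, Entry<S, C>> entries = new LinkedHashMap<>();

    /**
     * Generates splits on the first call only and tags each with an arbitrary
     * unique key. Later calls ignore the splitter and return the remembered splits.
     */
    Map<String, Entry<S, C>> initialize(Supplier<List<S>> splitter) {
        if (entries.isEmpty()) {
            for (S split : splitter.get()) {
                entries.put(UUID.randomUUID().toString(), new Entry<>(split));
            }
        }
        return Collections.unmodifiableMap(entries);
    }

    /** Records the latest checkpoint for the split registered under the given key. */
    void saveCheckpoint(String key, C checkpoint) {
        Entry<S, C> entry = entries.get(key);
        if (entry == null) {
            throw new IllegalArgumentException("Unknown split key: " + key);
        }
        entry.checkpoint = checkpoint;
    }
}

final class SplitRegistryDemo {
    public static void main(String[] args) {
        SplitRegistry<String, Long> registry = new SplitRegistry<>();

        // First micro-batch: splits are generated once and tagged.
        Map<String, SplitRegistry.Entry<String, Long>> splits =
                registry.initialize(() -> Arrays.asList("T1,P1", "T1,P2", "T2,P1"));
        long offset = 100L;
        for (String key : splits.keySet()) {
            registry.saveCheckpoint(key, offset);
            offset += 100L;
        }

        // Later micro-batch: a fresh split would return a different list,
        // but the registry keeps the original split/checkpoint pairs.
        Map<String, SplitRegistry.Entry<String, Long>> again =
                registry.initialize(() -> Arrays.asList("T1,P1", "T1,P2", "T1,P3", "T2,P1"));
        again.forEach((key, entry) ->
                System.out.println(key + " -> " + entry.split + " @ " + entry.checkpoint));
    }
}
```

Note that this sketch deliberately does not pick up the new T1,P3 partition in the second call, which matches the current limitation described at the top of this message. Supporting added partitions would need extra logic that is out of scope here.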
