Yes. We don't currently permit adding (or modifying) partitions in Kafka
while a Pipeline is running (without updates). Our understanding was that
this was a rare occurrence, but it's not impossible to support.

For generateInitialSplits, the UnboundedSource API doesn't require
deterministic splitting (although it's recommended), and a PipelineRunner
should keep track of the initially generated splits.

On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela <[email protected]> wrote:

> If I understand correctly this will break
> https://github.com/apache/incubator-beam/blob/master/sdks/
> java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
> in
> KafkaIO.
>
> So it's a KafkaIO limitation (for now ?) ?
>
> On Tue, Sep 13, 2016 at 11:31 AM Amit Sela <[email protected]> wrote:
>
> > Thanks Thomas, you did understand correct.
> > Doing this, or assigning a running id, is basically the same, as long as
> > generateInitialSplits implementation is deterministic (KafkaIO actually
> > notes this).
> >
> > So what if partitions were added at runtime to one (or more) of the
> topics
> > I'm consuming from ?
> >
> > On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh <[email protected]>
> > wrote:
> >
> >> I'm not sure if I've understood what the problem is - from what I can
> tell
> >> it's about associating UnboundedSource splits with Checkpoints in order
> to
> >> get consistent behavior from the sources. If I'm wrong, the following
> >> isn't
> >> really relevant to your problem - it's about the expected behavior of a
> >> runner interacting with any split of a Source.
> >>
> >> In the absence of updates, the evaluation of a split of UnboundedSource
> >> must to obey the general contract for UnboundedSource, which is that
> >> createReader(PipelineOptions, CheckpointMarkT) will only ever be called
> >> with a Checkpoint Mark that was generated by an UnboundedReader that was
> >> created from the source - i.e., a Source creates Readers and is provided
> >> only checkpoints from those readers it creates. Each Source instance
> >> (split
> >> and top-level) should be independent of all other instances. A split of
> a
> >> Source should generally be indistinguishable from a top-level source (it
> >> will just have slightly different configuration).
> >>
> >> Generally this means that Source splits have to have an associated
> >> identifier, but these identifiers are arbitrary and not relevant to the
> >> actual evaluation of the Source - so the runner gets to tag splits
> however
> >> it pleases, so long as those tags don't allow splits to bleed into each
> >> other.
> >>
> >> Could you instead store the Source paired with some (arbitrary and
> unique)
> >> key and pull out the checkpoint using the key (or even just store the
> keys
> >> and store the source with the checkpoint)? That way you always will keep
> >> the same association between Source and Checkpoint. Flink does something
> >> like this where they store the serialized source alongside the
> >> CheckpointMark so they're never separated (
> >>
> >> https://github.com/apache/incubator-beam/blob/master/runners
> /flink/runner/src/main/java/org/apache/beam/runners/flink/
> translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
> >> and
> >>
> >> https://github.com/apache/incubator-beam/blob/master/runners
> /flink/runner/src/main/java/org/apache/beam/runners/flink/
> translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
> >> )
> >>
> >> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela <[email protected]>
> wrote:
> >>
> >> > If this issue doesn't make sense for "native" streaming systems, and
> >> it's
> >> > only a Spark issue (and my implementation of Read.Unbounded) - I could
> >> keep
> >> > doing what I do, use a running id.
> >> > I was just wondering... ( hence the question mark in the title ;-) )
> >> >
> >> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <[email protected]>
> wrote:
> >> >
> >> > > Not sure how it works in Dataflow or Flink, but I'm working on an
> >> > > implementation for Spark using the (almost) only stateful operator
> it
> >> > has -
> >> > > "mapWithState" - and the State needs to correspond to a key.
> >> > > Each micro-batch, the Sources recreate the readers and "look-up" the
> >> > > latest checkpoint.
> >> > >
> >> > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi
> >> <[email protected]
> >> > >
> >> > > wrote:
> >> > >
> >> > >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]>
> >> wrote:
> >> > >>
> >> > >> > @Raghu, hashing <topic, partition> is exactly what I mean, but
> I'm
> >> > >> asking
> >> > >> > if it should be abstracted in the Source.. Otherwise, runners
> will
> >> > have
> >> > >> to
> >> > >> > *is instance of* on every Source, and write their own hash
> >> > >> implementation.
> >> > >> > Since splits contain the "splitted" Source, and it contains it's
> >> own
> >> > >> > CheckpointMark, and hashing would probably be tied to that
> >> > >> CheckpointMark,
> >> > >> > why not abstract it in the UnboundedSource ?
> >> > >> >
> >> > >>
> >> > >> I don't quite follow how a runner should be concerned about hashing
> >> used
> >> > >> by
> >> > >> a Source for its own splits. Can you give a concrete example? To me
> >> it
> >> > >> looks like source and checkpoint objects are completely opaque to
> the
> >> > >> runners.
> >> > >>
> >> > >>
> >> > >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi
> >> > <[email protected]
> >> > >> >
> >> > >> > wrote:
> >> > >> >
> >> > >> > > > If splits (UnboundedSources) had an identifier, this could be
> >> > >> avoided,
> >> > >> > > and checkpoints could be persisted accordingly.
> >> > >> > >
> >> > >> > > The order of the splits that a source returns is preserved. So
> >> > during
> >> > >> an
> >> > >> > > update, you can expect 5th split gets invoked with the same
> >> > checkpoint
> >> > >> > mark
> >> > >> > > that 5th split saved before upgrade. You can hash <topic,
> >> partition>
> >> > >> to
> >> > >> > one
> >> > >> > > of the indices.
> >> > >> > >
> >> > >> > > KafkaIO.java doe not support change in partitions.
> >> > >> > >
> >> > >> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <
> >> > >> > > [email protected]> wrote:
> >> > >> > >
> >> > >> > > > Oh! Okay, looks like this is a part of the code I was
> >> unfamiliar
> >> > >> with.
> >> > >> > > I'd
> >> > >> > > > like to know the answer too, in this case.
> >> > >> > > > +Daniel Mills <[email protected]> can you comment ?
> >> > >> > > >
> >> > >> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <
> [email protected]
> >> >
> >> > >> wrote:
> >> > >> > > >
> >> > >> > > > > That is correct, as long as non of the Kafka topics "grow"
> >> > another
> >> > >> > > > > partition (which it could).
> >> > >> > > > > In that case, some bundle will have to start reading from
> >> this
> >> > >> > > partition
> >> > >> > > > as
> >> > >> > > > > well, and since all other bundles already have a "previous
> >> > >> > checkpoint"
> >> > >> > > it
> >> > >> > > > > matters which checkpoint to relate to. I don't know how
> it's
> >> > >> > > implemented
> >> > >> > > > in
> >> > >> > > > > Dataflow, but in Spark I'm testing using mapWithState to
> >> store
> >> > the
> >> > >> > > > > checkpoints per split.
> >> > >> > > > > It also seems that order does matter to the KafkIO:
> >> > >> > > > >
> >> > >> > > > > https://github.com/apache/incubator-beam/blob/master/
> >> > >> > > > sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/
> >> > >> > > > kafka/KafkaIO.java#L636
> >> > >> > > > >
> >> > >> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> >> > >> > > > > <[email protected]> wrote:
> >> > >> > > > >
> >> > >> > > > > > Hi Amit,
> >> > >> > > > > > Could you explain more about why you're saying the order
> of
> >> > >> splits
> >> > >> > > > > matters?
> >> > >> > > > > > AFAIK the semantics of Read.Unbounded is "read from all
> of
> >> the
> >> > >> > splits
> >> > >> > > > in
> >> > >> > > > > > parallel, checkpointing each of them independently", so
> >> their
> >> > >> order
> >> > >> > > > > > shouldn't matter.
> >> > >> > > > > >
> >> > >> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <
> >> > [email protected]>
> >> > >> > > wrote:
> >> > >> > > > > >
> >> > >> > > > > > > UnboundedSources generate initial splits, which are
> >> > basically
> >> > >> > > splits
> >> > >> > > > of
> >> > >> > > > > > > them selves - for example, if an UnboundedKafkaSource
> is
> >> set
> >> > >> to
> >> > >> > > read
> >> > >> > > > > from
> >> > >> > > > > > > topic T1 which is made of 2 partitions P1 and P2, it
> will
> >> > >> > > (optimally)
> >> > >> > > > > > split
> >> > >> > > > > > > into two UnboundedKafkaSource, one per partition.
> >> > >> > > > > > > During the lifecycle of the "reader" bundles,
> >> > CheckpointMarks
> >> > >> are
> >> > >> > > > used
> >> > >> > > > > to
> >> > >> > > > > > > checkpoint "last-read" and so readers may
> restart/resume.
> >> > I'm
> >> > >> > > > assuming
> >> > >> > > > > > this
> >> > >> > > > > > > is how newly created partitions will automatically be
> >> added
> >> > to
> >> > >> > > > readers.
> >> > >> > > > > > >
> >> > >> > > > > > > The problem is that it's all fine while there is only
> one
> >> > >> topic
> >> > >> > > > (Kafka
> >> > >> > > > > > > topic-partition pairs are ordered), but if reading from
> >> more
> >> > >> then
> >> > >> > > one
> >> > >> > > > > > topic
> >> > >> > > > > > > this may break:
> >> > >> > > > > > > T1,P1
> >> > >> > > > > > > T1,P2
> >> > >> > > > > > > T1,P3
> >> > >> > > > > > > T2,P1
> >> > >> > > > > > > The order is not maintained and T2,P1 is 4th now.
> >> > >> > > > > > >
> >> > >> > > > > > > If splits (UnboundedSources) had an identifier, this
> >> could
> >> > be
> >> > >> > > > avoided,
> >> > >> > > > > > and
> >> > >> > > > > > > checkpoints could be persisted accordingly.
> >> > >> > > > > > > For example, an UnboundedKafkaSource could use the hash
> >> code
> >> > >> of
> >> > >> > > it's
> >> > >> > > > > > > assigned topic-partition pairs.
> >> > >> > > > > > >
> >> > >> > > > > > > I don't know how relevant this is to other Sources,
> but I
> >> > >> guess
> >> > >> > it
> >> > >> > > is
> >> > >> > > > > as
> >> > >> > > > > > > long as they may grow their partitions dynamically
> >> (though I
> >> > >> > might
> >> > >> > > be
> >> > >> > > > > > > completely wrong...) and I don't see much of a
> downside.
> >> > >> > > > > > >
> >> > >> > > > > > > Thoughts ?
> >> > >> > > > > > >
> >> > >> > > > > > > This is something that troubles me now while working on
> >> > >> > > > Read.Unbounded,
> >> > >> > > > > > and
> >> > >> > > > > > > from a quick look I saw that the FlinkRunner expects
> >> > "stable"
> >> > >> > > > splitting
> >> > >> > > > > > as
> >> > >> > > > > > > well.. How does Dataflow handle this ?
> >> > >> > > > > > >
> >> > >> > > > > > > Thanks,
> >> > >> > > > > > > Amit
> >> > >> > > > > > >
> >> > >> > > > > >
> >> > >> > > > >
> >> > >> > > >
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> >
> >>
> >
>

Reply via email to