On Tue, Sep 13, 2016 at 7:30 PM Thomas Groh <[email protected]>
wrote:

> Yes. We don't currently permit adding (or modifying) partitions in Kafka
> while a Pipeline is running (without updates). Our understanding was that
> this was a rare occurrence, but it's not impossible to support.
>
I don't know how rare it is, but Kafka does support adding partitions,
and Spark can handle this while reading from Kafka.

>
> For generateInitialSplits, the UnboundedSource API doesn't require
> deterministic splitting (although it's recommended), and a PipelineRunner
> should keep track of the initially generated splits.
>
If the splitting were consistent, in such a way that newly added
partitions were assigned a new "splitId" while existing ones kept the
same splitId, it could support newly added partitions, no?
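
To make that concrete, here is a rough sketch in plain Java. It is not Beam
or KafkaIO code: StableSplitIds, splitId and assignCheckpoints are names I
made up for illustration, and a checkpoint is reduced to a single offset.
The id is derived from the topic-partition pair instead of the position in
the split list, so a newly added partition gets a new id and does not shift
the ids of existing ones.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Beam or KafkaIO. */
final class StableSplitIds {

    /** Derives the id from the partition's identity, not its list position. */
    static String splitId(String topic, int partition) {
        return topic + "-" + partition;
    }

    /**
     * Pairs each current split id with its previously saved checkpoint
     * (assumed here to be just an offset). A split with no saved checkpoint
     * maps to null, meaning "newly added, start fresh".
     */
    static Map<String, Long> assignCheckpoints(
            List<String> currentSplitIds, Map<String, Long> saved) {
        Map<String, Long> assigned = new LinkedHashMap<>();
        for (String id : currentSplitIds) {
            assigned.put(id, saved.get(id)); // null for a new partition
        }
        return assigned;
    }
}
```

With checkpoints saved for T1,P1 and T2,P1, adding T1,P2 later leaves both
existing associations intact and only the new split starts without a
checkpoint.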

>
> On Tue, Sep 13, 2016 at 1:49 AM, Amit Sela <[email protected]> wrote:
>
> > If I understand correctly this will break
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L857
> > in KafkaIO.
> >
> > So it's a KafkaIO limitation (for now)?
> >
> > On Tue, Sep 13, 2016 at 11:31 AM Amit Sela <[email protected]> wrote:
> >
> > > Thanks Thomas, you understood correctly.
> > > Doing this, or assigning a running id, is basically the same, as long as
> > > the generateInitialSplits implementation is deterministic (KafkaIO actually
> > > notes this).
> > >
> > > So what if partitions were added at runtime to one (or more) of the topics
> > > I'm consuming from?
> > >
> > > On Tue, Sep 13, 2016 at 3:51 AM Thomas Groh <[email protected]>
> > > wrote:
> > >
> > >> I'm not sure if I've understood what the problem is - from what I can tell
> > >> it's about associating UnboundedSource splits with Checkpoints in order to
> > >> get consistent behavior from the sources. If I'm wrong, the following isn't
> > >> really relevant to your problem - it's about the expected behavior of a
> > >> runner interacting with any split of a Source.
> > >>
> > >> In the absence of updates, the evaluation of a split of UnboundedSource
> > >> must obey the general contract for UnboundedSource, which is that
> > >> createReader(PipelineOptions, CheckpointMarkT) will only ever be called
> > >> with a Checkpoint Mark that was generated by an UnboundedReader that was
> > >> created from the source - i.e., a Source creates Readers and is provided
> > >> only checkpoints from those readers it creates. Each Source instance (split
> > >> and top-level) should be independent of all other instances. A split of a
> > >> Source should generally be indistinguishable from a top-level source (it
> > >> will just have slightly different configuration).
> > >>
> > >> Generally this means that Source splits have to have an associated
> > >> identifier, but these identifiers are arbitrary and not relevant to the
> > >> actual evaluation of the Source - so the runner gets to tag splits however
> > >> it pleases, so long as those tags don't allow splits to bleed into each
> > >> other.
> > >>
> > >> Could you instead store the Source paired with some (arbitrary and unique)
> > >> key and pull out the checkpoint using the key (or even just store the keys
> > >> and store the source with the checkpoint)? That way you always will keep
> > >> the same association between Source and Checkpoint. Flink does something
> > >> like this where they store the serialized source alongside the
> > >> CheckpointMark so they're never separated (
> > >> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L164
> > >> and
> > >> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L334
> > >> )
> > >>
> > >> On Mon, Sep 12, 2016 at 11:40 AM, Amit Sela <[email protected]> wrote:
> > >>
> > >> > If this issue doesn't make sense for "native" streaming systems, and it's
> > >> > only a Spark issue (and my implementation of Read.Unbounded) - I could keep
> > >> > doing what I do, use a running id.
> > >> > I was just wondering... ( hence the question mark in the title ;-) )
> > >> >
> > >> > On Mon, Sep 12, 2016 at 9:31 PM Amit Sela <[email protected]> wrote:
> > >> >
> > >> > > Not sure how it works in Dataflow or Flink, but I'm working on an
> > >> > > implementation for Spark using the (almost) only stateful operator it
> > >> > > has - "mapWithState" - and the State needs to correspond to a key.
> > >> > > Each micro-batch, the Sources recreate the readers and "look-up" the
> > >> > > latest checkpoint.
> > >> > >
> > >> > > On Mon, Sep 12, 2016 at 9:15 PM Raghu Angadi <[email protected]>
> > >> > > wrote:
> > >> > >
> > >> > >> On Wed, Sep 7, 2016 at 7:13 AM, Amit Sela <[email protected]> wrote:
> > >> > >>
> > >> > >> > @Raghu, hashing <topic, partition> is exactly what I mean, but I'm asking
> > >> > >> > if it should be abstracted in the Source. Otherwise, runners will have to
> > >> > >> > run an *instanceof* check against every Source, and write their own hash
> > >> > >> > implementation.
> > >> > >> > Since splits contain the "split" Source, and it contains its own
> > >> > >> > CheckpointMark, and hashing would probably be tied to that CheckpointMark,
> > >> > >> > why not abstract it in the UnboundedSource?
> > >> > >> >
> > >> > >>
> > >> > >> I don't quite follow how a runner should be concerned about hashing used by
> > >> > >> a Source for its own splits. Can you give a concrete example? To me it
> > >> > >> looks like source and checkpoint objects are completely opaque to the
> > >> > >> runners.
> > >> > >>
> > >> > >>
> > >> > >> > On Wed, Sep 7, 2016 at 3:02 AM Raghu Angadi <[email protected]>
> > >> > >> > wrote:
> > >> > >> >
> > >> > >> > > > If splits (UnboundedSources) had an identifier, this could be avoided,
> > >> > >> > > > and checkpoints could be persisted accordingly.
> > >> > >> > >
> > >> > >> > > The order of the splits that a source returns is preserved. So during an
> > >> > >> > > update, you can expect the 5th split to be invoked with the same checkpoint
> > >> > >> > > mark that the 5th split saved before the upgrade. You can hash
> > >> > >> > > <topic, partition> to one of the indices.
> > >> > >> > >
> > >> > >> > > KafkaIO.java does not support change in partitions.
> > >> > >> > >
> > >> > >> > > On Tue, Sep 6, 2016 at 4:04 PM, Eugene Kirpichov <
> > >> > >> > > [email protected]> wrote:
> > >> > >> > >
> > >> > >> > > > Oh! Okay, looks like this is a part of the code I was unfamiliar with.
> > >> > >> > > > I'd like to know the answer too, in this case.
> > >> > >> > > > +Daniel Mills <[email protected]> can you comment?
> > >> > >> > > >
> > >> > >> > > > On Tue, Sep 6, 2016 at 3:32 PM Amit Sela <[email protected]> wrote:
> > >> > >> > > >
> > >> > >> > > > > That is correct, as long as none of the Kafka topics "grow" another
> > >> > >> > > > > partition (which they could).
> > >> > >> > > > > In that case, some bundle will have to start reading from this partition
> > >> > >> > > > > as well, and since all other bundles already have a "previous checkpoint"
> > >> > >> > > > > it matters which checkpoint to relate to. I don't know how it's implemented
> > >> > >> > > > > in Dataflow, but in Spark I'm testing using mapWithState to store the
> > >> > >> > > > > checkpoints per split.
> > >> > >> > > > > It also seems that order does matter to KafkaIO:
> > >> > >> > > > >
> > >> > >> > > > > https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L636
> > >> > >> > > > >
> > >> > >> > > > > On Wed, Sep 7, 2016 at 1:24 AM Eugene Kirpichov
> > >> > >> > > > > <[email protected]> wrote:
> > >> > >> > > > >
> > >> > >> > > > > > Hi Amit,
> > >> > >> > > > > > Could you explain more about why you're saying the order of splits
> > >> > >> > > > > > matters?
> > >> > >> > > > > > AFAIK the semantics of Read.Unbounded is "read from all of the splits
> > >> > >> > > > > > in parallel, checkpointing each of them independently", so their order
> > >> > >> > > > > > shouldn't matter.
> > >> > >> > > > > >
> > >> > >> > > > > > On Tue, Sep 6, 2016 at 3:17 PM Amit Sela <[email protected]> wrote:
> > >> > >> > > > > >
> > >> > >> > > > > > > UnboundedSources generate initial splits, which are basically splits of
> > >> > >> > > > > > > themselves - for example, if an UnboundedKafkaSource is set to read from
> > >> > >> > > > > > > topic T1 which is made of 2 partitions P1 and P2, it will (optimally)
> > >> > >> > > > > > > split into two UnboundedKafkaSources, one per partition.
> > >> > >> > > > > > > During the lifecycle of the "reader" bundles, CheckpointMarks are used to
> > >> > >> > > > > > > checkpoint "last-read" and so readers may restart/resume. I'm assuming
> > >> > >> > > > > > > this is how newly created partitions will automatically be added to
> > >> > >> > > > > > > readers.
> > >> > >> > > > > > >
> > >> > >> > > > > > > The problem is that it's all fine while there is only one topic (Kafka
> > >> > >> > > > > > > topic-partition pairs are ordered), but if reading from more than one
> > >> > >> > > > > > > topic this may break:
> > >> > >> > > > > > > T1,P1
> > >> > >> > > > > > > T1,P2
> > >> > >> > > > > > > T1,P3
> > >> > >> > > > > > > T2,P1
> > >> > >> > > > > > > The order is not maintained and T2,P1 is 4th now.
> > >> > >> > > > > > >
> > >> > >> > > > > > > If splits (UnboundedSources) had an identifier, this could be avoided,
> > >> > >> > > > > > > and checkpoints could be persisted accordingly.
> > >> > >> > > > > > > For example, an UnboundedKafkaSource could use the hash code of its
> > >> > >> > > > > > > assigned topic-partition pairs.
> > >> > >> > > > > > >
> > >> > >> > > > > > > I don't know how relevant this is to other Sources, but I guess it is as
> > >> > >> > > > > > > long as they may grow their partitions dynamically (though I might be
> > >> > >> > > > > > > completely wrong...) and I don't see much of a downside.
> > >> > >> > > > > > >
> > >> > >> > > > > > > Thoughts ?
> > >> > >> > > > > > >
> > >> > >> > > > > > > This is something that troubles me now while working on Read.Unbounded,
> > >> > >> > > > > > > and from a quick look I saw that the FlinkRunner expects "stable"
> > >> > >> > > > > > > splitting as well. How does Dataflow handle this?
> > >> > >> > > > > > >
> > >> > >> > > > > > > Thanks,
> > >> > >> > > > > > > Amit
> > >> > >> > > > > > >
> > >> > >> > > > > >
> > >> > >> > > > >
> > >> > >> > > >
> > >> > >> > >
> > >> > >> >
> > >> > >>
> > >> > >
> > >> >
> > >>
> > >
> >
>
