Thanks Amit!

Some replies inline below.

On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <[email protected]> wrote:

> I started a thread about (suggesting) UnboundedSource splitIds and it
> turned into an UnboundedSource/KafkaIO discussion, and I think it's best
> to start over in a clear [DISCUSS] thread.
>
> When working on UnboundedSource support for the Spark runner, I've raised
> some questions; some were about UnboundedSource in general, and others
> were Kafka-specific.
>
> I'd like to recap them here, and maybe have a more productive and
> well-documented discussion for everyone.
>
>    1. UnboundedSource ids - I assume any runner persists the
>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how
>    it matches the appropriate split (of the UnboundedSource) to its
>    previously persisted CheckpointMark in any specific worker?
>    *Thomas Groh* mentioned that Source splits have to have an associated
>    identifier, and so the runner gets to tag splits however it pleases,
>    so long as those tags don't allow splits to bleed into each other.
>

>    2. Consistent splitting - splitting an UnboundedSource seems to
>    require consistent splitting if it is to "pick up where it left off",
>    correct? This is not mentioned as a requirement or a recommendation in
>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only
>    issue?
>    *Raghu Angadi* mentioned that Kafka already does so by applying
>    partitions to readers in a round-robin manner.
>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>    require deterministic splitting (although it's recommended), a
>    PipelineRunner should keep track of the initially generated splits.
>

The usual approach (I think this answers both 1 & 2) is that a runner
persists the actual original source (which is required to be Serializable)
along with the checkpoints (which are required to be encodable with a
Coder). This way, when the runner wants to resume from a checkpoint, it can
use that same source -- and thus splitting need not be deterministic.
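To make that concrete, here is a minimal sketch of the idea -- the runner serializes the split source and its checkpoint together, and on resume gets the identical source back. The `FakeSource` and `FakeCheckpoint` types below are illustrative stand-ins, not Beam's actual `UnboundedSource`/`CheckpointMark` classes, and plain Java serialization stands in for Coder encoding:

```java
import java.io.*;
import java.util.*;

// Illustrative stand-ins for a split UnboundedSource and its CheckpointMark.
public class ResumeSketch {
  static class FakeSource implements Serializable {
    final List<Integer> assignedPartitions;
    FakeSource(List<Integer> parts) { this.assignedPartitions = parts; }
  }

  static class FakeCheckpoint implements Serializable {
    final long offset;
    FakeCheckpoint(long offset) { this.offset = offset; }
  }

  // The runner persists the (source, checkpoint) pair together...
  static byte[] persist(FakeSource src, FakeCheckpoint cp) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(src);
      oos.writeObject(cp);
    }
    return bos.toByteArray();
  }

  // ...and on resume deserializes the *same* source, so splitting is never
  // re-run, and hence need not be deterministic.
  static Object[] resume(byte[] persisted) throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(persisted))) {
      return new Object[] { ois.readObject(), ois.readObject() };
    }
  }

  public static void main(String[] args) throws Exception {
    FakeSource split = new FakeSource(Arrays.asList(0, 3));
    byte[] state = persist(split, new FakeCheckpoint(42L));
    Object[] restored = resume(state);
    FakeSource restoredSrc = (FakeSource) restored[0];
    FakeCheckpoint restoredCp = (FakeCheckpoint) restored[1];
    if (!restoredSrc.assignedPartitions.equals(Arrays.asList(0, 3))
        || restoredCp.offset != 42L) {
      throw new AssertionError("state not restored");
    }
    System.out.println("resumed partitions=" + restoredSrc.assignedPartitions
        + " offset=" + restoredCp.offset);
  }
}
```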

"Pick up where it left off" sounds reminiscent of Dataflow's pipeline
update. That indeed requires UnboundedSource#generateInitialSplits() to be
deterministic if sources carry configuration information, because we may
try to link up new sources with old checkpoints and we need to do so
correctly. But of course this is not super-awesome, because it's hard to
write deterministic functions -- especially if, e.g., the Kafka
partitioning has changed in the meantime.
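For illustration, the round-robin style of assignment Raghu described can be made deterministic by sorting the partitions before dealing them out, so the result depends only on the inputs and not on the order the broker happened to return them. This is just a sketch of the technique, not KafkaIO's actual code:

```java
import java.util.*;

// Deterministic round-robin assignment of partitions to desired splits.
public class RoundRobinSplit {
  static List<List<Integer>> assign(List<Integer> partitions, int numSplits) {
    // Sort first so the assignment is a pure function of the inputs.
    List<Integer> sorted = new ArrayList<>(partitions);
    Collections.sort(sorted);
    List<List<Integer>> splits = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<>());
    }
    for (int i = 0; i < sorted.size(); i++) {
      splits.get(i % numSplits).add(sorted.get(i));
    }
    return splits;
  }

  public static void main(String[] args) {
    // 5 partitions over 2 splits -> [0, 2, 4] and [1, 3],
    // regardless of the order the partitions arrive in.
    System.out.println(assign(Arrays.asList(3, 0, 4, 1, 2), 2));
  }
}
```

Note the caveat above still bites: if a partition is added, the sorted list changes and every split's assignment can shift, which is exactly why matching new sources to old checkpoints is hard.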


>    3. Support reading of Kafka partitions that were added to topic/s while
>    a Pipeline reads from them - BEAM-727
>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>

See above :). Tough, but useful.


>    4. Reading/persisting Kafka start offsets - since Spark works in
>    micro-batches, if "latest" was applied on a fairly sparse topic, each
>    worker would actually begin reading only after it saw a message during
>    the time window it had to read messages. This is because fetching the
>    offsets is done by the worker running the Reader. This means that each
>    Reader sees a different state of "latest" (for its partition(s)), such
>    that a failing Reader that hasn't read yet might fetch a different
>    "latest" once it's recovered than what it originally fetched. While
>    this may not be as painful for other runners, IMHO it lacks
>    correctness, and I'd suggest either reading the Kafka cluster's
>    metadata once upon initial splitting, or adding some of it to the
>    CheckpointMark. Filed BEAM-704
>    <https://issues.apache.org/jira/browse/BEAM-704>.
>

+1. This is a great point. The notion that a runner may stop a reader and
resume it from a checkpoint frequently is definitely part of the Beam model
-- right now the Spark and Direct runners (at least) do it very often. The
current behavior is, if not outright broken, definitely unexpected.

Both proposed solutions make sense to me -- either log the last offset for
all partitions during splitting, or simply log the previous offset in the
checkpoint mark when we start reading for the first time.
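The second option could look roughly like this: resolve "latest" only once, on the reader's first use, and record the resolved offset in the checkpoint mark, so a recovered reader resumes from the checkpointed offset instead of re-asking the broker. All names here are hypothetical, not KafkaIO's actual types:

```java
import java.util.function.LongSupplier;

// Sketch: pin the resolved "latest" offset in the checkpoint so recovery
// does not re-resolve it against a moving broker.
public class LatestOffsetSketch {
  static class Checkpoint implements java.io.Serializable {
    final long nextOffset;
    Checkpoint(long nextOffset) { this.nextOffset = nextOffset; }
  }

  static class Reader {
    private Long startOffset;          // null until resolved or restored
    private final LongSupplier fetchLatest;  // stand-in for a broker query

    Reader(Checkpoint restored, LongSupplier fetchLatest) {
      this.fetchLatest = fetchLatest;
      if (restored != null) {
        startOffset = restored.nextOffset;
      }
    }

    long start() {
      if (startOffset == null) {
        // Only a fresh reader asks the broker for "latest".
        startOffset = fetchLatest.getAsLong();
      }
      return startOffset;
    }

    Checkpoint checkpoint() { return new Checkpoint(start()); }
  }

  public static void main(String[] args) {
    // Fresh reader resolves latest = 100 and checkpoints it.
    Reader fresh = new Reader(null, () -> 100L);
    Checkpoint cp = fresh.checkpoint();
    // A recovered reader ignores the broker's new "latest" (250) and
    // resumes from the checkpointed 100.
    Reader recovered = new Reader(cp, () -> 250L);
    if (recovered.start() != 100L) {
      throw new AssertionError("recovered reader re-resolved latest");
    }
    System.out.println("recovered start offset = " + recovered.start());
  }
}
```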


>
> The original thread is called "Should UnboundedSource provide a split
> identifier?".
>
> While the only specific implementation of UnboundedSource discussed here
> is Kafka, it is probably the most popular open-source UnboundedSource.
> Having said that, I wonder where this meets PubSub? Or any other
> UnboundedSource that those questions might affect.
>

FWIW, Google Cloud Pub/Sub sources do not have state. Rather than
connecting to specific partitions, they just say "hey, give me some
messages" -- so the splitting is easy, and deterministic for free. [link
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java#L1043>
]


>
> Thanks,
> Amit
>
