Regards
JB
On 10/08/2016 01:55 AM, Amit Sela wrote:
I started a thread suggesting split IDs for UnboundedSource, and it
turned into an UnboundedSource/KafkaIO discussion, so I think it's best
to start over in a clear [DISCUSS] thread.
When working on UnboundedSource support for the Spark runner, I raised
some questions; some were about UnboundedSource in general, and others
were Kafka-specific.
I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.
1. UnboundedSource IDs - I assume any runner persists the
UnboundedSource's CheckpointMark for fault tolerance, but I wonder how
it matches the appropriate split (of the UnboundedSource) to its
previously persisted CheckpointMark on any specific worker?
*Thomas Groh* mentioned that Source splits have to have an associated
identifier, and so the runner gets to tag splits however it pleases, so
long as those tags don't allow splits to bleed into each other.
2. Consistent splitting - an UnboundedSource seems to require
consistent splitting if it is to "pick up where it left off", correct?
This is not mentioned as a requirement or a recommendation in
UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
*Raghu Angadi* mentioned that Kafka already does so by assigning
partitions to readers in a round-robin manner.
*Thomas Groh* also added that while the UnboundedSource API doesn't
require deterministic splitting (although it's recommended), a
PipelineRunner should keep track of the initially generated splits.
3. Support reading Kafka partitions that were added to the topic(s)
while a Pipeline reads from them - BEAM-727
<https://issues.apache.org/jira/browse/BEAM-727> was filed.
4. Reading/persisting Kafka start offsets - since Spark works in
micro-batches, if "latest" is applied on a fairly sparse topic, each
worker actually begins reading only after it sees a message during the
time window it has to read messages. This is because fetching the
offsets is done by the worker running the Reader. This means that each
Reader sees a different state of "latest" (for its partition(s)), such
that a failing Reader that hasn't read anything yet might fetch a
different "latest" once it has recovered than the one it originally
fetched. While this may not be as painful for other runners, IMHO it
lacks correctness, and I'd suggest either reading the Kafka cluster's
metadata once, upon initial splitting, or adding some of it to the
CheckpointMark. Filed BEAM-704
<https://issues.apache.org/jira/browse/BEAM-704>.
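To make the suggestion in point 4 concrete, here is a minimal sketch of
the "resolve latest once, upon initial splitting" option. It is written
in Python for brevity and is not the Beam or KafkaIO API: PartitionSplit,
generate_initial_splits, resume_offset and the fetch_latest callback are
all hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional


@dataclass(frozen=True)
class PartitionSplit:
    """Hypothetical split descriptor: one partition plus its pinned start offset."""
    partition: int
    start_offset: int


def generate_initial_splits(
    partitions: Iterable[int],
    fetch_latest: Callable[[int], int],
) -> List[PartitionSplit]:
    # "latest" is resolved here, once, when the splits are created,
    # instead of by each reader on whichever worker happens to run it.
    return [PartitionSplit(p, fetch_latest(p)) for p in sorted(partitions)]


def resume_offset(split: PartitionSplit, checkpointed: Optional[int]) -> int:
    # A reader that failed before reading anything has no checkpointed
    # offset, so it falls back to the offset pinned in its split rather
    # than asking the cluster for a new (possibly different) "latest".
    return split.start_offset if checkpointed is None else checkpointed
```

With this shape, a Reader that recovers before having read anything
starts from the same offset it was originally given, even if the topic
has advanced in the meantime.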
The original thread is called "Should UnboundedSource provide a split
identifier?".
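As a rough illustration of points 1 and 2, the sketch below combines
deterministic round-robin assignment of partitions to splits with a
checkpoint store keyed by split ID. It is Python for brevity and does
not reflect how any runner or KafkaIO actually implements this:
assign_round_robin and CheckpointStore are hypothetical names, and
using the split's index as its ID is an assumption.

```python
from typing import Any, Dict, Iterable, List, Optional


def assign_round_robin(partitions: Iterable[int], num_splits: int) -> Dict[int, List[int]]:
    # Sorting first makes the assignment depend only on the set of
    # partitions and the number of splits, not on discovery order.
    ordered = sorted(partitions)
    return {i: ordered[i::num_splits] for i in range(num_splits)}


class CheckpointStore:
    """Hypothetical runner-side store: one checkpoint mark per split ID."""

    def __init__(self) -> None:
        self._marks: Dict[int, Any] = {}

    def persist(self, split_id: int, mark: Any) -> None:
        # Keying by split ID keeps marks of different splits from
        # bleeding into each other.
        self._marks[split_id] = mark

    def restore(self, split_id: int) -> Optional[Any]:
        # Returns None for a split that has never been checkpointed.
        return self._marks.get(split_id)
```

Because the same partitions always land in the same split, a mark
persisted under a given split ID still describes the partitions that
split reads after a restart.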
While the only specific implementation of UnboundedSource discussed here
is Kafka, it is probably the most popular open-source UnboundedSource.
Having said that, I wonder how this applies to PubSub, or to any other
UnboundedSource that these questions might affect.
Thanks,
Amit
--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com