Hi Amit,
For 1., the runner is responsible for the checkpoint storage (associated
with the source). That is how the runner can retry and know which
bundles failed.
For 4, are you proposing that KafkaRecord store additional metadata for
that? It sounds like what I proposed in the "Technical Vision" appendix
document: there I proposed introducing a DataXchange object that stores
some additional metadata (like the offset) used by the runner. It would be
the same with SDF, as the tracker state should be persistent as well.
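To make the idea concrete, here is a rough sketch of what such a DataXchange wrapper could look like. This is purely illustrative: the name comes from the "Technical Vision" appendix, but the fields and shape here are my assumption, not actual Beam code.

```java
import java.io.Serializable;

// Hypothetical sketch of a DataXchange envelope: the element plus the
// source-side metadata (offset, split id) a runner would persist.
public class DataXchange<T> implements Serializable {
    private final T record;    // the element read from the source
    private final long offset; // source-specific position, e.g. a Kafka offset
    private final int splitId; // which split produced the record

    public DataXchange(T record, long offset, int splitId) {
        this.record = record;
        this.offset = offset;
        this.splitId = splitId;
    }

    public T getRecord() { return record; }
    public long getOffset() { return offset; }
    public int getSplitId() { return splitId; }

    public static void main(String[] args) {
        DataXchange<String> dx = new DataXchange<>("payload", 42L, 0);
        System.out.println(dx.getOffset() + " " + dx.getSplitId());
    }
}
```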
Regards
JB
On 10/08/2016 01:55 AM, Amit Sela wrote:
I started a thread about (suggesting) UnboundedSource splitIds, and it
turned into a general UnboundedSource/KafkaIO discussion, so I think it's
best to start over in a clear [DISCUSS] thread.
While working on UnboundedSource support for the Spark runner, I've raised
some questions; some were about UnboundedSource in general, and others
were Kafka-specific.
I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.
1. UnboundedSource ids - I assume any runner persists the
UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how it
matches the appropriate split (of the UnboundedSource) to its previously
persisted CheckpointMark on any specific worker?
   *Thomas Groh* mentioned that Source splits have to have an
associated identifier,
   and so the runner gets to tag splits however it pleases, as long as
those tags don't allow splits to bleed into each other.
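As a minimal sketch of that tagging idea (the store and method names here are my own, not a Beam API): the runner tags each generated split, for example by its index in the initial split list, and keys the persisted CheckpointMarks by that tag, so a recovered worker can look up exactly the mark for the split it owns.

```java
import java.util.*;

public class SplitTagging {
    // Illustrative checkpoint store, keyed by the runner-assigned split tag.
    static Map<Integer, String> checkpointStore = new HashMap<>();

    static void persistCheckpoint(int splitId, String checkpointMark) {
        checkpointStore.put(splitId, checkpointMark);
    }

    static String restoreCheckpoint(int splitId) {
        return checkpointStore.get(splitId);
    }

    public static void main(String[] args) {
        // Tag each split by its index in the initially generated list.
        List<String> splits = Arrays.asList("split-a", "split-b");
        for (int i = 0; i < splits.size(); i++) {
            persistCheckpoint(i, "mark-for-" + splits.get(i));
        }
        // A recovered worker looks up the mark for the split index it owns;
        // tags never overlap, so splits cannot bleed into each other.
        System.out.println(restoreCheckpoint(1));
    }
}
```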
2. Consistent splitting - UnboundedSource splitting seems to require
consistent splitting if it is to "pick up where it left off", correct? This
is not mentioned as a requirement or a recommendation in
UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
*Raghu Angadi* mentioned that Kafka already does so by applying
partitions to readers in a round-robin manner.
*Thomas Groh* also added that while the UnboundedSource API doesn't
require deterministic splitting (although it's recommended), a
PipelineRunner
should keep track of the initially generated splits.
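The round-robin scheme Raghu describes can be sketched like this (a simplified stand-in for KafkaIO's assignment, not its actual code): partition p goes to reader p % numSplits, so re-splitting is deterministic as long as the partition list and the desired number of splits are unchanged.

```java
import java.util.*;

public class RoundRobinAssign {
    // Assign partitions 0..numPartitions-1 to numSplits readers round-robin.
    static List<List<Integer>> assign(int numPartitions, int numSplits) {
        List<List<Integer>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            splits.get(p % numSplits).add(p);
        }
        return splits;
    }

    public static void main(String[] args) {
        // 5 partitions over 2 readers: [[0, 2, 4], [1, 3]], every time.
        System.out.println(assign(5, 2));
    }
}
```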
3. Support reading of Kafka partitions that were added to topic/s while
a Pipeline reads from them - BEAM-727
<https://issues.apache.org/jira/browse/BEAM-727> was filed.
4. Reading/persisting Kafka start offsets - since Spark works in
micro-batches, if "latest" was applied to a fairly sparse topic, each worker
would actually begin reading only after it saw a message during the time
window it had to read messages. This is because fetching the offsets is
done by the worker running the Reader. This means that each Reader sees a
different state of "latest" (for its partition/s), such that a failing
Reader that hasn't read yet might fetch a different "latest" once it's
recovered than what it originally fetched. While this may not be as painful
for other runners, IMHO it lacks correctness, and I'd suggest either reading
the Kafka cluster's metadata once upon initial splitting, or adding
some of it to the CheckpointMark. Filed BEAM-704
<https://issues.apache.org/jira/browse/BEAM-704>.
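A rough sketch of the first suggestion (fetchLatestOffsets below is a stand-in for a real Kafka metadata query; the names are mine, not KafkaIO's): resolve "latest" once at initial split time, persist the resolved offsets, and have a recovered reader resume from the persisted values instead of re-asking the broker for "latest".

```java
import java.util.*;

public class StartOffsets {
    // Stand-in for a Kafka metadata query; a real implementation would ask
    // the cluster for the current end offset of each partition.
    static Map<Integer, Long> fetchLatestOffsets(int numPartitions) {
        Map<Integer, Long> offsets = new HashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            offsets.put(p, 100L + p);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Resolved once, at initial splitting, then carried in the
        // checkpoint; recovery reads from here, so every attempt of a
        // Reader sees the same "latest" it originally resolved.
        Map<Integer, Long> checkpointed = fetchLatestOffsets(2);
        System.out.println(checkpointed.get(0) + " " + checkpointed.get(1));
    }
}
```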
The original thread is called "Should UnboundedSource provide a split
identifier?".
While the only specific UnboundedSource implementation discussed here is
Kafka, it is probably the most popular open-source UnboundedSource. Having
said that, I wonder how this applies to PubSub, or to any other
UnboundedSource that these questions might affect.
Thanks,
Amit
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com