[
https://issues.apache.org/jira/browse/SPARK-17812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573526#comment-15573526
]
Michael Armbrust commented on SPARK-17812:
------------------------------------------
As far as I understand it, {{auto.offset.reset}} conflates a few concerns,
which makes it hard for me to reason about exactly-once semantics in my query.
It is answering all of the following:
- Where do I start when I'm creating this {{group.id}} for the first time?
- What do I do when a new partition is added to a topic I'm watching?
- What do I do when the current offset is invalid because of retention?
The model of Structured Streaming is an append-only table, where we
incrementally compute the same answer you would get from running a batch query
over all of the data in the table. The whole goal is to make it easy to reason
about correctness and to push the hard work of incremental processing and
late-data management into the optimizer / query planner. As a result, I think
we are trying to answer a different set of questions than a distributed set of
consumers that share a {{group.id}}:
- Should this append-only table contain all of the historical data available,
or do I begin at this moment and start appending? This is what
{{startingOffsets}} answers. I think we should handle {{"earliest"}} (all
data), {{"latest"}} (only data that arrives after now), and a very specific
point in time across partitions (probably the point at which some other query
stopped running).
- When I get into a situation where data has been deleted by the retention
mechanism without me seeing it, what should I do? Fail the query? Or issue a
warning and compute a best-effort answer on the data available? This is what
{{failOnDataLoss}} answers.
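To make the distinction concrete, here is a small sketch of what the options could look like. The JSON shape for per-partition offsets and the -2 / -1 sentinels (meaning "earliest" / "latest") are illustrative assumptions, not a confirmed API:

```python
import json

# Hypothetical per-partition form of startingOffsets: exact offsets for some
# partitions, a -2 sentinel ("earliest") for others. Assumed for illustration.
starting_offsets = json.dumps({
    "topicA": {"0": 23, "1": 45},  # resume each partition at an exact offset
    "topicB": {"0": -2},           # -2 as a sentinel for "earliest"
})

# The two simple forms only answer "where does the append-only table begin":
#   .option("startingOffsets", "earliest")  -- all retained data
#   .option("startingOffsets", "latest")    -- only data arriving from now on
# while the retention question is answered separately:
#   .option("failOnDataLoss", "true")       -- fail if retention deleted data
print(starting_offsets)
```

The point is that the start position and the data-loss policy are two independent options, instead of one {{auto.offset.reset}} value answering both.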
In particular, I think Kafka's method of configuration makes it confusing to
do something like, "starting now, compute some aggregation exactly once". The
documentation even points out some of the pitfalls:
bq. ... If this is set to largest, the consumer may lose some messages when the
number of partitions, for the topics it subscribes to, changes on the broker.
To prevent data loss during partition addition, set auto.offset.reset to
smallest.
Really what I want here is, "begin the query at largest", but "start new
partitions at smallest (and in fact, tell me if I'm so late joining a new
partition that I have already lost some data)".
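That desired policy can be sketched as a small decision function. Every name and structure below is hypothetical (this is not Spark's actual planner logic), assuming a checkpoint of last-seen offsets and the broker's current earliest offsets:

```python
# Hypothetical sketch of "begin the query at largest, start new partitions
# at smallest, and tell me if I joined a new partition too late".

def resolve_start(partition, checkpointed, earliest):
    """Return (start_offset, data_lost) for one topic partition.

    checkpointed: offsets recorded when the query last ran; a partition
    absent from this dict was added after the query started.
    earliest: the smallest offset the broker still retains, per partition.
    """
    recorded = checkpointed.get(partition)
    if recorded is None:
        # New partition: start at smallest. If smallest > 0, retention has
        # already deleted messages 0..smallest-1 that we never saw.
        return earliest[partition], earliest[partition] > 0
    if recorded < earliest[partition]:
        # Existing partition, but retention passed our checkpoint.
        return earliest[partition], True
    return recorded, False
```

Under this sketch, a {{data_lost}} result would fail the query when {{failOnDataLoss}} is set, and otherwise log a warning and proceed best-effort.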
> More granular control of starting offsets (assign)
> --------------------------------------------------
>
> Key: SPARK-17812
> URL: https://issues.apache.org/jira/browse/SPARK-17812
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>
> Right now you can only run a Streaming Query starting from either the
> earliest or latest offsets available at the moment the query is started.
> Sometimes this is a lot of data. It would be nice to be able to do the
> following:
> - seek to user specified offsets for manually specified topicpartitions
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)