Michael Armbrust commented on SPARK-17812:

As far as I understand it, {{auto.offset.reset}} conflates a few things, which 
makes it hard for me to reason about exactly-once semantics in my query.  It is 
answering all of the following (see the consumer sketch after this list):
 - Where do I start when I'm creating this {{group.id}} for the first time?
 - What do I do when a new partition is added to a topic I'm watching?
 - What do I do when the current offset is invalid because of retention?
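For concreteness, here is how that single knob looks on a plain Kafka consumer 
(a minimal sketch; broker address, group id, and deserializers are 
placeholders).  Note the old high-level consumer spells the values 
{{smallest}}/{{largest}}, while the new consumer spells them 
{{earliest}}/{{latest}}:

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder
props.put("group.id", "my-query")                 // placeholder
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.ByteArrayDeserializer")

// One value answers all three questions above: where to start a brand-new
// group.id, where to start a newly added partition, and what to do when the
// current offset has been aged out by retention.
props.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
{code}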

The model of structured streaming is an append-only table, where we are 
computing the same answer incrementally as if you were running a batch query 
over all of the data in the table.  The whole goal is to make it easy to reason 
about correctness and to push the hard work of incremental processing and late 
data management into the optimizer / query planner.  As a result, I think we 
are trying to answer a different set of questions than a distributed set of 
consumers that share a {{group.id}} (both options are sketched after this list):
 - Should this append-only table contain all of the historical data available, 
or do I begin at this moment and start appending?  This is what 
{{startingOffsets}} answers.  I think we should handle {{"earliest"}} (all 
data), {{"latest"}} (only data that arrives after now), and a very specific 
point in time across partitions (probably where some other query stopped).
 - When I get into a situation where data has been deleted by the retention 
mechanism without me seeing it, what should I do?  Fail the query?  Or issue a 
warning and compute a best-effort answer on the data that is available?  This 
is what {{failOnDataLoss}} answers.
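A minimal sketch of how those two answers surface as options on the Kafka 
source (broker address and topic name are placeholders):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("offsets-sketch").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
  .option("subscribe", "events")                        // placeholder topic
  // All historical data ("earliest") vs. only data arriving from now on ("latest").
  .option("startingOffsets", "latest")
  // Fail the query on retention-induced data loss rather than silently skipping.
  .option("failOnDataLoss", "true")
  .load()
{code}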

In particular, I think the Kafka method of configuration makes it confusing to 
do something like, "starting now, compute some aggregation exactly once".  The 
documentation even points out some of the pitfalls:
bq. ... If this is set to largest, the consumer may lose some messages when the 
number of partitions, for the topics it subscribes to, changes on the broker. 
To prevent data loss during partition addition, set auto.offset.reset to 
smallest.

Really what I want here is, "begin the query at largest", but "start new 
partitions at smallest" (and in fact, tell me if I'm so late joining a new 
partition that I have already lost some data).
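
To make that concrete, a hypothetical resolution policy (every name here is 
invented for illustration; none of it is a real Spark or Kafka API):

{code:scala}
// Offsets still retained on the broker for one partition (hypothetical).
case class PartitionRange(earliestAvailable: Long, latestAvailable: Long)

def startingOffsetFor(atQueryStart: Boolean, range: PartitionRange): Long =
  if (atQueryStart) {
    // "begin the query at largest"
    range.latestAvailable
  } else {
    // "start new partitions at smallest" -- and if the smallest retained
    // offset is no longer 0, retention already deleted records we never saw.
    if (range.earliestAvailable > 0L) {
      sys.error(s"Joined new partition too late: offsets " +
        s"[0, ${range.earliestAvailable}) already aged out")
    }
    range.earliestAvailable
  }
{code}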

> More granular control of starting offsets (assign)
> --------------------------------------------------
>                 Key: SPARK-17812
>                 URL: https://issues.apache.org/jira/browse/SPARK-17812
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Michael Armbrust
> Right now you can only run a Streaming Query starting from either the 
> earliest or latest offsets available at the moment the query is started.  
> Sometimes this is a lot of data.  It would be nice to be able to do the 
> following:
>  - seek to user-specified offsets for manually specified topicpartitions 
> (see the sketch below)
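
One shape such an API could take on the reader (a sketch only; the {{assign}} 
JSON and the per-partition {{startingOffsets}} JSON, with -2/-1 standing for 
earliest/latest, are illustrative):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("assign-sketch").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
  // Manually assign specific topicpartitions instead of subscribing.
  .option("assign", """{"events":[0,1]}""")
  // Seek each assigned partition to a user-specified offset
  // (-2 = earliest, -1 = latest in this sketch).
  .option("startingOffsets", """{"events":{"0":12345,"1":-2}}""")
  .load()
{code}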
