[
https://issues.apache.org/jira/browse/SPARK-17937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-17937:
---------------------------------
Labels: bulk-closed (was: )
> Clarify Kafka offset semantics for Structured Streaming
> -------------------------------------------------------
>
> Key: SPARK-17937
> URL: https://issues.apache.org/jira/browse/SPARK-17937
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Reporter: Cody Koeninger
> Priority: Critical
> Labels: bulk-closed
>
> Possible events for which offsets are needed:
> # New partition is discovered
> # Offset out of range (i.e., data has been lost). It's possible to separate
> this into offset too small and offset too large, but I'm not sure it matters
> for us.
> Possible sources of offsets:
> # *Earliest* position in log
> # *Latest* position in log
> # *Fail* and kill the query
> # *Checkpoint* position
> # *User specified* per TopicPartition
> # *Kafka commit log*. Currently unsupported. This means users who want to
> migrate from existing Kafka jobs need to jump through hoops (see the sketch
> after this list). Even if we never want to support it, as soon as we take on
> SPARK-17815 we need to make sure Kafka commit log state is clearly
> documented and handled.
> # *Timestamp*. Currently unsupported. This could be supported with the old,
> inaccurate Kafka time API, or with the upcoming time index.
> # *X offsets* before or after latest / earliest position. Currently
> unsupported. I think the semantics of this are super unclear compared with
> timestamps, given that Kafka doesn't have a single range of offsets.
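>
> A rough sketch of the hoops in the *Kafka commit log* item: read the offsets
> a legacy consumer group last committed and turn them into a startingOffsets
> JSON string. The broker address, group id, and topic name are placeholders,
> and the error handling is minimal.
> {code:scala}
> import java.util.Properties
> import scala.collection.JavaConverters._
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
>
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092") // placeholder broker
> props.put("group.id", "legacy-job")              // placeholder group id
> props.put("key.deserializer",
>   "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put("value.deserializer",
>   "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>
> val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
> try {
>   // committed() returns null for partitions the group never committed to
>   val offsets = consumer.partitionsFor("mytopic").asScala.flatMap { pi =>
>     val tp = new TopicPartition(pi.topic, pi.partition)
>     Option(consumer.committed(tp)).map(om => pi.partition -> om.offset)
>   }
>   val json = offsets
>     .map { case (p, o) => s""""$p": $o""" }
>     .mkString("""{"mytopic": {""", ", ", "}}")
>   println(json) // e.g. {"mytopic": {"0": 23, "1": 42}}
> } finally {
>   consumer.close()
> }
> {code}
>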
> Currently allowed pre-query configuration, all "ORs" are exclusive:
> # startingOffsets: *earliest* OR *latest* OR *User specified* JSON per
> TopicPartition (SPARK-17812)
> # failOnDataLoss: true (which implies *Fail* above) OR false (which implies
> *Earliest* above). In general, I see no reason this couldn't specify *Latest*
> as an option. A minimal sketch of these options follows this list.
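>
> A minimal sketch of the pre-query configuration above, assuming an active
> SparkSession {{spark}}; the broker address, topic name, and offsets are
> placeholders. In the offsets JSON, -2 refers to earliest and -1 to latest
> for that partition.
> {code:scala}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
>   .option("subscribe", "mytopic")                      // placeholder topic
>   // partition 0 starts at offset 23, partition 1 at earliest (-2),
>   // partition 2 at latest (-1)
>   .option("startingOffsets", """{"mytopic": {"0": 23, "1": -2, "2": -1}}""")
>   .option("failOnDataLoss", "false") // false implies *Earliest* on data loss
>   .load()
> {code}
>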
> Points in the query lifecycle at which an offset-related event may happen:
> # At initial query start
> #* New partition: if startingOffsets is *Earliest* or *Latest*, use that. If
> startingOffsets is *User specified* per TopicPartition, and the new partition
> isn't in the map, *Fail*. Note that this is effectively indistinguishable
> from a new partition during the query, because partitions may have changed
> between pre-query configuration and query start, but we treat it differently,
> and users in this case are out of luck.
> #* Offset out of range on driver: we don't technically have behavior for this
> case yet. We could use the value of failOnDataLoss, but people may want to
> know at startup that something was wrong, even if they're OK with *Earliest*
> for an out-of-range during the query.
> #* Offset out of range on executor: seems like it should be *Fail* or
> *Earliest*, based on failOnDataLoss, but it looks like this setting is
> currently ignored, and the executor will just fail...
> # During query
> #* New partition: *Earliest*, only. This seems to be by fiat; I see no
> reason this can't be configurable.
> #* Offset out of range on driver: this _probably_ doesn't happen, because
> we're doing explicit seeks to the latest position
> #* Offset out of range on executor: ?
> # At query restart
> #* New partition: *Checkpoint*, falling back to *Earliest*. Again, I see no
> reason the fallback couldn't be configurable to *Latest* (see the sketch
> after this list).
> #* Offset out of range on driver: this _probably_ doesn't happen, because
> we're doing explicit seeks to the specified position
> #* Offset out of range on executor: ?
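>
> A sketch of how the *Checkpoint* source enters at query restart, reusing
> {{df}} from the sketch above: offsets are recovered from checkpointLocation,
> and startingOffsets is only consulted for a brand-new query. The paths and
> the sink format are placeholders.
> {code:scala}
> val query = df.selectExpr("CAST(value AS STRING)")
>   .writeStream
>   .format("parquet")            // placeholder sink
>   .option("path", "/data/out")  // placeholder output path
>   // on restart, offsets are recovered from here, not from startingOffsets
>   .option("checkpointLocation", "/data/checkpoints/q1")
>   .start()
> {code}
>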
> I've probably missed something; chime in.