IMO, the currentOffset should not be optional.
For continuous mode I assume this offset gets periodically check pointed
(so mandatory) ?
For the micro batch mode the currentOffset would be the start offset for a
micro-batch.

And if the micro-batch could be executed without knowing the 'latest'
offset (say until 'next' returns false), we only need the current offset
(to figure out the offset boundaries of a micro-batch) and may be then the
'latest' offset is not needed at all.

- Arun


On Tue, 13 Nov 2018 at 16:01, Ryan Blue <rb...@netflix.com.invalid> wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>    - Read API for v2 - see Wenchen’s doc
>    
> <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>    - Capabilities API - see the dev list thread
>    
> <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>    - A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that and Wenchen has added it to the Read API doc as
> Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited like
> data files, when the data is exhausted, Spark stops reading. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumeable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>   Optional<LocalOffset> currentOffset();
>   boolean next() // from InputPartitionReader
>   T get()        // from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
>     None
>   } else {
>     // rate limiting would happen here
>     Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Reply via email to