Hi everyone,
I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
17:00 PST, which is 01:00 UTC.

Here are some of the topics under discussion in the last couple of weeks:

   - Read API for v2 - see Wenchen’s doc
   <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
   - Capabilities API - see the dev list thread
   <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
   - Using CatalogTableIdentifier to reliably separate v2 code paths - see PR
   #21978 <https://github.com/apache/spark/pull/21978>
   - A replacement for InternalRow

I know that a lot of people are also interested in combining the source API
for micro-batch and continuous streaming. Wenchen and I have been
discussing a way to do that and Wenchen has added it to the Read API doc as
Alternative #2. I think this would be a good thing to plan on discussing.

rb

Here’s some additional background on combining micro-batch and continuous
APIs:

The basic idea is to update how tasks end so that the same tasks can be
used in either micro-batch or continuous mode. For tasks that are naturally
limited, like data files, Spark stops reading when the data is exhausted.
For tasks that are not limited, like a Kafka partition, Spark either stops
at a pre-determined LocalOffset in micro-batch mode or just keeps running
in continuous mode.

Note that a task deciding to stop can happen in both modes, either when a
task is exhausted in micro-batch or when a stream needs to be reconfigured
in continuous.

Here’s the task reader API. The offset returned is optional so that a task
can avoid stopping when there isn’t a resumable offset, for example when it
is in the middle of an input file:

interface StreamPartitionReader<T> extends InputPartitionReader<T> {
  Optional<LocalOffset> currentOffset();
  boolean next(); // from InputPartitionReader
  T get();        // from InputPartitionReader
}
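
To make the optional offset a bit more concrete, here is a minimal sketch of a reader over an in-memory list that is only resumable at block boundaries. This is an illustration, not part of the proposal: BlockListReader, blockSize, and the long-based LocalOffset are hypothetical, and the two interfaces are simplified stand-ins rather than Spark's actual classes.

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-ins for the interfaces above (assumptions, not Spark's classes).
interface InputPartitionReader<T> {
  boolean next();
  T get();
}

interface StreamPartitionReader<T> extends InputPartitionReader<T> {
  Optional<LocalOffset> currentOffset();
}

// Hypothetical offset: the number of records consumed from the partition so far.
final class LocalOffset {
  final long position;

  LocalOffset(long position) {
    this.position = position;
  }
}

// Hypothetical reader whose input can only be resumed at block boundaries.
final class BlockListReader<T> implements StreamPartitionReader<T> {
  private final List<T> records;
  private final int blockSize;
  private int index = -1; // index of the record returned by get()

  BlockListReader(List<T> records, int blockSize) {
    this.records = records;
    this.blockSize = blockSize;
  }

  @Override
  public boolean next() {
    if (index + 1 >= records.size()) {
      return false; // naturally limited input: the data is exhausted
    }
    index++;
    return true;
  }

  @Override
  public T get() {
    return records.get(index);
  }

  @Override
  public Optional<LocalOffset> currentOffset() {
    int consumed = index + 1;
    // Report an offset only when a whole number of blocks has been consumed;
    // in the middle of a block there is nothing to resume from.
    if (consumed % blockSize == 0) {
      return Optional.of(new LocalOffset(consumed));
    }
    return Optional.empty();
  }
}
```

With a block size of 2, this reader reports an offset before the first record and after every second record, and returns an empty Optional in between, so a caller that wants to stop has to keep reading until the next boundary.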

The streaming code would look something like this:

Stream stream = scan.toStream();
StreamReaderFactory factory = stream.createReaderFactory();

while (true) {
  Offset start = stream.currentOffset();

  // no end offset in continuous mode, so tasks are not told when to stop
  Optional<Offset> end;
  if (isContinuousMode) {
    end = Optional.empty();
  } else {
    // rate limiting would happen here
    end = Optional.of(stream.latestOffset());
  }

  InputPartition[] parts = stream.planInputPartitions(start);

  // returns when needsReconfiguration is true or all tasks finish
  runTasks(parts, factory, end);

  // the stream's current offset has been updated at the last epoch
}
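
On the task side, the stopping rule could look roughly like the sketch below. It is only an illustration under some assumptions: OffsetReader, TaskLoop, runTask, and readerOver are hypothetical names, offsets are simplified to plain longs, and comparing the current offset against the end offset with >= is my guess at how "hitting a pre-determined LocalOffset" would work, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-in for the proposed reader; offsets are plain longs.
interface OffsetReader<T> {
  boolean next();
  T get();
  Optional<Long> currentOffset();
}

final class TaskLoop {
  // Reads until the input is exhausted, or until the reader is at a resumable
  // offset and either the end offset has been reached (micro-batch) or a
  // reconfiguration has been requested (continuous).
  static <T> List<T> runTask(OffsetReader<T> reader, Optional<Long> end,
                             BooleanSupplier needsReconfiguration) {
    List<T> out = new ArrayList<>();
    while (true) {
      Optional<Long> current = reader.currentOffset();
      if (current.isPresent()) {
        boolean reachedEnd = end.isPresent() && current.get() >= end.get();
        if (reachedEnd || needsReconfiguration.getAsBoolean()) {
          break;
        }
      }
      if (!reader.next()) {
        break; // naturally limited input is exhausted
      }
      out.add(reader.get());
    }
    return out;
  }

  // Tiny in-memory reader where every record boundary is resumable.
  static <T> OffsetReader<T> readerOver(List<T> records) {
    return new OffsetReader<T>() {
      private int consumed = 0;

      @Override
      public boolean next() {
        if (consumed >= records.size()) {
          return false;
        }
        consumed++;
        return true;
      }

      @Override
      public T get() {
        return records.get(consumed - 1);
      }

      @Override
      public Optional<Long> currentOffset() {
        return Optional.of((long) consumed);
      }
    };
  }
}
```

Passing an end offset gives micro-batch behavior, where the task stops once it reaches that offset. Passing an empty Optional gives continuous behavior, where the task only stops when the input runs out or a reconfiguration is requested. In both cases the task never stops while currentOffset() is empty.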

-- 
Ryan Blue
Software Engineer
Netflix
