Below are my notes from the sync yesterday. Thanks to everyone who participated! For the format of this sync, I think it would help to make a small change. Since we have so many people and it takes a long time to introduce everyone, let’s try to get to the content faster by skipping the round of introductions and topic gathering. Instead, please send your topics to the sync thread on this list ahead of time. Just make sure I have them and I’ll add them to the invite and agenda, along with any links for background.
I also want to add a quick note about the live stream. After running a couple of tests, it looks like live streams only work within an organization. In the future, I won’t add a live stream since no one outside of Netflix can join.

Last, here are the notes:

*Attendees*

Ryan Blue - Netflix
John Zhuge - Netflix
Yuanjian Li - Baidu - Interested in Catalog API
Felix Cheung - Uber
Hyukjin Kwon - Hortonworks
Vinoo Ganesh - Palantir
Soumya Sanyal - ?
Bruce Robbins - Cloudera
Alessandro Bellina - Oath, here to learn
Jamison Bennett - Cloudera - Interested in Catalog API
Anton Okolnychyi - Apple
Gengliang Wang - Databricks - ORC source
Wenchen Fan - Databricks
Dilip Biswal - IBM - Push-down of new operators like limit
Kevin Yu - IBM
Matt Cheah - Palantir - Interested in CBO
Austin Nobis - Cloudera
Jungtaek Lim - Hortonworks - Interested in exactly-once semantics
Vikram Agrawal - Custom metrics
Sribasti Chakravarti

*Suggested Topics*

- DSv2 API changes
  - New proposal
  - Alternative #1: Combining Scan with Batch or Stream
  - Alternative #2: Combining micro-batch and continuous APIs
- Capabilities API
- CatalogTableIdentifier
- Push-down API
- CBO and stats API
- Exactly-once semantics

*Discussion*

The entire discussion was about the DSv2 API changes in Wenchen’s design doc.

- Wenchen went through the current status and the new proposal.
- There were not many questions; the API and behavior are clear and understandable.
- There was some discussion, started by Dilip, about how join push-down will work. Ryan noted that we just need to make sure that the design doesn’t preclude reasonable options for later. Wenchen suggested one such option: adding methods to push a join into the ScanBuilder. It isn’t clear how exactly this will work, but the consensus seemed to be that it will not break later designs. Dilip has a join push-down design doc (please reply with a link!).
- Consensus was to go with the new proposal.
- Wenchen went through alternative #1, which merges Scan into the next layer to produce BatchScan, MicroBatchStreamScan, and ContinuousStreamScan.
  - Ryan’s commentary: concerned that Scan is a distinct concept and may be useful in implementations. Would merging it into other objects cause duplication or force an inheritance hierarchy? The names show that it mixes two concepts: BatchScan = Batch + Scan.
  - Matt commented that it seems unlikely that Scan will be independently useful.
  - Wenchen noted that we can merge later if it isn’t useful.
  - Ryan noted that separate interfaces give the most flexibility for implementations. An implementation can create a BatchScan that extends both (a rough sketch of this follows these notes).
  - Conclusion: keep the interfaces separate for now and reassess later.
- Ryan went through alternative #2, which merges the micro-batch and continuous read interfaces.
  - To merge execution code, Spark would be responsible for stopping tasks. Tasks would attempt to read forever, and Spark determines whether to run a batch or run forever.
  - Some tasks are naturally limited, like data files added to a table. Spark would need to handle tasks stopping themselves early.
  - Some tasks are naturally boundless, like Kafka topic partitions. Tasks would need to provide offsets for Spark to decide when to stop reading.
  - The resulting task reader behavior is awkward: it fits neither naturally limited tasks (they must provide an “offset”) nor naturally boundless tasks (why stop early? why use micro-batch?). See the second sketch after these notes.
  - Conclusion was to keep the modes separate for simpler APIs.
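To make the alternative #1 conclusion concrete, here is a minimal sketch of how separate interfaces can still be combined by a source. The names here (Scan, Batch, InputPartition, ExampleBatchScan, and the method signatures) are stand-ins for illustration, not the actual API from Wenchen’s doc:

// Sketch only: hand-rolled stand-ins for the interfaces under discussion.
// None of these names are the final API; they only illustrate the shape.

interface InputPartition { }           // a unit of parallel work

interface Scan {
  // logical side: a description of what will be read (schema, pushed filters, ...)
  String description();
}

interface Batch {
  // physical side: plan the partitions for a one-shot batch read
  InputPartition[] planInputPartitions();
}

// Keeping Scan and Batch separate does not force two objects on a source:
// an implementation can still expose a single class that is both.
class ExampleBatchScan implements Scan, Batch {
  @Override public String description() { return "example scan"; }
  @Override public InputPartition[] planInputPartitions() {
    return new InputPartition[0];
  }
}

The same class could implement a streaming counterpart instead, which is the flexibility argument for keeping the layers separate; merging them up front hard-codes combinations like BatchScan = Batch + Scan.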
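For background on why the merged reader in alternative #2 felt awkward, here is a rough executor-side sketch written against the StreamPartitionReader idea quoted in the email below. Everything in it (the standalone interface stand-ins, runTask, process, and the offset comparison) is an assumption for illustration, not part of the proposal:

import java.util.Optional;

// Stand-ins so the sketch is self-contained; in the quoted proposal,
// StreamPartitionReader extends InputPartitionReader<T>.
interface LocalOffset extends Comparable<LocalOffset> { }

interface StreamPartitionReader<T> {
  Optional<LocalOffset> currentOffset();
  boolean next();
  T get();
}

class CombinedModeTaskRunner {
  // Hypothetical executor-side loop: 'end' is empty in continuous mode and
  // present in micro-batch mode (the driver would pick it for rate limiting).
  static <T> void runTask(StreamPartitionReader<T> reader, Optional<LocalOffset> end) {
    while (reader.next()) {       // returns false when a naturally limited task runs out
      process(reader.get());
      Optional<LocalOffset> current = reader.currentOffset();
      // Awkward corner: a naturally limited task may have no meaningful offset
      // to report here, while a boundless task must report one so that the
      // micro-batch boundary can be detected at all.
      if (end.isPresent() && current.isPresent()
          && current.get().compareTo(end.get()) >= 0) {
        break;                    // reached the micro-batch end offset
      }
    }
  }

  static <T> void process(T record) { /* hand the record to the rest of the query */ }
}

A naturally limited task ends by returning false from next() without a useful offset, while a boundless task never ends unless the driver supplies an end offset; that mismatch is the awkwardness noted above.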
On Tue, Nov 13, 2018 at 4:00 PM Ryan Blue <rb...@netflix.com> wrote:

> Hi everyone,
> I just wanted to send out a reminder that there’s a DSv2 sync tomorrow at
> 17:00 PST, which is 01:00 UTC.
>
> Here are some of the topics under discussion in the last couple of weeks:
>
>    - Read API for v2 - see Wenchen’s doc
>    <https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?ts=5be4868a#heading=h.2h7sf1665hzn>
>    - Capabilities API - see the dev list thread
>    <https://mail-archives.apache.org/mod_mbox/spark-dev/201811.mbox/%3CCAO4re1%3Doizqo1oFfVViK3bKWCp7MROeATXcWAEUY5%2B8Vpf6GGw%40mail.gmail.com%3E>
>    - Using CatalogTableIdentifier to reliably separate v2 code paths -
>    see PR #21978 <https://github.com/apache/spark/pull/21978>
>    - A replacement for InternalRow
>
> I know that a lot of people are also interested in combining the source
> API for micro-batch and continuous streaming. Wenchen and I have been
> discussing a way to do that, and Wenchen has added it to the Read API doc
> as Alternative #2. I think this would be a good thing to plan on discussing.
>
> rb
>
> Here’s some additional background on combining micro-batch and continuous
> APIs:
>
> The basic idea is to update how tasks end so that the same tasks can be
> used in micro-batch or streaming. For tasks that are naturally limited, like
> data files, Spark stops reading when the data is exhausted. For tasks that
> are not limited, like a Kafka partition, Spark decides when to stop in
> micro-batch mode by hitting a pre-determined LocalOffset, or Spark can just
> keep running in continuous mode.
>
> Note that a task deciding to stop can happen in both modes, either when a
> task is exhausted in micro-batch or when a stream needs to be reconfigured
> in continuous.
>
> Here’s the task reader API. The offset returned is optional so that a task
> can avoid stopping if there isn’t a resumable offset, like if it is in the
> middle of an input file:
>
> interface StreamPartitionReader<T> extends InputPartitionReader<T> {
>   Optional<LocalOffset> currentOffset();
>   boolean next()  // from InputPartitionReader
>   T get()         // from InputPartitionReader
> }
>
> The streaming code would look something like this:
>
> Stream stream = scan.toStream()
> StreamReaderFactory factory = stream.createReaderFactory()
>
> while (true) {
>   Offset start = stream.currentOffset()
>   Offset end = if (isContinuousMode) {
>     None
>   } else {
>     // rate limiting would happen here
>     Some(stream.latestOffset())
>   }
>
>   InputPartition[] parts = stream.planInputPartitions(start)
>
>   // returns when needsReconfiguration is true or all tasks finish
>   runTasks(parts, factory, end)
>
>   // the stream's current offset has been updated at the last epoch
> }
>
> --
> Ryan Blue
> Software Engineer
> Netflix

--
Ryan Blue
Software Engineer
Netflix