I think there's a difference. You're right that we wanted to clean up the API in V2 to avoid file sources using side channels. But there's a big difference between adding, for example, a way to report partitioning and designing for sources that need unbounded state. It's a judgment call, but unbounded state is definitely not something we should design around. Another way to think about it: yes, we want to design a better API using existing sources as guides, but we don't need to assume that everything those sources do should be supported. It is reasonable to say that this is a case we don't want to design for, and that the source needs to change. Why can't we use a high watermark of the files' modified timestamps? (A rough sketch of the idea is below.)
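A minimal sketch of the high-watermark idea, in Scala. The names here (ModTimeOffset, listNewFiles) are hypothetical illustrations, not part of any Spark API:

    import java.io.File

    // Illustrative sketch only: the offset is the newest modification
    // timestamp already processed, so state stays bounded no matter how
    // many files the stream has read.
    case class ModTimeOffset(watermarkMillis: Long) {
      // Offsets in Structured Streaming are JSON-serializable; a single
      // long trivially is.
      def json: String = s"""{"watermarkMillis":$watermarkMillis}"""
    }

    object ModTimeWatermark {
      // Files strictly newer than the previous watermark form the next
      // batch; the new watermark is the max timestamp in that batch.
      def listNewFiles(dir: File, prev: ModTimeOffset): (Seq[File], ModTimeOffset) = {
        val fresh = Option(dir.listFiles()).getOrElse(Array.empty[File])
          .filter(f => f.isFile && f.lastModified() > prev.watermarkMillis)
          .sortBy(_.lastModified())
        val next =
          if (fresh.isEmpty) prev
          else ModTimeOffset(fresh.map(_.lastModified()).max)
        (fresh.toSeq, next)
      }
    }

The obvious caveat is timestamp granularity: two files written in the same millisecond, or a file that arrives late with an old timestamp, could be missed or read twice at the boundary. That's a real trade-off, but it keeps the state bounded.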
For most sources, I think Spark should handle state serialization and recovery. Maybe we can find a good way to make the file source with unbounded state work, but it shouldn't be one of the driving cases for the design, and it certainly shouldn't be the reason every source needs to manage its own state in a checkpoint directory.

rb

On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres <joseph.tor...@databricks.com> wrote:

> I'd argue that letting bad cases influence the design is an explicit
> goal of DataSourceV2. One of the primary motivations for the project
> was that file sources hook into a series of weird internal side
> channels, with favorable performance characteristics that are
> difficult to match in the API we actually declare to Spark users. So a
> design that we can't migrate file sources to without a side channel
> would be worrying; won't we end up regressing to the same situation?
>
> On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:
>
>> Should we really plan the API around a source whose state grows
>> indefinitely? It sounds like we're letting a bad case influence the
>> design, when we probably shouldn't.
>>
>> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
>> joseph.tor...@databricks.com> wrote:
>>
>>> Offset is just a type alias for arbitrary JSON-serializable state.
>>> Most implementations should (and do) just toss the blob at Spark and
>>> let Spark handle recovery on its own.
>>>
>>> In the case of file streams, the obstacle is that the conceptual
>>> offset is very large: a list of every file the stream has ever read.
>>> To store and parse this efficiently, the stream connector needs
>>> detailed control over how it's stored; the current implementation
>>> even has complex compaction and retention logic.
>>>
>>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Why don't we just have the source return its state as a
>>>> Serializable when it reports offsets? Then Spark could handle
>>>> storing the source's state, and the source wouldn't need to worry
>>>> about file system paths. I think that would be easier for
>>>> implementations and better for recovery, because it wouldn't leave
>>>> unknown state on a single machine's file system.
>>>>
>>>> rb
>>>>
>>>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>>>> joseph.tor...@databricks.com> wrote:
>>>>
>>>>> The precise interactions with the DataSourceV2 API haven't yet
>>>>> been hammered out in design. But much of this comes down to the
>>>>> core of Structured Streaming rather than the API details.
>>>>>
>>>>> The execution engine handles checkpointing and recovery. It asks
>>>>> the streaming data source for offsets, and then determines that
>>>>> batch N contains the data between offset A and offset B. On
>>>>> recovery, if batch N needs to be re-run, the execution engine just
>>>>> asks the source for the same offset range again. Sources also get
>>>>> a handle to their own subfolder of the checkpoint, which they can
>>>>> use as scratch space if they need. For example, Spark's
>>>>> FileStreamSource keeps a log of all the files it's seen, so its
>>>>> offsets can simply be indices into the log rather than huge
>>>>> strings containing all the paths.
>>>>>
>>>>> SPARK-23323 is orthogonal. That commit coordinator is responsible
>>>>> for ensuring that, within a single Spark job, two different tasks
>>>>> can't commit the same partition.
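To make the recovery contract described above concrete, here is a minimal sketch in Scala. The trait and method names are simplified stand-ins, not the actual DataSourceV2 interfaces:

    // The engine, not the source, persists offsets: each trigger it asks
    // for the latest offset, logs "batch N covers (start, end]" in its
    // checkpoint, and on recovery hands the same range back for replay.
    trait SketchedStreamSource[O] {
      def latestOffset(): O                        // called by the engine each trigger
      def serializeOffset(offset: O): String       // JSON blob the engine checkpoints
      def deserializeOffset(json: String): O       // used on recovery
      def getBatch(start: O, end: O): Seq[String]  // records in (start, end]
    }

    object EngineRecoverySketch {
      // Schematic engine-side recovery of a previously planned batch.
      def rerunBatch[O](source: SketchedStreamSource[O],
                        startJson: String,
                        endJson: String): Seq[String] = {
        val start = source.deserializeOffset(startJson)
        val end   = source.deserializeOffset(endJson)
        // The source must return the same data for the same range every
        // time; that deterministic replay is what recovery relies on.
        source.getBatch(start, end)
      }
    }

Under this contract, a source that can hand Spark a small offset blob never needs to touch the checkpoint directory itself; the file source is the awkward case precisely because its natural "offset" is the list of every file ever seen, which is what the scratch-space subfolder works around.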
>>>>>
>>>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>>>> jthak...@conversantmedia.com> wrote:
>>>>>
>>>>>> Wondering if this issue is related to SPARK-23323?
>>>>>>
>>>>>> Any pointers will be greatly appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Jayesh
>>>>>>
>>>>>> From: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
>>>>>> Date: Monday, April 23, 2018 at 9:49 PM
>>>>>> To: "dev@spark.apache.org" <dev@spark.apache.org>
>>>>>> Subject: Datasource API V2 and checkpointing
>>>>>>
>>>>>> I was wondering: when checkpointing is enabled, who does the
>>>>>> actual work, the streaming datasource or the execution
>>>>>> engine/driver?
>>>>>>
>>>>>> I have written a small, trivial datasource that just generates
>>>>>> strings. After enabling checkpointing, I do see a folder being
>>>>>> created under the checkpoint folder, but there's nothing else in
>>>>>> it.
>>>>>>
>>>>>> Same question for write-ahead and recovery: on a restart from a
>>>>>> failed streaming session, who should set the offsets, the
>>>>>> driver/Spark or the datasource?
>>>>>>
>>>>>> Any pointers to design docs would also be greatly appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Jayesh

--
Ryan Blue
Software Engineer
Netflix