Should we really design the API around a source whose state grows indefinitely? It sounds like we're letting a bad case drive the design, when we probably shouldn't.
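To make that concrete: for most sources the offset really is a tiny, self-describing blob that Spark can checkpoint on its own. A minimal sketch against the Spark 2.3 DataSourceV2 streaming API (CounterOffset is a hypothetical example, not a class that ships with Spark):

    import org.apache.spark.sql.sources.v2.reader.streaming.Offset

    // Hypothetical source position: a single record count. The JSON form
    // is tiny, so Spark can checkpoint it with no help from the source.
    case class CounterOffset(count: Long) extends Offset {
      override def json(): String = count.toString
    }

The file source is the outlier: its conceptual position is the set of every file it has ever read, which is why it keeps its own log in the checkpoint scratch space.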
On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <joseph.tor...@databricks.com> wrote:

> Offset is just a type alias for arbitrary JSON-serializable state. Most
> implementations should (and do) just toss the blob at Spark and let Spark
> handle recovery on its own.
>
> In the case of file streams, the obstacle is that the conceptual offset is
> very large: a list of every file the stream has ever read. To store and
> parse this efficiently, the stream connector needs detailed control over
> how it's persisted; the current implementation even has complex compaction
> and retention logic.
>
> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>
>> Why don't we just have the source return a Serializable state object when
>> it reports offsets? Then Spark could handle storing the source's state,
>> and the source wouldn't need to worry about file system paths. I think
>> that would be easier for implementations and better for recovery, because
>> it wouldn't leave unknown state on a single machine's file system.
>>
>> rb
>>
>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres
>> <joseph.tor...@databricks.com> wrote:
>>
>>> The precise interactions with the DataSourceV2 API haven't yet been
>>> hammered out in design. But much of this comes down to the core of
>>> Structured Streaming rather than the API details.
>>>
>>> The execution engine handles checkpointing and recovery. It asks the
>>> streaming data source for offsets, and then determines that batch N
>>> contains the data between offset A and offset B. On recovery, if batch N
>>> needs to be re-run, the execution engine just asks the source for the
>>> same offset range again. Sources also get a handle to their own subfolder
>>> of the checkpoint directory, which they can use as scratch space if they
>>> need it. For example, Spark's FileStreamReader keeps a log of all the
>>> files it has seen, so its offsets can simply be indices into that log
>>> rather than huge strings containing all the paths.
>>>
>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>> ensuring that, within a single Spark job, two different tasks can't
>>> commit the same partition.
>>>
>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh
>>> <jthak...@conversantmedia.com> wrote:
>>>
>>>> Wondering if this issue is related to SPARK-23323?
>>>>
>>>> Any pointers will be greatly appreciated.
>>>>
>>>> Thanks,
>>>> Jayesh
>>>>
>>>> From: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
>>>> Date: Monday, April 23, 2018 at 9:49 PM
>>>> To: "dev@spark.apache.org" <dev@spark.apache.org>
>>>> Subject: Datasource API V2 and checkpointing
>>>>
>>>> I was wondering: when checkpointing is enabled, who does the actual
>>>> work? The streaming datasource or the execution engine/driver?
>>>>
>>>> I have written a small, trivial datasource that just generates strings.
>>>> After enabling checkpointing, I do see a folder being created under the
>>>> checkpoint folder, but there's nothing else in it.
>>>>
>>>> The same question applies to the write-ahead log and recovery. And on a
>>>> restart from a failed streaming session, who should set the offsets:
>>>> the driver/Spark or the datasource?
>>>>
>>>> Any pointers to design docs would also be greatly appreciated.
>>>>
>>>> Thanks,
>>>> Jayesh
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>

--
Ryan Blue
Software Engineer
Netflix
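To make the recovery contract described above concrete, here is a minimal sketch of the offset-handling side of a micro-batch reader, written against the Spark 2.3 DataSourceV2 API (org.apache.spark.sql.sources.v2) that was current for this thread. CounterMicroBatchReader, its CounterOffset, and the fixed batch size of 10 records are all hypothetical, and the data-reading methods are stubbed out:

    import java.util.Optional

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
    import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Hypothetical offset, same shape as the sketch earlier in the thread.
    case class CounterOffset(count: Long) extends Offset {
      override def json(): String = count.toString
    }

    class CounterMicroBatchReader extends MicroBatchReader {
      private var startOffset = CounterOffset(0L)
      private var endOffset = CounterOffset(0L)

      // The engine calls this to define batch N. On a clean run the end is
      // empty and the source picks its own high-water mark; on recovery the
      // engine passes back exactly the offsets it checkpointed for batch N.
      override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
        startOffset =
          if (start.isPresent) start.get.asInstanceOf[CounterOffset] else CounterOffset(0L)
        endOffset =
          if (end.isPresent) end.get.asInstanceOf[CounterOffset]
          else CounterOffset(startOffset.count + 10) // hypothetical: 10 records per batch
      }

      override def getStartOffset(): Offset = startOffset
      override def getEndOffset(): Offset = endOffset

      // The engine persists offsets as JSON strings in its own checkpoint
      // log and hands them back through this method during recovery.
      override def deserializeOffset(json: String): Offset = CounterOffset(json.toLong)

      // Called once batch N is durably committed; nothing to clean up here.
      override def commit(end: Offset): Unit = {}

      override def readSchema(): StructType = StructType(Seq(StructField("value", LongType)))

      // Data reading elided: a real source would return one factory per
      // partition covering [startOffset, endOffset).
      override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
        java.util.Collections.emptyList[DataReaderFactory[Row]]()

      override def stop(): Unit = {}
    }

On recovery, the engine reads the JSON it logged for batch N, reconstructs the start and end offsets with deserializeOffset, and replays them through setOffsetRange, which is why most sources never need to touch the checkpoint scratch folder at all.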