I think there's a difference. You're right that we wanted to clean up the
API in V2 to avoid file sources using side channels. But there's a big
difference between adding, for example, a way to report partitioning and
designing for sources that need unbounded state. It's a judgment call, but
I think unbounded state is definitely not something that we should design
around. Another way to think about it: yes, we want to design a better API
using existing sources as guides, but we don't need to assume that
everything those sources do should be supported. It is reasonable to say
that this is a case we don't want to design for and the source needs to
change. Why can't we use a high watermark of files' modified timestamps?
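
To make that concrete, here's roughly what I have in mind. This is only a
sketch with made-up names, not a proposal for the actual file source:

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

  // Sketch: the "offset" is just the newest modification time the stream
  // has processed, so state stays O(1) instead of a list of every file.
  case class ModTimeWatermark(lastModTime: Long)

  // The next batch is everything modified after the watermark.
  def filesForNextBatch(fs: FileSystem, dir: Path,
                        mark: ModTimeWatermark): Seq[FileStatus] =
    fs.listStatus(dir).toSeq
      .filter(_.getModificationTime > mark.lastModTime)
      .sortBy(_.getModificationTime)

  // The new watermark is the max modification time in the batch.
  def advance(mark: ModTimeWatermark, batch: Seq[FileStatus]): ModTimeWatermark =
    if (batch.isEmpty) mark
    else ModTimeWatermark(batch.map(_.getModificationTime).max)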

For most sources, I think Spark should handle state serialization and
recovery. Maybe we can find a good way to make the file source with
unbounded state work, but this shouldn't be one of the driving cases for
the design and consequently a reason for every source to need to manage its
own state in a checkpoint directory.

rb

On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres <
joseph.tor...@databricks.com> wrote:

> I'd argue that letting bad cases influence the design is an explicit goal
> of DataSourceV2. One of the primary motivations for the project was that
> file sources hook into a series of weird internal side channels, with
> favorable performance characteristics that are difficult to match in the
> API we actually expose to Spark users. So a design that we can't migrate
> file sources to without a side channel would be worrying; won't we end up
> regressing to the same situation?
>
> On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue <rb...@netflix.com> wrote:
>
>> Should we really plan the API for a source with state that grows
>> indefinitely? It sounds like we're letting a bad case influence the
>> design, when we probably shouldn't.
>>
>> On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres <
>> joseph.tor...@databricks.com> wrote:
>>
>>> Offset is just a type alias for arbitrary JSON-serializable state. Most
>>> implementations should (and do) just toss the blob at Spark and let Spark
>>> handle recovery on its own.
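>>>
>>> As a minimal sketch of what that usually amounts to on the source side
>>> (hypothetical CounterOffset, just to illustrate):
>>>
>>>   // A real source would extend the v2 streaming Offset class (omitted
>>>   // here), which only requires a json() method; Spark stores the string
>>>   // in its offset log and hands it back to the source on recovery.
>>>   case class CounterOffset(recordsEmitted: Long) {
>>>     def json(): String = s"""{"recordsEmitted":$recordsEmitted}"""
>>>   }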
>>>
>>> In the case of file streams, the obstacle is that the conceptual offset
>>> is very large: a list of every file which the stream has ever read. In
>>> order to handle this efficiently, the stream connector needs detailed
>>> control over how it's stored; the current implementation even has complex
>>> compaction and retention logic.
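>>>
>>> Very roughly, the trick is that the offset Spark sees stays tiny while
>>> the heavy state lives in a log the connector controls. A simplified
>>> sketch, not the actual implementation:
>>>
>>>   import scala.collection.mutable.ArrayBuffer
>>>
>>>   // The offset handed to Spark is just an index into a metadata log of
>>>   // seen files that the connector maintains (and compacts) itself.
>>>   case class FileLogOffset(logIndex: Long) {
>>>     def json(): String = logIndex.toString
>>>   }
>>>
>>>   class SeenFilesLog {
>>>     // Really files in the source's checkpoint dir, kept in memory here.
>>>     private val batches = ArrayBuffer.empty[Seq[String]]
>>>     def append(newFiles: Seq[String]): FileLogOffset = {
>>>       batches += newFiles
>>>       FileLogOffset(batches.size - 1)
>>>     }
>>>     // Files discovered after `start`, up to and including `end`.
>>>     def filesBetween(start: Long, end: Long): Seq[String] =
>>>       batches.slice(start.toInt + 1, end.toInt + 1).flatten.toSeq
>>>   }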
>>>
>>>
>>> On Mon, Apr 30, 2018 at 10:48 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> Why don't we just have the source return its state as a Serializable when
>>>> it reports offsets? Then Spark could handle storing the source's state and
>>>> the source wouldn't need to worry about file system paths. I think that
>>>> would be easier for implementations and better for recovery because it
>>>> wouldn't leave unknown state on a single machine's file system.
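>>>>
>>>> Something along these lines, purely a hypothetical shape to illustrate
>>>> the idea, not a concrete API proposal:
>>>>
>>>>   // The source reports its offset together with an opaque state blob;
>>>>   // Spark owns persisting both in the checkpoint and handing them back
>>>>   // on restart.
>>>>   trait StatefulStreamReader {
>>>>     def reportOffset(): (String, java.io.Serializable)
>>>>     def restore(offsetJson: String, state: java.io.Serializable): Unit
>>>>   }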
>>>>
>>>> rb
>>>>
>>>> On Fri, Apr 27, 2018 at 9:23 AM, Joseph Torres <
>>>> joseph.tor...@databricks.com> wrote:
>>>>
>>>>> The precise interactions with the DataSourceV2 API haven't yet been
>>>>> hammered out in design. But much of this comes down to the core of
>>>>> Structured Streaming rather than the API details.
>>>>>
>>>>> The execution engine handles checkpointing and recovery. It asks the
>>>>> streaming data source for offsets, and then determines that batch N
>>>>> contains the data between offset A and offset B. On recovery, if batch N
>>>>> needs to be re-run, the execution engine just asks the source for the same
>>>>> offset range again. Sources also get a handle to their own subfolder of the
>>>>> checkpoint, which they can use as scratch space if they need. For example,
>>>>> Spark's FileStreamReader keeps a log of all the files it's seen, so its
>>>>> offsets can be simply indices into the log rather than huge strings
>>>>> containing all the paths.
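>>>>>
>>>>> A rough sketch of that engine-side contract (simplified pseudo-Scala,
>>>>> not the literal API):
>>>>>
>>>>>   trait Offset { def json(): String }
>>>>>   trait StreamSource {
>>>>>     def latestOffset(): Offset
>>>>>     // read the data in the range (start, end]
>>>>>     def planBatch(start: Option[Offset], end: Offset): Unit
>>>>>   }
>>>>>   // Conceptually the files under the checkpoint's offsets/ directory.
>>>>>   class OffsetLog {
>>>>>     private val log = scala.collection.mutable.Map[Long, Offset]()
>>>>>     def get(batch: Long): Option[Offset] = log.get(batch)
>>>>>     def add(batch: Long, off: Offset): Unit = log(batch) = off
>>>>>   }
>>>>>
>>>>>   // Batch N covers (offset A, offset B]. Both ends are logged before
>>>>>   // the batch runs, so recovery just replays the same range.
>>>>>   def runBatch(n: Long, source: StreamSource, offsets: OffsetLog): Unit = {
>>>>>     val start = offsets.get(n - 1)     // offset A (None for batch 0)
>>>>>     val end = offsets.get(n).getOrElse {
>>>>>       val latest = source.latestOffset()
>>>>>       offsets.add(n, latest)           // durable before execution
>>>>>       latest
>>>>>     }
>>>>>     source.planBatch(start, end)       // same offsets => same batch data
>>>>>   }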
>>>>>
>>>>> SPARK-23323 is orthogonal. That commit coordinator is responsible for
>>>>> ensuring that, within a single Spark job, two different tasks can't commit
>>>>> the same partition.
>>>>>
>>>>> On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
>>>>> jthak...@conversantmedia.com> wrote:
>>>>>
>>>>>> Wondering if this issue is related to SPARK-23323?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any pointers will be greatly appreciated….
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jayesh
>>>>>>
>>>>>>
>>>>>>
>>>>>> From: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
>>>>>> Date: Monday, April 23, 2018 at 9:49 PM
>>>>>> To: "dev@spark.apache.org" <dev@spark.apache.org>
>>>>>> Subject: Datasource API V2 and checkpointing
>>>>>>
>>>>>>
>>>>>>
>>>>>> I was wondering: when checkpointing is enabled, who does the actual
>>>>>> work?
>>>>>>
>>>>>> The streaming datasource or the execution engine/driver?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have written a small/trivial datasource that just generates strings.
>>>>>>
>>>>>> After enabling checkpointing, I do see a folder being created under
>>>>>> the checkpoint folder, but there's nothing else in there.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Same question for write-ahead logging and recovery?
>>>>>>
>>>>>> And on a restart from a failed streaming session - who should set the
>>>>>> offsets?
>>>>>>
>>>>>> The driver/Spark or the datasource?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any pointers to design docs would also be greatly appreciated.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jayesh
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix
