On an unrelated note, is there any appetite for adding an option to the
write path that returns the elements that could not be processed for some
reason?

Usage might look something like

saveAndIgnoreFailures() : Dataset

That way, if some records cannot be parsed by the data source for writing,
or violate some contract with the data source, they can be returned for
further processing or handed off to an alternate system.
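
A rough sketch of what the call site might look like (the method name, its
return type, and the sink options below are hypothetical, not an existing
Spark API; `df` and `jdbcUrl` are assumed to be defined elsewhere):

```
// Hypothetical sketch only: write the DataFrame and get back the rows
// the data source rejected, instead of failing the whole job.
val rejected: Dataset[Row] = df.write
  .format("jdbc")                  // any sink that can reject individual rows
  .option("url", jdbcUrl)          // assumed connection option
  .saveAndIgnoreFailures()         // made-up method returning the unprocessed rows

// Route the rejected records to a dead-letter location for later handling.
rejected.write.format("parquet").save("s3://bucket/dead-letter/")
```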

On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue <rb...@netflix.com.invalid>
wrote:

> Comments inline. I've written up what I'm proposing with a bit more detail.
>
> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> I'm trying to give a summary:
>>
>> Ideally the data source API should only deal with data, not metadata. But
>> one key problem is that Spark still needs to support data sources without a
>> metastore, e.g. file format data sources.
>>
>> For this kind of data source, users have to pass metadata like
>> partitioning/bucketing to every write action on a "table" (or other
>> identifier, like the path of a file format data source), and it's the
>> user's responsibility to make sure this metadata is consistent. If it's
>> inconsistent, the behavior is undefined; different data sources may behave
>> differently.
>>
>
> Agreed so far. One minor point is that we currently throw an exception if
> you try to configure, for example, partitioning and also use `insertInto`.
>
>
>> If we agree on this, then the data source write API should have a way to
>> pass this metadata, and I think using data source options is a good choice
>> because it's implicit and doesn't require new APIs.
>>
>
> What I don't understand is why we "can't avoid this problem" unless you
> mean the last point, that we have to support this. I don't think that using
> data source options is a good choice, but maybe I don't understand the
> alternatives. Here's a straw-man version of what I'm proposing so you can
> tell me what's wrong with it or why options are a better choice.
>
> I'm assuming we start with a query like this:
> ```
>
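> // bucketBy takes a bucket count as its first argument; the 4 below is an arbitrary placeholder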
> df.write.partitionBy("utc_date").bucketBy(4, "primary_key").format("parquet").saveAsTable("s3://bucket/path/")
> ```
>
> That creates a logical node, `CreateTableAsSelect`, with some options. It
> would contain a `Relation` (or `CatalogTable` definition?) that corresponds
> to the user's table name and `partitionBy`, `format`, etc. calls. It should
> also have a write mode and the logical plan for `df`.
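>
> Roughly, the node could have a shape like this (the fields are illustrative
> and similar in spirit to the existing `CreateTable` command, not a copy of
> any actual Catalyst class):
>
> ```
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.catalyst.catalog.CatalogTable
> import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
>
> // Illustrative only: `table` carries the user's table name, partitioning,
> // bucketing, and format; `query` is the logical plan for df.
> case class CreateTableAsSelect(
>     table: CatalogTable,
>     mode: SaveMode,
>     query: LogicalPlan)
>   extends Command
> ```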
>
> When this CTAS logical node is turned into a physical plan, the relation
> gets turned into a `DataSourceV2` instance and then Spark gets a writer and
> configures it with the proposed API. The main point of this is to pass the
> logical relation (with all of the user's options) through to the data
> source, not the writer. The data source creates the writer and can tell the
> writer what to do. Another benefit of this approach is that the relation
> gets resolved during analysis, when it is easy to add sorts and other
> requirements to the logical plan.
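>
> A minimal sketch of that hand-off, with a hypothetical trait name and
> method signature (the writer type here just follows the naming in the
> current v2 write proposal):
>
> ```
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.catalyst.catalog.CatalogTable
> import org.apache.spark.sql.types.StructType
>
> // Hypothetical trait: the data source, not Spark, sees the resolved table
> // definition (user options, partitioning, etc.) and decides how to
> // configure the writer it hands back. DataSourceV2Writer is the writer
> // interface from the v2 write proposal.
> trait TableAwareDataSourceV2 {
>   def createWriter(
>       table: CatalogTable,
>       mode: SaveMode,
>       schema: StructType): DataSourceV2Writer
> }
> ```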
>
> If we were to implement what I'm suggesting, then we could handle metadata
> conflicts outside of the `DataSourceV2Writer`, in the data source. That
> eliminates the problem of defining behavior when there are conflicts (the
> next point) and prepares implementations for a catalog API that would
> standardize how those conflicts are handled. In the short term, this
> doesn't have to be in a public API yet. It can be special handling for
> HadoopFS relations that we can eventually use underneath a public API.
>
> Please let me know if I've misunderstood something. Now that I've written
> out how we could actually implement conflict handling outside of the
> writer, I can see that it isn't as obvious of a change as I thought. But, I
> think in the long term this would be a better way to go.
>
>
>> But then we have another problem: how do we define the behavior for data
>> sources with a metastore when the given options contain metadata? A typical
>> case is `DataFrameWriter.saveAsTable`: when a user calls it with partition
>> columns, they don't know what will happen. The table may not exist, in
>> which case it is created with the specified partition columns, or the table
>> may already exist with inconsistent partition columns, in which case Spark
>> throws an exception. Besides, save mode doesn't play well in this case, as
>> we may need different save modes for data and metadata.
>>
>> My proposal: the data source API should focus only on data, but concrete
>> data sources can implement some dirty features via options. For example,
>> file format data sources can take partitioning/bucketing from options, and
>> a data source with a metastore can use a special flag in options to
>> indicate a create-table command (without writing data).
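>>
>> Concretely, it might look something like this (the option keys below are
>> made up to illustrate the idea; a file format source would define and
>> interpret its own keys):
>>
>> ```
>> // Hypothetical option keys; the source itself, not Spark, gives them meaning.
>> df.write
>>   .format("parquet")
>>   .option("partitionColumns", "utc_date")
>>   .option("bucketColumns", "primary_key")
>>   .option("numBuckets", "8")
>>   .save("s3://bucket/path/")
>> ```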
>>
>
> I can see how this would make changes smaller, but I don't think it is a
> good thing to do. If we do this, then I think we will not really accomplish
> what we want to with this (a clean write API).
>
>
>> In other words, Spark connects users to data sources with a clean protocol
>> that focuses only on data, but this protocol has a backdoor: the data
>> source options. Concrete data sources are free to define how to deal with
>> metadata, e.g. a Cassandra data source can ask users to create the table on
>> the Cassandra side first and then write data on the Spark side, or ask
>> users to provide more details in options and do CTAS on the Spark side.
>> Either can be done via options.
>>
>> After catalog federation, hopefully only file format data sources still
>> use this backdoor.
>>
>
> Why would file format sources use a back door after catalog federation??
>
> rb
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
