> I think it is a bad idea to let this problem leak into the new storage
> API.

Well, I think using data source options is a good compromise here. We
can't avoid this problem until catalog federation is done, which may not
happen within Spark 2.3, but we definitely need the data source write API
in Spark 2.3.
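
To make the options idea concrete, here is a rough sketch of how
partitioning/bucketing could travel through a string-to-string options map.
Everything in it is an assumption for illustration: the class name
`PartitioningOptions` and the keys `partitionColumns`, `bucketColumns` and
`numBuckets` are hypothetical and not part of any Spark API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; neither the class nor the option keys exist in Spark.
final class PartitioningOptions {
    // Assumed option keys, chosen only for this sketch.
    static final String PARTITION_COLUMNS = "partitionColumns";
    static final String BUCKET_COLUMNS = "bucketColumns";
    static final String NUM_BUCKETS = "numBuckets";

    private PartitioningOptions() {}

    // Spark side: copy the user's options and add the write-time metadata.
    static Map<String, String> encode(Map<String, String> userOptions,
                                      List<String> partitionColumns,
                                      List<String> bucketColumns,
                                      int numBuckets) {
        Map<String, String> options = new HashMap<>(userOptions);
        if (!partitionColumns.isEmpty()) {
            options.put(PARTITION_COLUMNS, String.join(",", partitionColumns));
        }
        if (!bucketColumns.isEmpty()) {
            options.put(BUCKET_COLUMNS, String.join(",", bucketColumns));
            options.put(NUM_BUCKETS, Integer.toString(numBuckets));
        }
        return options;
    }

    // Data source side: read a comma-separated column list.
    // A missing key means the user did not ask for that layout.
    static List<String> columns(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split(","));
    }
}
```

Because the metadata lives in ordinary options, a data source that does not
understand these keys can ignore them, and Spark can stop sending them later
without an API change. The comma-separated encoding is a simplification and
would not survive column names that contain commas.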

> Why can't we use an in-memory catalog to store the configuration of
> HadoopFS tables?

We still need to support existing Spark applications that call
`df.write.partitionBy(...).parquet(...)`. I think this is similar to the
`path` given to `DataFrameWriter`: by your argument we should not leak
`path` into the storage API either, but we have no other solution for
Hadoop FS data sources.


Eventually I think only Hadoop FS data sources will need to take these
special options, but for now any data source that wants to support
partitioning/bucketing needs to take them too.
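
As a sketch of what "taking these special options" might look like on the
data source side, following the create-or-validate behaviour proposed earlier
in this thread: the source creates the table with the requested layout if it
does not exist yet, and throws if the request does not match an
already-configured table. The class `PartitioningCheck`, the method `resolve`
and the key `partitionColumns` are hypothetical, and treating an empty request
as "use the table's layout" is my own assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; the class, methods and option key are not Spark APIs.
final class PartitioningCheck {
    static final String PARTITION_COLUMNS = "partitionColumns"; // assumed key

    private PartitioningCheck() {}

    // Partition columns requested through the options, or an empty list.
    static List<String> requested(Map<String, String> options) {
        String value = options.get(PARTITION_COLUMNS);
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.split(","));
    }

    // Decide which partitioning the write should use.
    // `existing` holds the partition columns of an already-configured table,
    // or is empty when the source has no table yet.
    static List<String> resolve(Map<String, String> options,
                                Optional<List<String>> existing) {
        List<String> requested = requested(options);
        if (!existing.isPresent()) {
            // No table yet: the source would create it with this layout.
            return requested;
        }
        if (requested.isEmpty() || requested.equals(existing.get())) {
            // Nothing requested, or the request agrees with the table.
            return existing.get();
        }
        throw new IllegalArgumentException(
            "Requested partitioning " + requested
                + " does not match existing table partitioning " + existing.get());
    }
}
```

A Hadoop FS source would always take the first branch today, since it has no
catalog to consult, which is why it needs the options on every write.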


On Tue, Sep 26, 2017 at 4:36 AM, Ryan Blue <rb...@netflix.com> wrote:

> I think it is a bad idea to let this problem leak into the new storage
> API. By not setting the expectation that metadata for a table will exist,
> this will needlessly complicate writers just to support the existing
> problematic design. Why can't we use an in-memory catalog to store the
> configuration of HadoopFS tables? I see no compelling reason why this needs
> to be passed into the V2 write API.
>
> If this is limited to an implementation hack for the Hadoop FS writers,
> then I guess that's not terrible. I just don't understand why it is
> necessary.
>
> On Mon, Sep 25, 2017 at 11:26 AM, Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> Catalog federation means publishing the Spark catalog API (kind of a data
>> source API for metadata), so that Spark is able to read/write metadata from
>> external systems. (SPARK-15777)
>>
>> Currently Spark can only read/write the Hive metastore, which means that
>> for other systems like Cassandra, we can only implicitly create tables with
>> the data source API.
>>
>> Again, this is not ideal, but it is just a workaround until we finish
>> catalog federation. That's why the save mode descriptions mostly refer to
>> how data will be handled instead of metadata.
>>
>> Because of this, I think we still need to pass metadata like
>> partitioning/bucketing to the data source write API. I propose using
>> data source options so that this is not exposed at the API level and we can
>> easily ignore these options once catalog federation is done.
>>
>> The same thing applies to Hadoop FS data sources: we need to pass
>> metadata to the writer anyway.
>>
>>
>>
>> On Tue, Sep 26, 2017 at 1:08 AM, Ryan Blue <rb...@netflix.com> wrote:
>>
>>> > However, without catalog federation, Spark doesn’t have an API to ask an
>>> > external system(like Cassandra) to create a table. Currently it’s all done
>>> > by data source write API. Data source implementations are responsible to
>>> > create or insert a table according to the save mode.
>>>
>>> What’s catalog federation? Is there a SPIP for it? It sounds
>>> straight-forward based on your comments, but I’d rather make sure we’re
>>> talking about the same thing.
>>>
>>> What I’m proposing requires no change to the public API, nor does it
>>> depend on being able to create tables. Why do writers necessarily need
>>> to create tables? I think other components (e.g. a federated catalog)
>>> should manage table creation outside of this abstraction. Just because
>>> data sources currently create tables doesn’t mean that we are tied to
>>> that implementation.
>>>
>>> I would also disagree that data source implementations are responsible
>>> for creating or inserting according to save mode. The modes are “append”,
>>> “overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
>>> me that the mode refers to how *data* will be handled, not table
>>> metadata. Overwrite’s docs
>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/SaveMode.java#L37>
>>> state that “existing *data* is expected to be overwritten.”
>>>
>>> Save mode currently introduces confusion because it isn’t clear whether
>>> the mode applies to tables or to writes. In Hive, overwrite removes
>>> conflicting partitions, but I think the Hadoop FS relations will delete
>>> tables. We get around this somewhat by using external tables and
>>> preserving data, but this is an area where we should have clear semantics
>>> for external systems like Cassandra. I’d like to see a cleaner public API
>>> that separates these concerns, but that’s a different discussion. For now,
>>> I don’t think requiring that a table exists is unreasonable. If a table
>>> has no metastore (Hadoop FS tables) then we can just pass the table
>>> metadata in when creating the writer, since there is no notion of table
>>> existence in this case.
>>>
>>> rb
>>>
>>> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan <cloud0...@gmail.com>
>>> wrote:
>>>
>>>> I agree it would be a clean approach if a data source were only
>>>> responsible for writing into an already-configured table. However, without
>>>> catalog federation, Spark doesn't have an API to ask an external system
>>>> (like Cassandra) to create a table. Currently it's all done by the data
>>>> source write API. Data source implementations are responsible for creating
>>>> a table or inserting into it according to the save mode.
>>>>
>>>> As a workaround, I think it's acceptable to pass partitioning/bucketing
>>>> information via data source options. Data sources should then decide
>>>> whether to take this information and create the table, or throw an
>>>> exception if it doesn't match the already-configured table.
>>>>
>>>>
>>>> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue <rb...@netflix.com> wrote:
>>>>
>>>>> > input data requirement
>>>>>
>>>>> Clustering and sorting within partitions are a good start. We can
>>>>> always add more later when they are needed.
>>>>>
>>>>> The primary use case I'm thinking of for this is partitioning and
>>>>> bucketing. If I'm implementing a partitioned table format, I need to tell
>>>>> Spark to cluster by my partition columns. Should there also be a way to
>>>>> pass those columns separately, since they may not be stored in the same
>>>>> way that partitions are in the current format?
>>>>>
>>>>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I want to have some discussion about the Data Source V2 write path
>>>>>> before starting a vote.
>>>>>>
>>>>>> The Data Source V1 write path asks implementations to write a
>>>>>> DataFrame directly, which is painful:
>>>>>> 1. Exposing an upper-level API like DataFrame to the Data Source API is
>>>>>> not good for maintenance.
>>>>>> 2. Data sources may need to preprocess the input data before writing,
>>>>>> e.g. cluster/sort the input by some columns. It's better to do the
>>>>>> preprocessing in Spark instead of in the data source.
>>>>>> 3. Data sources need to take care of transactions themselves, which is
>>>>>> hard. Different data sources may also come up with very similar
>>>>>> approaches to transactions, which leads to a lot of duplicated code.
>>>>>>
>>>>>>
>>>>>> To solve these pain points, I'm proposing a data source writing
>>>>>> framework which is very similar to the reading framework, i.e.,
>>>>>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can
>>>>>> take a look at my prototype to see what it looks like:
>>>>>> https://github.com/apache/spark/pull/19269
>>>>>>
>>>>>> There are some other details that need further discussion:
>>>>>> 1. *partitioning/bucketing*
>>>>>> Currently only the built-in file-based data sources support them, but
>>>>>> there is nothing stopping us from exposing them to all data sources. One
>>>>>> question is: shall we make them mix-in interfaces for the data source v2
>>>>>> reader/writer, or just encode them into data source options (a
>>>>>> string-to-string map)? Ideally they are more like options: Spark just
>>>>>> passes this user-given information to data sources and doesn't do
>>>>>> anything with it.
>>>>>>
>>>>>> 2. *input data requirement*
>>>>>> Data sources should be able to ask Spark to preprocess the input
>>>>>> data, and this can be a mix-in interface for DataSourceV2Writer. I think
>>>>>> we need to add a clustering request and a sort-within-partitions
>>>>>> request. Any more?
>>>>>>
>>>>>> 3. *transaction*
>>>>>> I think we can just follow `FileCommitProtocol`, which is the
>>>>>> internal framework Spark uses to guarantee transactions for built-in
>>>>>> file-based data sources. Generally speaking, we need task-level and
>>>>>> job-level commit/abort. Again, you can see more details in my prototype:
>>>>>> https://github.com/apache/spark/pull/19269
>>>>>>
>>>>>> 4. *data source table*
>>>>>> This is the trickiest one. In Spark you can create a table which
>>>>>> points to a data source, so you can read/write this data source easily by
>>>>>> referencing the table name. Ideally a data source table is just a pointer
>>>>>> to a data source with a list of predefined options, to save users from
>>>>>> typing these options again and again for each query.
>>>>>> If that's all, then everything is good and we don't need to add more
>>>>>> interfaces to Data Source V2. However, data source tables provide special
>>>>>> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which require
>>>>>> data sources to have some extra abilities.
>>>>>> Currently these special operators only work for built-in file-based
>>>>>> data sources, and I don't think we will extend them in the near future,
>>>>>> so I propose to mark them as out of scope.
>>>>>>
>>>>>>
>>>>>> Any comments are welcome!
>>>>>> Thanks,
>>>>>> Wenchen
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
