xloya opened a new issue, #5226:
URL: https://github.com/apache/gravitino/issues/5226

   ### Describe the feature
   
   In the current Fileset and Spark integration, we can already read and write 
data under a Fileset through a gvfs path, for any file format that Spark 
supports:
   ```
   // Dataframe API
   val df = sparkSession
     .read
     .parquet("gvfs://fileset/fileset_catalog/tmp/test_fileset/date=20241021")
   
   df.write
     .mode("overwrite")
     .parquet("gvfs://fileset/fileset_catalog/tmp/test_fileset/date=20241022")
   
   // SQL API
   SELECT * FROM `parquet`.`gvfs://fileset/fileset_catalog/tmp/test_fileset` 
WHERE date = 20241022
   
   INSERT OVERWRITE DIRECTORY 
'gvfs://fileset/fileset_catalog/tmp/test_fileset/date=20241023' USING parquet 
OPTIONS (col1 'name', col2 'age', col3 'test') SELECT * FROM test_table;
   ```
   In fact, Databricks also handles Volume in this way: 
https://docs.databricks.com/en/sql/language-manual/sql-ref-volumes.html.
   
   ### Motivation
   
   However, there are two problems:
   1. If users want to read, through SQL, a file format under a Fileset that 
Spark does not officially support, there is currently no way to do so.
   2. If the file format does not support schema inference, additional schema 
metadata is required.
   
   ### Describe the solution
   
   I propose providing a unified Fileset data source that extends Spark's 
`FileDataSourceV2` interface. For the file formats that Spark already supports 
(such as parquet, csv, and json), we can reuse the existing logic; for 
user-defined file formats (such as sequence file or tfrecord), users can extend 
the data source and implement the format themselves. 
   Users can then access Fileset data in a unified manner through the 
following forms. The specific file format and schema are obtained from the 
Fileset metadata and used to route the request:
   ```
   // Dataframe API
   val df = sparkSession
     .read
     .format("fileset")
     .load("gvfs://fileset/fileset_catalog/tmp/test_fileset")
     .where("date=20241021")
   
   df.write
     .format("fileset")
     .save("gvfs://fileset/fileset_catalog/tmp/test_fileset/date=20241022")
   
   // SQL API
   SELECT * FROM `fileset`.`gvfs://fileset/fileset_catalog/tmp/test_fileset` 
WHERE date = 20241022
   
   INSERT OVERWRITE DIRECTORY 
'gvfs://fileset/fileset_catalog/tmp/test_fileset/date=20241023' USING fileset 
SELECT * FROM test_table;
   ```
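   To make the routing idea concrete, here is a minimal sketch in plain Scala, with no Spark or Gravitino dependency. Every name in it (`FilesetMeta`, `FormatReader`, `BuiltinReader`, `SequenceFileReader`, `route`) is hypothetical and only illustrates how a format name stored in Fileset metadata could select either a reused built-in implementation or a user-registered one:

   ```scala
   // Hypothetical sketch: none of these names are Gravitino or Spark APIs.
   object FilesetFormatRouting {
     // Metadata a fileset might carry; the format and schema fields are assumptions.
     final case class FilesetMeta(name: String, format: String, schema: Option[String])

     // Reader for one file format; a real one would wrap a Spark data source.
     trait FormatReader {
       def describe(meta: FilesetMeta): String
     }

     // Formats Spark already supports would delegate to the existing logic.
     final class BuiltinReader(format: String) extends FormatReader {
       def describe(meta: FilesetMeta): String = s"builtin:$format:${meta.name}"
     }

     // A user-defined format cannot infer a schema, so it requires one in the metadata.
     final class SequenceFileReader extends FormatReader {
       def describe(meta: FilesetMeta): String = {
         val schema = meta.schema.getOrElse(
           throw new IllegalArgumentException(s"fileset ${meta.name} has no schema"))
         s"custom:sequencefile:${meta.name}:$schema"
       }
     }

     private val builtin: Map[String, FormatReader] =
       Seq("parquet", "csv", "json").map(f => f -> (new BuiltinReader(f): FormatReader)).toMap

     // Look up the format case-insensitively: built-in formats first, then custom ones.
     def route(meta: FilesetMeta, custom: Map[String, FormatReader]): Either[String, FormatReader] = {
       val key = meta.format.toLowerCase
       builtin.get(key).orElse(custom.get(key)).toRight(s"unsupported format: ${meta.format}")
     }
   }
   ```

   With a custom map such as `Map("sequencefile" -> new SequenceFileReader)`, calling `route` for a format that is neither built-in nor registered returns a `Left` instead of throwing, so the caller decides how to report the error.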
   In addition, with a unified read and write data source, users can control 
how data is deserialized when it is read and serialized when it is written to 
files. One production case at Xiaomi is that we need to serialize and 
deserialize records through Thrift classes when reading and writing sequence files.
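
   As a rough illustration of such a serialization hook, the sketch below defines a pluggable codec that a unified data source could call on read and write. The `RecordCodec` trait, the `User` type, and the comma-separated encoding are all assumptions made for this example; a real implementation would delegate to generated Thrift classes instead:

   ```scala
   import java.nio.charset.StandardCharsets

   // Hypothetical sketch: a codec hook, not an existing Gravitino or Spark API.
   object FilesetRecordCodec {
     // Converts between raw record bytes in a file and a typed value.
     trait RecordCodec[A] {
       def deserialize(bytes: Array[Byte]): A
       def serialize(value: A): Array[Byte]
     }

     final case class User(name: String, age: Int)

     // Stand-in for a Thrift-backed codec, using a plain "age,name" UTF-8 encoding.
     object UserCodec extends RecordCodec[User] {
       def deserialize(bytes: Array[Byte]): User = {
         val text = new String(bytes, StandardCharsets.UTF_8)
         val sep = text.indexOf(',') // the age never contains a comma, the name may
         User(text.substring(sep + 1), text.substring(0, sep).toInt)
       }
       def serialize(u: User): Array[Byte] =
         s"${u.age},${u.name}".getBytes(StandardCharsets.UTF_8)
     }

     // The data source would apply the codec to every record on the read path...
     def readAll[A](records: Seq[Array[Byte]], codec: RecordCodec[A]): Seq[A] =
       records.map(codec.deserialize)

     // ...and on the write path.
     def writeAll[A](values: Seq[A], codec: RecordCodec[A]): Seq[Array[Byte]] =
       values.map(codec.serialize)
   }
   ```

   Keeping the codec behind a small trait means the read and write paths stay format-agnostic, and each Fileset could select its codec from metadata.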
   
   
   ### Additional context
   
   _No response_

