a2l007 opened a new issue, #13923:
URL: https://github.com/apache/druid/issues/13923

   ### Introduction
   Apache Iceberg is a high-performance format for large analytic tables that presently has well-supported connectors for Spark, Flink, Trino, Hive, and Impala. For ingesting Iceberg tables into Druid, the existing mechanisms are either:
   - using a separate workflow to scan the Iceberg catalog, identify the underlying data file paths, and then submit a Druid ingestion spec with these paths,
   - or querying the Iceberg catalog using Trino/Spark SQL and writing the data to a separate store from which Druid can ingest it.
   
   ### Description
   
   This issue proposes a Druid extension that ingests Iceberg tables into Druid. Given a catalog, an Iceberg table name, and a namespace, the extension creates an Iceberg driver that connects to the configured catalog, initiates an Iceberg table scan, and fetches the underlying plan files for the latest snapshot. This retrieves a list of data files, which are essentially Parquet, Avro, or ORC files; these are then fed into an `InputSource`, and native batch indexing takes care of the rest.
   To decouple the Iceberg catalog reads from the input sources, the proposal is to have an `InputSelector` that provides extensible APIs to connect to Iceberg catalogs. The `InputSelector` interface could look like this:
   
   
   ```java
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
   @JsonSubTypes(value = {
       @JsonSubTypes.Type(name = DefaultInputSelector.TYPE_KEY, value = DefaultInputSelector.class)
   })
   public interface InputSelector
   {
     List<String> chooseFiles();
   }
   ```
   
   The proposed extension provides an implementation of the `InputSelector` interface that connects to the configured catalog and retrieves the data files as described above.
   Each `InputSource` would then have a handler method that decides what to do with the data files provided by the `InputSelector`; this handler is called by `AbstractBatchIndexTask` before constructing the input source reader. The `InputSource` would essentially mutate its list of paths by appending the file paths provided by the `InputSelector`.
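   A minimal, self-contained sketch of this decoupling is below. The names `InputSelectorSketch`, `StaticInputSelector`, and `mergePaths` are illustrative only, not actual Druid APIs; a real implementation would back `chooseFiles()` with an Iceberg table scan rather than a fixed list.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class InputSelectorSketch
   {
     // Mirrors the proposed interface: a selector only knows how to
     // produce a list of data file paths from some catalog.
     interface InputSelector
     {
       List<String> chooseFiles();
     }

     // A trivial selector standing in for the Iceberg-backed one; a real
     // implementation would run a catalog table scan instead.
     static class StaticInputSelector implements InputSelector
     {
       private final List<String> files;

       StaticInputSelector(List<String> files)
       {
         this.files = files;
       }

       @Override
       public List<String> chooseFiles()
       {
         return files;
       }
     }

     // The handler step: the InputSource's existing paths are extended with
     // whatever the selector resolved, before the input source reader is built.
     static List<String> mergePaths(List<String> existingPaths, InputSelector selector)
     {
       List<String> merged = new ArrayList<>(existingPaths);
       merged.addAll(selector.chooseFiles());
       return merged;
     }

     public static void main(String[] args)
     {
       InputSelector selector = new StaticInputSelector(
           List.of("hdfs://warehouse/webapp/logs/data-001.parquet"));
       System.out.println(mergePaths(new ArrayList<>(), selector));
     }
   }
   ```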
   
   The native ingestion spec would look something like this:
   ```json
   "ioConfig": {
         "type": "index_parallel",
         "inputSelector": {
           "type": "hive",
           "tableName": "logs",
           "namespace": "webapp",
           "partitionColumn": "event_time",
           "intervals": ["2023-01-26T00:00:00.000Z/2023-02-18T00:00:00.000Z"]
         },
         "inputFormat": {
           "type": "parquet"
         },
         "inputSource": {
           "type": "hdfs"
         }
   }
   ```
   The `inputSelector` optionally accepts a timestamp-based partition column, which can be used to filter the range of Iceberg data files being scanned.
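   As a rough illustration of that pruning step, the sketch below keeps only files whose partition-column timestamp falls inside a half-open `[start, end)` interval, matching Druid's interval convention. The method and the file-to-timestamp map are hypothetical; a real implementation would push the interval down as an Iceberg scan filter on the partition column instead of filtering file paths in memory.

   ```java
   import java.time.Instant;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class IntervalFilterSketch
   {
     // Keep only files whose partition-column timestamp falls in
     // [start, end); sorted for deterministic output.
     static List<String> filterByInterval(
         Map<String, Instant> filePartitionTimes,
         Instant start,
         Instant end)
     {
       return filePartitionTimes.entrySet().stream()
           .filter(e -> !e.getValue().isBefore(start) && e.getValue().isBefore(end))
           .map(Map.Entry::getKey)
           .sorted()
           .collect(Collectors.toList());
     }

     public static void main(String[] args)
     {
       Map<String, Instant> files = Map.of(
           "data-001.parquet", Instant.parse("2023-01-26T12:00:00Z"),
           "data-002.parquet", Instant.parse("2023-03-01T00:00:00Z"));
       // Only data-001.parquet falls inside the spec's example interval.
       System.out.println(filterByInterval(
           files,
           Instant.parse("2023-01-26T00:00:00Z"),
           Instant.parse("2023-02-18T00:00:00Z")));
     }
   }
   ```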
   
   In its present state, the Iceberg driver always reads from the latest table snapshot and only supports reading from tables that handle row creates and/or updates.
   
   **Features yet to be evaluated/implemented:**
   - MSQ integration
   - Iceberg deletes and delete file handling
   - Reads from older iceberg snapshots
   - Support for filtering on multiple partition columns
   
   While this extension has been working as expected in internal clusters, this proposal aims to surface any major design concerns and to gauge community interest in the extension in its present state.

