a2l007 opened a new issue, #13923:
URL: https://github.com/apache/druid/issues/13923
### Introduction
Apache Iceberg is a high-performance table format for large analytic datasets, with well-supported connectors for Spark, Flink, Trino, Hive, and Impala. To ingest Iceberg tables into Druid today, the existing mechanisms are:
- running a separate workflow that scans the Iceberg catalog and identifies the underlying data file paths, then submitting a Druid ingestion spec with these paths, or
- querying the Iceberg catalog using Trino/Spark SQL and writing the data to a separate store for Druid to ingest.
### Description
This issue proposes a Druid extension that ingests Iceberg tables into Druid. Given a catalog, an Iceberg table name, and a namespace, the extension creates an Iceberg driver that connects to the configured catalog, initiates an Iceberg table scan, and fetches the underlying plan files for the latest snapshot. This yields a list of data files (essentially Parquet, Avro, or ORC files), which are then fed into an `InputSource`; native batch indexing takes care of the rest.
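For illustration, the catalog scan described above could be sketched with the Iceberg Java API roughly as follows. This is a sketch only, not the extension's actual code; the use of `HadoopCatalog` and the warehouse path are assumptions for the example, and a real driver would be configured against whichever catalog type the spec names.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;

public class IcebergFileLister
{
  // Sketch: lists the data file paths backing the latest snapshot of an
  // Iceberg table, which would then be handed to a Druid InputSource.
  public static List<String> listDataFiles(String warehousePath, String namespace, String tableName)
      throws Exception
  {
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), warehousePath);
    Table table = catalog.loadTable(TableIdentifier.of(namespace, tableName));

    List<String> paths = new ArrayList<>();
    // planFiles() resolves the manifests of the current (latest) snapshot
    // into the underlying Parquet/Avro/ORC data files.
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        paths.add(task.file().path().toString());
      }
    }
    return paths;
  }
}
```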
To decouple the Iceberg catalog reads from the input sources, the proposal is to have an `InputSelector` that provides extensible APIs for connecting to Iceberg catalogs. The `InputSelector` interface could look like this:
```java
import java.util.List;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = DefaultInputSelector.TYPE_KEY, value = DefaultInputSelector.class)
})
public interface InputSelector
{
  /**
   * Returns the data file paths that should be fed to the InputSource.
   */
  List<String> chooseFiles();
}
```
The proposed extension provides an implementation of the `InputSelector` interface that connects to the configured catalog and retrieves the data files as described above.
Each `InputSource` would then have a handler method that implements what it wants to do with the data files provided by the `InputSelector`; this handler is called by `AbstractBatchIndexTask` before constructing the input source reader. The `InputSource` would essentially mutate its list of paths by appending the file paths provided by the `InputSelector`.
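As a rough, self-contained sketch of that handler contract (all class names below are hypothetical stand-ins, not actual Druid APIs):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed interface.
interface InputSelector
{
  List<String> chooseFiles();
}

// Hypothetical input source holding a mutable list of paths, mimicking
// how e.g. an HDFS-style input source might absorb selector output.
class PathsInputSource
{
  private final List<String> paths = new ArrayList<>();

  // The handler the task would invoke before constructing the reader:
  // append whatever the selector resolved from the Iceberg catalog.
  public void applySelector(InputSelector selector)
  {
    paths.addAll(selector.chooseFiles());
  }

  public List<String> getPaths()
  {
    return paths;
  }
}
```

The call order is the important part of the sketch: the selector runs first, and the input source only ever sees concrete file paths.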
The native ingestion spec would look something like this:
```json
"ioConfig": {
  "type": "index_parallel",
  "inputSelector": {
    "type": "hive",
    "tableName": "logs",
    "namespace": "webapp",
    "partitionColumn": "event_time",
    "intervals": ["2023-01-26T00:00:00.000Z/2023-02-18T00:00:00.000Z"]
  },
  "inputFormat": {
    "type": "parquet"
  },
  "inputSource": {
    "type": "hdfs"
  }
}
```
The `inputSelector` optionally accepts a timestamp-based partition column, which can be used to filter the range of Iceberg data files being scanned.
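As an illustration of how such an interval might map onto a scan filter, the sketch below converts the spec's ISO-8601 interval into the microsecond-precision timestamps Iceberg uses for timestamp columns; the filter call noted in the comments (via Iceberg's `Expressions` API) is an assumption about the implementation, not confirmed from the extension's code.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class IntervalBounds
{
  // Converts an ISO-8601 interval ("start/end") into [startMicros, endMicros).
  // Iceberg timestamp columns are microsecond-precision, so a scan filter on
  // the partition column could then look roughly like:
  //   scan.filter(Expressions.and(
  //       Expressions.greaterThanOrEqual("event_time", startMicros),
  //       Expressions.lessThan("event_time", endMicros)))
  public static long[] toMicros(String interval)
  {
    String[] parts = interval.split("/");
    long start = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(parts[0]));
    long end = ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(parts[1]));
    return new long[]{start, end};
  }
}
```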
In its present state, the Iceberg driver always reads from the latest table snapshot and only supports reading from tables that handle row creates and/or updates.
**Features yet to be evaluated/implemented:**
- MSQ integration
- Iceberg deletes and delete file handling
- Reads from older Iceberg snapshots
- Support for filtering on multiple partition columns
While this extension has been working as expected in internal clusters, this proposal is intended to surface any major design concerns and to gauge interest in the extension in its present state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]