a2l007 opened a new issue, #19190:
URL: https://github.com/apache/druid/issues/19190
## 1. Background and Motivation
### 1.1 Current State
The Druid Iceberg extension (`druid-iceberg-extensions`) supports ingesting
data from Iceberg
tables via `IcebergInputSource`. Internally it:
1. Connects to an Iceberg catalog (Hive, Glue, REST, or local)
2. Lists data files up to the current (or a specified) snapshot
3. Passes those raw file paths to a native Druid `InputSource` via
`warehouseSource`
4. Druid reads the Parquet files directly using the configured `InputFormat`
This works correctly for **Iceberg v1** tables because v1 only supports
append operations —
every file in the table is a live data file.
### 1.2 The Problem
**Iceberg v2** introduced row-level deletes. A v2 table can contain two
additional file types
alongside data files:
| File Type | Content | Purpose |
|-----------|---------|---------|
| **Positional Delete File** | `(file_path, row_position)` pairs | Delete the row at position N in file F |
| **Equality Delete File** | Column value sets | Delete any row where column values match |
When Druid reads an Iceberg v2 table today, it only reads the **data files**
and completely
ignores delete files. As a result, deleted rows are silently included in
ingested Druid segments.
### 1.3 Example
```
Snapshot 1 (append): data-001.parquet → rows: order_id=1, order_id=2,
order_id=3
Snapshot 2 (delete): eq-delete-001.parquet → "delete where order_id = 2"
```
**Today (v1-only behaviour):**
Druid ingests all three rows including `order_id=2`, which was deleted in
Iceberg.
**Expected (v2 behaviour):**
Druid should ingest only `order_id=1` and `order_id=3`.
---
## 2. Proposed Design
In the V2 path, the Iceberg input source obtains `FileScanTask` objects (data
file paths, associated delete file paths, and the table schema). That metadata is
serialized into the per-worker task spec. Workers open the associated data files
and apply the position-delete and equality-delete files before converting the
records to Druid `InputRow`s.
### 2.1 High-Level Architecture
#### Proposed (v2-aware) flow:
```
IcebergInputSource
│
├─► IcebergCatalog.extractFileScanTasksWithSchema()
│ └─► tableScan.planFiles() → List<FileScanTask> + table Schema
│
└─► Are there deletes?
│
├─NO──► [V1 path] extract paths → warehouseSource.create(paths)
(unchanged)
│
└─YES─► [V2 path] encode tasks as V2 splits
└─► creates a IcebergFileTaskInputSource object per
split:
carries: data file path, delete file paths,
serialized table schema,
warehouseSource (for worker-side
file access)
IcebergFileTaskInputSource
└─► IcebergNativeRecordReader
├─► Deserialize table schema from JSON
├─► Open files via WarehouseFileIO
├─► Read delete files, apply deletes row by row
└─► IcebergRecordConverter → MapBasedInputRow
```
### 2.2 Differentiating between V1 and V2 during catalog read
At scan planning time, `IcebergInputSource.retrieveIcebergDatafiles()`
checks whether any returned `FileScanTask` carries delete files. One of two
paths is then taken:
- **No delete files (V1 path):** File paths are extracted and handed to the
existing
`warehouseSource` input source — identical to today's behaviour.
- **Delete files present (V2 path):** Tasks are encoded as V2 splits, each
carrying the data
file metadata, delete file metadata, and the serialized table schema.
`warehouseSource` is
passed through to each `IcebergFileTaskInputSource` for use on the worker.
Note that a v2-format table with no delete files (i.e., no rows have ever been
deleted) still goes through the V1 path.
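The branch described above can be sketched as follows. This is a minimal,
hypothetical illustration: `ScanTask` is a simplified stand-in for Iceberg's
`FileScanTask` (the real class exposes delete files via `deletes()`), and
`ScanPathSelector` is not a class in the proposal.

```java
import java.util.List;

// Hypothetical sketch of the planned branch in
// IcebergInputSource.retrieveIcebergDatafiles(). ScanTask is a simplified
// stand-in for Iceberg's FileScanTask, not the real API.
public class ScanPathSelector
{
  // A scan task reduced to a data file path plus its delete file paths.
  public record ScanTask(String dataFilePath, List<String> deleteFilePaths)
  {
  }

  public enum IngestPath
  {
    V1, V2
  }

  // The V2 path is taken only when at least one task carries delete files;
  // a v2-format table that has no delete files still uses the V1 path.
  public static IngestPath choosePath(List<ScanTask> tasks)
  {
    boolean hasDeletes = tasks.stream().anyMatch(t -> !t.deleteFilePaths().isEmpty());
    return hasDeletes ? IngestPath.V2 : IngestPath.V1;
  }
}
```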
### 2.3 Handling Delete files
A new `InputSourceReader` implementation, `IcebergNativeRecordReader`, reads
each assigned Iceberg data file and applies any associated position-delete and
equality-delete files before converting records to Druid `InputRow`s.
#### Step 1: Deserialize table schema
The Iceberg `Schema` is deserialized from the JSON string embedded in
`IcebergFileTaskInputSource` using `SchemaParser.fromJson(tableSchemaJson)`.
This is the schema
that was captured from `IcebergInputSource`.
#### Step 2: Collect positional deletes
Each positional delete file is read using Iceberg's
`DeleteSchemaUtil.pathPosSchema()`. Only rows matching the current data file
path are retained:
```
pos-delete-001.parquet:
file_path="data-001.parquet", pos=1
file_path="data-001.parquet", pos=3
file_path="data-002.parquet", pos=0 ← different file, ignored
Result: deletedPositions = {1, 3}
```
#### Step 3: Collect equality deletes
Each equality delete file is read using a schema projected to
its equality fields (identified by field IDs in the delete file metadata):
```
eq-delete-001.parquet (equality field: order_id, field ID=1):
order_id=1002
order_id=1005
Result: equalityDeleteSets = [ {fieldNames=["order_id"],
keys={{"order_id":1002}, {"order_id":1005}}} ]
```
#### Step 4: Stream data file and apply both delete sets
For each record:
1. If its 0-indexed position is in `deletedPositions` → skip.
2. Otherwise check each equality delete set → skip if matched.
3. Surviving records are passed to `IcebergRecordConverter`.
```
data-001.parquet (read via WarehouseFileIO):
pos=0 order_id=1001 → KEEP
pos=1 order_id=1002 → SKIP (positional delete)
pos=2 order_id=1003 → KEEP
pos=3 order_id=1004 → SKIP (positional delete)
pos=4 order_id=1005 → SKIP (equality delete)
Output: rows for order_id=1001 and order_id=1003
```
#### Step 5: Record-to-InputRow Conversion
`IcebergRecordConverter` converts an Iceberg `GenericRecord` to a Druid
`InputRow` in two steps:
1. **Record → Map**: iterate `Schema.columns()`, call
`record.getField(name)`, apply type
conversion.
2. **Map → InputRow**: use `TimestampSpec.extractTimestamp()` and
`DimensionsSpec` to build a
`MapBasedInputRow`. If the dimensions list is empty (auto mode), all
columns except the
timestamp column are used as dimensions.
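The "auto mode" dimension selection in step 2 can be sketched as follows,
assuming the record has already been flattened to a map; `AutoDimensions` is an
illustrative helper, not part of the proposal, and the real path goes through
Druid's `DimensionsSpec` and `MapBasedInputRow`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: when no explicit dimensions are configured, every column except
// the timestamp column becomes a dimension.
public class AutoDimensions
{
  public static List<String> dimensionsFor(Map<String, Object> row, String timestampColumn)
  {
    List<String> dims = new ArrayList<>();
    for (String column : row.keySet()) {
      if (!column.equals(timestampColumn)) {
        dims.add(column);
      }
    }
    return dims;
  }
}
```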
---
No changes to the ingestion spec are required. Existing specs continue to
work unchanged.
`warehouseSource` is used actively in both the V1 and V2 paths.
---
## 3. Limitations
### 3.1 Materializing All Records from a Data File in Memory per Task
`IcebergNativeRecordReader` currently materializes all surviving records
from a data file into a
`List<InputRow>` before returning the iterator. For very large data files
this increases worker
heap pressure. Streaming row-by-row can be a candidate for follow-up
optimization.
### 3.2 Already-Ingested Druid Segments Are Not Corrected
This proposal only fixes **new ingestion runs**: rows deleted in Iceberg are no
longer included when Druid reads the table going forward.
It does **not** correct Druid segments that have already been ingested.
If a row was ingested into Druid at time T₁ and then deleted from Iceberg at
time T₂, the Druid segment written at T₁ still contains that row.
A full re-ingestion would do the right thing, but incremental or append-based
ingestion can accumulate stale segments for any rows that were deleted or
updated in Iceberg between runs.
This gap will be addressed in a follow-up Phase 2 proposal, which could use an
Iceberg snapshot diff to detect which Druid time intervals are affected by
Iceberg deletes/updates and issue targeted MSQ `REPLACE` tasks to overwrite
those intervals with the current, correct Iceberg state.