a2l007 opened a new issue, #19190:
URL: https://github.com/apache/druid/issues/19190

   ## 1. Background and Motivation
   
   ### 1.1 Current State
   
   The Druid Iceberg extension (`druid-iceberg-extensions`) supports ingesting 
data from Iceberg
   tables via `IcebergInputSource`. Internally it:
   
   1. Connects to an Iceberg catalog (Hive, Glue, REST, or local)
   2. Lists data files up to the current (or a specified) snapshot
   3. Passes those raw file paths to a native Druid `InputSource` via 
`warehouseSource`
   4. Druid reads the Parquet files directly using the configured `InputFormat`
   
   This works correctly for **Iceberg v1** tables because v1 only supports 
append operations —
   every file in the table is a live data file.
   
   ### 1.2 The Problem
   
   **Iceberg v2** introduced row-level deletes. A v2 table can contain two 
additional file types
   alongside data files:
   
   | File Type | Content | Purpose |
   |-----------|---------|---------|
   | **Positional Delete File** | `(file_path, row_position)` pairs | Delete the row at position N in file F |
   | **Equality Delete File** | Column value sets | Delete any row where column values match |
   
   When Druid reads an Iceberg v2 table today, it only reads the **data files** 
and completely
   ignores delete files. As a result, deleted rows are silently included in 
ingested Druid segments.
   
   ### 1.3 Example
   
   ```
    Snapshot 1 (append):  data-001.parquet → rows: order_id=1, order_id=2, order_id=3
   Snapshot 2 (delete):  eq-delete-001.parquet → "delete where order_id = 2"
   ```
   
   **Today (v1-only behaviour):**
   Druid ingests all three rows including `order_id=2`, which was deleted in 
Iceberg.
   
   **Expected (v2 behaviour):**
   Druid should ingest only `order_id=1` and `order_id=3`.
   
   ---
   
   ## 2. Proposed Design
   
   In the V2 path, the Iceberg input source is used to obtain `FileScanTask` objects (data file paths + associated delete file paths + table schema). That metadata is serialized into the per-worker task spec. Workers open the assigned data files and apply the positional-delete and equality-delete files before converting the surviving records to Druid `InputRow`s.
   
   ### 2.1 High-Level Architecture
   
   #### Proposed (v2-aware) flow:
   
   
   ```
   IcebergInputSource
       │
       ├─► IcebergCatalog.extractFileScanTasksWithSchema()
       │       └─► tableScan.planFiles() → List<FileScanTask> + table Schema
       │
       └─► Are there deletes?
               │
                ├─NO──► [V1 path] extract paths → warehouseSource.create(paths)  (unchanged)
                │
                └─YES─► [V2 path] encode tasks as V2 splits
                            └─► creates an IcebergFileTaskInputSource object per split:
                                    carries: data file path, delete file paths,
                                             serialized table schema,
                                             warehouseSource (for worker-side file access)
   
   IcebergFileTaskInputSource
       └─► IcebergNativeRecordReader
               ├─► Deserialize table schema from JSON
               ├─► Open files via WarehouseFileIO
               ├─► Read delete files, apply deletes row by row
               └─► IcebergRecordConverter → MapBasedInputRow
   ```
   
   ### 2.2 Differentiating between V1 and V2 at catalog read time
   
   At scan-planning time, `IcebergInputSource.retrieveIcebergDatafiles()` checks whether any returned `FileScanTask` carries delete files. Based on this check, one of two paths is taken:
   
   
   - **No delete files (V1 path):** File paths are extracted and handed to the 
existing
     `warehouseSource` input source — identical to today's behaviour.
   - **Delete files present (V2 path):** Tasks are encoded as V2 splits, each 
carrying the data
     file metadata, delete file metadata, and the serialized table schema. 
`warehouseSource` is
     passed through to each `IcebergFileTaskInputSource` for use on the worker.
   
   Note that a v2-format table that has never had any rows deleted (and therefore has no delete files) also goes through the V1 path, as sketched below.
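
   A minimal sketch of this branch, assuming the planned tasks have already been collected. `FileScanTask.deletes()` and `SchemaParser.toJson()` are real Iceberg APIs; `pathBasedSplits()`, `v2Splits()`, and the `InputSplit<?>` return type are illustrative placeholders for the existing and new split-creation code:

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import org.apache.druid.data.input.InputSplit;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.SchemaParser;

   List<InputSplit<?>> planSplits(List<FileScanTask> tasks, Schema tableSchema)
   {
     boolean hasDeletes = tasks.stream().anyMatch(t -> !t.deletes().isEmpty());
     if (!hasDeletes) {
       // V1 path: extract raw data-file paths, identical to today's behaviour
       List<String> paths = tasks.stream()
           .map(t -> t.file().path().toString())
           .collect(Collectors.toList());
       return pathBasedSplits(paths);                      // illustrative helper
     }
     // V2 path: each split carries data file, delete files, and the schema JSON
     String schemaJson = SchemaParser.toJson(tableSchema);
     return v2Splits(tasks, schemaJson);                   // illustrative helper
   }
   ```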
   
   
   
   ### 2.3 Handling Delete files
   
   A new `InputSourceReader` implementation, `IcebergNativeRecordReader`, reads each assigned Iceberg data file and applies any associated positional-delete and equality-delete files before converting the surviving records to Druid `InputRow`s.
   
   #### Step 1: Deserialize table schema
   
   The Iceberg `Schema` is deserialized from the JSON string embedded in
   `IcebergFileTaskInputSource` using `SchemaParser.fromJson(tableSchemaJson)`. 
This is the schema
   that was captured from `IcebergInputSource`.
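
   A one-line sketch; `SchemaParser.fromJson()` is the real Iceberg API, and `tableSchemaJson` is the string field carried by `IcebergFileTaskInputSource`:

   ```java
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.SchemaParser;

   // rebuild the Iceberg Schema on the worker from the serialized JSON
   Schema tableSchema = SchemaParser.fromJson(tableSchemaJson);
   ```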
   
   #### Step 2: Collect positional deletes
   
   Each positional delete file is read using Iceberg's
   `DeleteSchemaUtil.pathPosSchema()`. Only rows matching the current data file 
path are retained:
   
   ```
   pos-delete-001.parquet:
       file_path="data-001.parquet", pos=1
       file_path="data-001.parquet", pos=3
       file_path="data-002.parquet", pos=0   ← different file, ignored
   
   Result: deletedPositions = {1, 3}
   ```
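
   A minimal sketch of this step. `DeleteSchemaUtil.pathPosSchema()` is the real Iceberg API; `readRecords(file, schema)` is an illustrative helper that streams Iceberg `Record`s from a Parquet file via `WarehouseFileIO`:

   ```java
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.iceberg.DeleteFile;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.data.DeleteSchemaUtil;
   import org.apache.iceberg.data.Record;

   Set<Long> collectDeletedPositions(DeleteFile posDeleteFile, String dataFilePath)
   {
     Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();  // columns: file_path, pos
     Set<Long> deletedPositions = new HashSet<>();
     for (Record rec : readRecords(posDeleteFile, posDeleteSchema)) {  // illustrative helper
       // retain only rows that target the data file currently being read
       if (dataFilePath.contentEquals((CharSequence) rec.getField("file_path"))) {
         deletedPositions.add((Long) rec.getField("pos"));
       }
     }
     return deletedPositions;
   }
   ```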
   
   #### Step 3: Collect equality deletes
   
   Each equality delete file is read using a schema projected to
   its equality fields (identified by field IDs in the delete file metadata):
   
   ```
   eq-delete-001.parquet (equality field: order_id, field ID=1):
       order_id=1002
       order_id=1005
   
    Result: equalityDeleteSets = [ {fieldNames=["order_id"], keys={{"order_id":1002}, {"order_id":1005}}} ]
   ```
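
   A minimal sketch of this step; `equalityFieldIds()` and `TypeUtil.select()` are real Iceberg APIs, and `readRecords()` is the same illustrative helper as above:

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import org.apache.iceberg.DeleteFile;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.types.TypeUtil;
   import org.apache.iceberg.types.Types;

   Set<Map<String, Object>> collectEqualityDeleteKeys(DeleteFile eqDeleteFile, Schema tableSchema)
   {
     // project the table schema down to the equality fields listed in the delete file metadata
     Schema deleteSchema = TypeUtil.select(tableSchema, new HashSet<>(eqDeleteFile.equalityFieldIds()));
     Set<Map<String, Object>> keys = new HashSet<>();
     for (Record rec : readRecords(eqDeleteFile, deleteSchema)) {  // illustrative helper
       Map<String, Object> key = new HashMap<>();
       for (Types.NestedField field : deleteSchema.columns()) {
         key.put(field.name(), rec.getField(field.name()));
       }
       keys.add(key);
     }
     return keys;
   }
   ```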
   
   #### Step 4: Stream data file and apply both delete sets
   
   For each record:
   1. If its 0-indexed position is in `deletedPositions` → skip.
   2. Otherwise check each equality delete set → skip if matched.
   3. Surviving records are passed to `IcebergRecordConverter`.
   
   ```
   data-001.parquet (read via WarehouseFileIO):
       pos=0  order_id=1001  → KEEP
       pos=1  order_id=1002  → SKIP (positional delete)
       pos=2  order_id=1003  → KEEP
       pos=3  order_id=1004  → SKIP (positional delete)
       pos=4  order_id=1005  → SKIP (equality delete)
   
   Output: rows for order_id=1001 and order_id=1003
   ```
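
   A minimal sketch of this filter, continuing the variables from the Step 2 and Step 3 sketches; `readRecords()`, `matchesAnyEqualityDelete()`, and `converter.toInputRow()` are illustrative:

   ```java
   long pos = -1;  // Iceberg positional deletes are 0-indexed within a data file
   for (Record rec : readRecords(dataFile, tableSchema)) {
     pos++;
     if (deletedPositions.contains(pos)) {
       continue;  // removed by a positional delete
     }
     if (matchesAnyEqualityDelete(rec, equalityDeleteSets)) {
       continue;  // removed by an equality delete
     }
     rows.add(converter.toInputRow(rec));  // surviving record → Druid InputRow
   }
   ```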
   
   #### Step 5: Record-to-InputRow conversion
   
   `IcebergRecordConverter` converts an Iceberg `GenericRecord` to a Druid 
`InputRow` in two steps:
   
   1. **Record → Map**: iterate `Schema.columns()`, call 
`record.getField(name)`, apply type
      conversion.
   2. **Map → InputRow**: use `TimestampSpec.extractTimestamp()` and 
`DimensionsSpec` to build a
      `MapBasedInputRow`. If the dimensions list is empty (auto mode), all 
columns except the
      timestamp column are used as dimensions.
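
   A minimal sketch of the converter; `TimestampSpec`, `DimensionsSpec`, and `MapBasedInputRow` are real Druid classes, while `convertValue()` stands in for the per-type conversion step:

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   import org.apache.druid.data.input.InputRow;
   import org.apache.druid.data.input.MapBasedInputRow;
   import org.apache.druid.data.input.impl.DimensionsSpec;
   import org.apache.druid.data.input.impl.TimestampSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.types.Types;
   import org.joda.time.DateTime;

   InputRow toInputRow(Record record, Schema schema, TimestampSpec timestampSpec, DimensionsSpec dimensionsSpec)
   {
     // step 1: Record → Map with per-column type conversion
     Map<String, Object> event = new HashMap<>();
     for (Types.NestedField col : schema.columns()) {
       event.put(col.name(), convertValue(record.getField(col.name())));  // illustrative
     }
     // step 2: Map → InputRow; an empty dimensions list means "auto" mode
     DateTime timestamp = timestampSpec.extractTimestamp(event);
     List<String> dimensions = dimensionsSpec.getDimensionNames().isEmpty()
         ? event.keySet().stream()
                .filter(c -> !c.equals(timestampSpec.getTimestampColumn()))
                .collect(Collectors.toList())
         : dimensionsSpec.getDimensionNames();
     return new MapBasedInputRow(timestamp, dimensions, event);
   }
   ```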
   
   
   
   
   ---
   
   No changes to the ingestion spec are required. Existing specs continue to 
work unchanged.
   `warehouseSource` remains in use on both the V1 and V2 paths.
   
   ---
   
   ## 3. Limitations
   
   
   ### 3.1 Materializing All Records from a Data File in Memory per Task
   
   `IcebergNativeRecordReader` currently materializes all surviving records from a data file into a `List<InputRow>` before returning the iterator. For very large data files this increases worker heap pressure. Streaming row by row is a candidate for follow-up optimization.
   
   ### 3.2 Already-Ingested Druid Segments Are Not Corrected
   
   This proposal only fixes **new ingestion runs** which that rows deleted in 
Iceberg are not
   included when Druid reads the table going forward.
   
   It does **not** correct segments that have already been ingested. If a row was ingested into Druid at time T₁ and then deleted from Iceberg at time T₂, the Druid segment written at T₁ still contains that row.
   
   A full re-ingestion would do the right thing, but incremental / append-based ingestion can accumulate stale segments for any rows that were deleted or updated in Iceberg between runs.
   
   This gap will be addressed in a follow-up Phase 2 proposal, which can use an Iceberg snapshot diff to detect which Druid time intervals are affected by Iceberg deletes/updates and issue targeted MSQ `REPLACE` tasks to overwrite those intervals with the current, correct Iceberg state.
   

