udaysagar2177 opened a new issue, #17694: URL: https://github.com/apache/pinot/issues/17694
## Motivation

Apache Iceberg is becoming the de-facto standard for data lake table formats, widely adopted across modern data platforms. Many organizations store their analytical data in Iceberg tables with frequent incremental updates. Currently, ingesting this data into Pinot requires one of the following approaches:

1. Deploy Spark jobs to read Iceberg tables and write to Pinot, which requires additional infrastructure and operational overhead.
2. Use batch ingestion through minions, which adds latency and operational complexity.
3. Extract data file paths from committed file information or from Iceberg manifests, publish them to Kafka, and use the new Kafka microbatch plugin proposed in [#17331](https://github.com/apache/pinot/issues/17331).

This proposal explores **native, near real-time ingestion from Iceberg tables** by treating Iceberg snapshots as the source of file metadata, eliminating the need for Spark, additional orchestration, or Kafka as an intermediary.

## Design Overview

The design **reuses ~95% of the Kafka microbatch infrastructure** introduced in https://github.com/apache/pinot/issues/17331, swapping only the metadata source from Kafka to Iceberg.

### Architecture Comparison

```
Kafka Microbatch:
  Kafka Topic → Protocol Messages → MicroBatchQueueManager → Download Files → Ingest

Iceberg Microbatch:
  Iceberg Snapshots → Manifest Files → MicroBatchQueueManager → Download Files → Ingest
        ↑ NEW                              ↑ REUSED (100%)
```

### Key Components

#### 1. IcebergMicroBatchConsumer (NEW)

Replaces `KafkaPartitionLevelMicroBatchConsumer`: polls Iceberg snapshots for new data files and submits them to the existing `MicroBatchQueueManager` for download and processing.

#### 2. Offset Format (NEW)

A composite offset tracks the snapshot ID, data file path, and record position:

```json
{
  "sid": 8723456789012345,
  "dfp": "s3://bucket/warehouse/db/table/data/file-001.parquet",
  "rof": 1500
}
```

- `sid`: Snapshot ID (the equivalent of a Kafka offset)
- `dfp`: Data file path (identifies which file within the snapshot)
- `rof`: Record offset within the file (enables mid-file resume after a segment commit)

A minimal serialization sketch of this offset is included after the open questions below.

### How It Works

```
1. Consumer polls the Iceberg table → table.refresh()
2. Compare the current snapshot ID with the last processed one → detect new data
3. Use IncrementalAppendScan → get the list of new DataFiles since the last snapshot
4. For each DataFile:
   - Extract the file path, format, and record count from the manifest
   - Create a MicroBatch object with the file metadata
   - Submit it to MicroBatchQueueManager (existing code!)
5. MicroBatchQueueManager:
   - Downloads the file from PinotFS (S3, HDFS, etc.)
   - Converts it to a MessageBatch using existing readers
   - Returns it to the consumer
6. Consumer updates the offset with the new snapshot ID + file + record position
```

A sketch of this polling loop against the Iceberg Java API is also included after the open questions below.

## Open Questions

1. **Partition Mapping**: Should each Iceberg partition correspond to a Pinot segment, or can multiple partitions be merged? What is the recommended mapping strategy? Additionally, how should Iceberg partitions map to the Kafka topic partitions (or virtual partitions) expected by the consumption path?
2. **Compaction Handling**: How should the ingestion system handle Iceberg compactions that replace or remove existing data files?
3. **Schema Evolution**: How should schema changes in Iceberg tables be reflected in Pinot?
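To make the composite offset concrete, here is a minimal sketch of its JSON (de)serialization, assuming Jackson. The class name `IcebergMicroBatchOffset` is a hypothetical placeholder; a real plugin would presumably implement Pinot's `StreamPartitionMsgOffset` instead of being a plain POJO.

```java
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Minimal sketch of the composite offset described above (hypothetical class name).
 * Field names match the JSON keys in the proposal: sid / dfp / rof.
 */
public class IcebergMicroBatchOffset {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public long sid;    // snapshot ID (the equivalent of a Kafka offset)
  public String dfp;  // data file path within the snapshot
  public long rof;    // record offset in the file, for mid-file resume after a segment commit

  public String serialize() throws IOException {
    // Produces e.g. {"sid":8723456789012345,"dfp":"s3://.../file-001.parquet","rof":1500}
    return MAPPER.writeValueAsString(this);
  }

  public static IcebergMicroBatchOffset deserialize(String json) throws IOException {
    return MAPPER.readValue(json, IcebergMicroBatchOffset.class);
  }
}
```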
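Below is a rough sketch of the snapshot-polling step from "How It Works", written against the Iceberg Java API. `IcebergSnapshotPoller` and `FileBatch` are hypothetical placeholders: in the actual consumer these descriptors would be wrapped in `MicroBatch` objects and handed to the `MicroBatchQueueManager` from #17331, and offset bookkeeping would include the file path and record offset rather than just the snapshot ID.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

/** Sketch of the snapshot-polling loop of the proposed IcebergMicroBatchConsumer. */
public class IcebergSnapshotPoller {

  /** Hypothetical descriptor of one data file to download and ingest. */
  public static class FileBatch {
    public final long _snapshotId;
    public final String _filePath;
    public final String _format;
    public final long _recordCount;

    FileBatch(long snapshotId, String filePath, String format, long recordCount) {
      _snapshotId = snapshotId;
      _filePath = filePath;
      _format = format;
      _recordCount = recordCount;
    }
  }

  private final Table _table;
  private long _lastProcessedSnapshotId;  // restored from the last committed offset ("sid");
                                          // first-poll bootstrapping is omitted for brevity

  public IcebergSnapshotPoller(Table table, long lastProcessedSnapshotId) {
    _table = table;
    _lastProcessedSnapshotId = lastProcessedSnapshotId;
  }

  /** Returns descriptors for the data files appended since the last processed snapshot. */
  public List<FileBatch> poll() throws IOException {
    _table.refresh();                                    // step 1: pick up newly committed snapshots
    Snapshot current = _table.currentSnapshot();
    if (current == null || current.snapshotId() == _lastProcessedSnapshotId) {
      return List.of();                                  // step 2: no new data
    }

    // Step 3: incremental append scan over (lastProcessed, current]
    IncrementalAppendScan scan = _table.newIncrementalAppendScan()
        .fromSnapshotExclusive(_lastProcessedSnapshotId)
        .toSnapshot(current.snapshotId());

    List<FileBatch> batches = new ArrayList<>();
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      for (FileScanTask task : tasks) {
        DataFile file = task.file();                     // step 4: file metadata from the manifest
        batches.add(new FileBatch(current.snapshotId(), file.path().toString(),
            file.format().name(), file.recordCount()));
      }
    }

    _lastProcessedSnapshotId = current.snapshotId();     // step 6 (simplified): advance the offset
    return batches;
  }
}
```

In the proposed design, each `FileBatch` would be submitted to the existing `MicroBatchQueueManager`, which downloads the file via PinotFS and converts it to a `MessageBatch` using the existing readers.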
