udaysagar2177 opened a new pull request, #17688: URL: https://github.com/apache/pinot/pull/17688
This PR introduces a new streaming ingestion pattern for Apache Pinot: MicroBatch ingestion from Kafka. Instead of treating each Kafka message as a single record, this consumer reads Kafka messages whose payload is a small JSON protocol frame that either references a batch file stored in PinotFS (S3, HDFS, GCS, Azure, etc.) or carries inline base64-encoded data. See https://github.com/apache/pinot/issues/17331 for motivation.

### Design Overview

**Wire Protocol (Version 1):**

```
+----------+---------------------------+
| version  | JSON payload bytes        |
| (1 byte) |                           |
+----------+---------------------------+
```

(A hedged decoding sketch appears under "Illustrative Sketches" at the end of this description.)

**JSON Payload Examples:**

```json
// URI type - reference to a file in PinotFS
{"type":"uri","format":"avro","uri":"s3://bucket/batch.avro","numRecords":1000}

// DATA type - inline base64-encoded data
{"type":"data","format":"avro","data":"<base64>","numRecords":100}
```

**Key Components:**

- `KafkaMicroBatchConsumerFactory` - Factory that creates MicroBatch consumers
- `MicroBatchProtocol` / `MicroBatchPayloadV1` - Protocol parsing and validation
- `MicroBatchQueueManager` - Orchestrates parallel file downloads and batch conversion
- `MicroBatchStreamPartitionMsgOffset` - Composite offset (Kafka offset + record offset within the batch) enabling mid-batch resume after segment commits (sketched below)
- `MessageBatchReader` - Converts downloaded files to `MessageBatch<GenericRow>` using Pinot's RecordReader (sketched below)

**Supported Formats:** AVRO, Parquet, JSON

### Configuration

```json
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "microbatch-topic",
    "stream.kafka.broker.list": "localhost:9092",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.microbatch.kafka30.KafkaMicroBatchConsumerFactory",
    "stream.microbatch.kafka.file.fetch.threads": "2"
  }
}
```

See `CONFIGURATION.md` for complete documentation, including PinotFS setup for S3/HDFS/GCS/Azure.

### Testing

- **Unit tests:** `MicroBatchProtocolTest` (30+ test cases), `MicroBatchStreamPartitionMsgOffsetFactoryTest`, `MicroBatchStreamMetadataProviderTest`, `MicroBatchQueueManagerTest`
- **Integration test:** `KafkaPartitionLevelMicroBatchConsumerTest` - tests against a real embedded Kafka, including mid-batch resume scenarios
- **Cluster integration test:** `MicroBatchRealtimeClusterIntegrationTest` - end-to-end test with a full Pinot cluster

### Compatibility Notes

- This is a new plugin module - no changes to existing APIs or behavior.
- The composite offset format (`{"kmo":X,"mbro":Y}`) is specific to MicroBatch consumers and is not compatible with standard Kafka consumer offsets.
- `supportsOffsetLag()` returns false because lag cannot be estimated accurately without knowing the record counts of unconsumed batches. If needed, it could instead report lag as a count of unconsumed batches.

### Open Questions for Reviewers

I've added TODOs in the code to highlight open questions for reviewers; please take a look.

---

## Checklist

- [x] Code compiles and follows Pinot style guidelines
- [x] Unit tests added and passing
- [x] Integration tests added and passing
- [x] Documentation added (`CONFIGURATION.md`)
- [x] No new external dependencies introduced
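### Illustrative Sketches (not actual PR code)

To make the protocol and offset descriptions above concrete, here are a few hedged sketches. First, a minimal decoder for the V1 frame, assuming the payload is UTF-8 JSON immediately following the version byte; the class and method names are illustrative, not the PR's actual `MicroBatchProtocol` API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical decoder sketch for the version-prefixed wire frame described
// above; class and method names are illustrative, not the PR's actual API.
public final class MicroBatchFrameDecoder {
  private static final byte SUPPORTED_VERSION = 1;

  /** Returns the JSON payload of a V1 frame, or throws on an unknown version. */
  public static String decodePayloadJson(byte[] kafkaMessageValue) {
    if (kafkaMessageValue == null || kafkaMessageValue.length < 2) {
      throw new IllegalArgumentException("Frame too short");
    }
    byte version = kafkaMessageValue[0];
    if (version != SUPPORTED_VERSION) {
      throw new IllegalArgumentException("Unsupported protocol version: " + version);
    }
    // Remaining bytes are the JSON payload, e.g.
    // {"type":"uri","format":"avro","uri":"s3://bucket/batch.avro","numRecords":1000}
    return new String(kafkaMessageValue, 1, kafkaMessageValue.length - 1, StandardCharsets.UTF_8);
  }
}
```

Keeping the version in a single leading byte means new payload shapes can be introduced later (V2, V3, ...) without breaking consumers that only understand V1.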
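Next, a sketch of how the composite offset could pair the Kafka message offset with the record offset inside the batch, matching the serialized form `{"kmo":X,"mbro":Y}` noted in the compatibility section. Field names and comparison semantics are assumptions, not taken from `MicroBatchStreamPartitionMsgOffset` itself.

```java
// Illustrative sketch of the composite offset described above; field and class
// names mirror the serialized form {"kmo":X,"mbro":Y} but are assumptions, not
// the PR's actual implementation.
public final class CompositeOffsetSketch implements Comparable<CompositeOffsetSketch> {
  private final long _kafkaMessageOffset;     // "kmo": offset of the batch message in Kafka
  private final long _microBatchRecordOffset; // "mbro": record offset within that batch file

  public CompositeOffsetSketch(long kafkaMessageOffset, long microBatchRecordOffset) {
    _kafkaMessageOffset = kafkaMessageOffset;
    _microBatchRecordOffset = microBatchRecordOffset;
  }

  /**
   * Order by Kafka offset first, then by position within the batch, so a
   * consumer can resume mid-batch after a segment commit.
   */
  @Override
  public int compareTo(CompositeOffsetSketch other) {
    int cmp = Long.compare(_kafkaMessageOffset, other._kafkaMessageOffset);
    return cmp != 0 ? cmp : Long.compare(_microBatchRecordOffset, other._microBatchRecordOffset);
  }

  @Override
  public String toString() {
    return "{\"kmo\":" + _kafkaMessageOffset + ",\"mbro\":" + _microBatchRecordOffset + "}";
  }
}
```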
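Third, a sketch of the file-to-rows step, assuming Pinot's `RecordReader` SPI (`init`/`hasNext`/`next`) with the `AvroRecordReader` from the Avro input-format plugin as the concrete reader. The actual `MessageBatchReader` may differ, in particular in how it skips records when resuming mid-batch.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.spi.data.readers.GenericRow;

// Sketch of converting a downloaded batch file into rows, assuming Pinot's
// RecordReader SPI. The PR's MessageBatchReader likely wraps something similar
// and skips rows below the "mbro" record offset when resuming mid-batch.
public final class BatchFileReaderSketch {
  public static List<GenericRow> readAvro(File batchFile, long startRecordOffset) throws Exception {
    List<GenericRow> rows = new ArrayList<>();
    try (AvroRecordReader reader = new AvroRecordReader()) {
      reader.init(batchFile, null, null); // null: read all fields, default config
      long recordOffset = 0;
      while (reader.hasNext()) {
        GenericRow row = reader.next();
        // Skip records already committed in a previous segment.
        if (recordOffset++ >= startRecordOffset) {
          rows.add(row);
        }
      }
    }
    return rows;
  }
}
```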
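Finally, a hypothetical producer-side example showing how a URI-type V1 frame might be published after the batch file has been uploaded to PinotFS. The topic, bucket, and broker values are placeholders, and this code is not part of the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical producer-side sketch: publishing a V1 URI-type frame that
// points at a batch file already uploaded to PinotFS.
public final class MicroBatchProducerExample {
  public static void main(String[] args) {
    String json = "{\"type\":\"uri\",\"format\":\"avro\","
        + "\"uri\":\"s3://bucket/batch.avro\",\"numRecords\":1000}";
    byte[] payload = json.getBytes(StandardCharsets.UTF_8);
    byte[] frame = new byte[payload.length + 1];
    frame[0] = 1; // protocol version byte
    System.arraycopy(payload, 0, frame, 1, payload.length);

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("microbatch-topic", frame));
    }
  }
}
```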
