udaysagar2177 opened a new pull request, #17688:
URL: https://github.com/apache/pinot/pull/17688

   This PR introduces a new streaming ingestion pattern for Apache Pinot: MicroBatch ingestion from Kafka. Instead of treating each Kafka message as an individual record, this consumer reads Kafka messages whose JSON payloads either reference batch files stored in PinotFS (S3, HDFS, GCS, Azure, etc.) or carry inline base64-encoded data. See https://github.com/apache/pinot/issues/17331 for motivation.
   
   ### Design Overview
   
   **Wire Protocol (Version 1):**
   ```
   +----------+---------------------------+
   | version  | JSON payload bytes        |
   | (1 byte) |                           |
   +----------+---------------------------+
   ```
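   
   For illustration, a producer could frame a version-1 message as sketched below. This is a minimal sketch assuming the version byte is the literal value `1` and the payload is UTF-8 JSON; the class name is hypothetical and not part of this PR:
   ```java
   import java.nio.charset.StandardCharsets;
   
   // Hypothetical helper showing the wire framing; not this PR's API.
   public final class MicroBatchMessageEncoder {
     private static final byte PROTOCOL_VERSION = 1;
   
     // Frames a message: one version byte followed by the raw JSON payload bytes.
     public static byte[] encode(String jsonPayload) {
       byte[] payload = jsonPayload.getBytes(StandardCharsets.UTF_8);
       byte[] message = new byte[1 + payload.length];
       message[0] = PROTOCOL_VERSION;
       System.arraycopy(payload, 0, message, 1, payload.length);
       return message;
     }
   }
   ```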
   
   **JSON Payload Examples:**
   ```json
   // URI type - reference to file in PinotFS
   
{"type":"uri","format":"avro","uri":"s3://bucket/batch.avro","numRecords":1000}
   
   // DATA type - inline base64-encoded data
   {"type":"data","format":"avro","data":"<base64>","numRecords":100}
   ```
   
   **Key Components:**
   - `KafkaMicroBatchConsumerFactory` - Factory that creates MicroBatch 
consumers
   - `MicroBatchProtocol` / `MicroBatchPayloadV1` - Protocol parsing and validation (see the decoding sketch after this list)
   - `MicroBatchQueueManager` - Orchestrates parallel file downloads and batch 
conversion
   - `MicroBatchStreamPartitionMsgOffset` - Composite offset (Kafka offset + 
record offset within batch) enabling mid-batch resume after segment commits
   - `MessageBatchReader` - Converts downloaded files to 
`MessageBatch<GenericRow>` using Pinot's RecordReader
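   
   To make the parse-and-dispatch flow concrete, here is a minimal sketch of how a consumer could decode a message. It uses Jackson; the class and method names are illustrative, not the actual `MicroBatchProtocol` API:
   ```java
   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.ObjectMapper;
   import java.util.Arrays;
   import java.util.Base64;
   
   // Illustrative decoder; the real parsing and validation live in
   // MicroBatchProtocol / MicroBatchPayloadV1.
   public final class MicroBatchPayloadDecoder {
     private static final ObjectMapper MAPPER = new ObjectMapper();
   
     public static void decode(byte[] message) throws Exception {
       // The first byte is the protocol version; only version 1 exists today.
       if (message[0] != 1) {
         throw new IllegalArgumentException("Unsupported protocol version: " + message[0]);
       }
       JsonNode payload = MAPPER.readTree(Arrays.copyOfRange(message, 1, message.length));
       String type = payload.get("type").asText();
       String format = payload.get("format").asText(); // avro | parquet | json
       if ("uri".equals(type)) {
         String uri = payload.get("uri").asText();
         // ... fetch the file via PinotFS, then convert it with the RecordReader for `format`
       } else if ("data".equals(type)) {
         byte[] data = Base64.getDecoder().decode(payload.get("data").asText());
         // ... feed the inline bytes to the RecordReader for `format`
       } else {
         throw new IllegalArgumentException("Unknown payload type: " + type);
       }
     }
   }
   ```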
   
   **Supported Formats:** AVRO, Parquet, JSON
   
   ### Configuration
   
   ```json
   {
     "streamConfigs": {
       "streamType": "kafka",
       "stream.kafka.topic.name": "microbatch-topic",
       "stream.kafka.broker.list": "localhost:9092",
       "stream.kafka.consumer.factory.class.name":
         
"org.apache.pinot.plugin.stream.microbatch.kafka30.KafkaMicroBatchConsumerFactory",
       "stream.microbatch.kafka.file.fetch.threads": "2"
     }
   }
   ```
   
   See `CONFIGURATION.md` for complete documentation including PinotFS setup 
for S3/HDFS/GCS/Azure.
   
   ### Testing
   
   - **Unit tests:** `MicroBatchProtocolTest` (30+ test cases), 
`MicroBatchStreamPartitionMsgOffsetFactoryTest`, 
`MicroBatchStreamMetadataProviderTest`, `MicroBatchQueueManagerTest`
   - **Integration test:** `KafkaPartitionLevelMicroBatchConsumerTest` - Tests 
with real embedded Kafka, including mid-batch resume scenarios
   - **Cluster integration test:** `MicroBatchRealtimeClusterIntegrationTest` - 
End-to-end test with full Pinot cluster
   
   ### Compatibility Notes
   
   - This is a new plugin module - no changes to existing APIs or behavior
   - The composite offset format (`{"kmo":X,"mbro":Y}`) is specific to MicroBatch consumers and is not interchangeable with standard Kafka consumer offsets (see the sketch after this list)
   - `supportsOffsetLag()` returns false because lag cannot be estimated accurately without knowing the record counts of the unconsumed batches. If needed, it could instead report lag as a count of unconsumed batches.
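   
   For concreteness, here is a sketch of the composite offset. The JSON keys match the format above; the field names and comparison logic are assumptions for illustration, not the PR's `MicroBatchStreamPartitionMsgOffset` code:
   ```java
   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   // Illustrative composite offset: Kafka message offset plus record offset within the batch.
   public final class CompositeOffset implements Comparable<CompositeOffset> {
     private static final ObjectMapper MAPPER = new ObjectMapper();
     public final long kafkaMessageOffset;      // "kmo"
     public final long microBatchRecordOffset;  // "mbro"
   
     public CompositeOffset(long kmo, long mbro) {
       this.kafkaMessageOffset = kmo;
       this.microBatchRecordOffset = mbro;
     }
   
     public static CompositeOffset fromJson(String json) throws Exception {
       JsonNode node = MAPPER.readTree(json);
       return new CompositeOffset(node.get("kmo").asLong(), node.get("mbro").asLong());
     }
   
     public String toJson() {
       return String.format("{\"kmo\":%d,\"mbro\":%d}", kafkaMessageOffset, microBatchRecordOffset);
     }
   
     // Order by Kafka offset first, then by position within the batch; tracking the
     // in-batch position is what enables mid-batch resume after a segment commit.
     @Override
     public int compareTo(CompositeOffset other) {
       int cmp = Long.compare(kafkaMessageOffset, other.kafkaMessageOffset);
       return cmp != 0 ? cmp : Long.compare(microBatchRecordOffset, other.microBatchRecordOffset);
     }
   }
   ```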
   
   ### Open Questions for Reviewers
   
   I’ve added TODOs in the code to highlight open questions for reviewers; please take a look.
   
   ---
   
   ## Checklist
   
   - [x] Code compiles and follows Pinot style guidelines
   - [x] Unit tests added and passing
   - [x] Integration tests added and passing
   - [x] Documentation added (`CONFIGURATION.md`)
   - [x] No new external dependencies introduced

