qianye1001 opened a new pull request, #1249: URL: https://github.com/apache/rocketmq-clients/pull/1249
## Summary

Add batch message consumption capability to the Java PushConsumer, allowing messages to be accumulated in a buffer and delivered in batches based on configurable policies.

- **BatchMessageListener**: new public API interface that receives `List<MessageView>` and returns a single `ConsumeResult` for the entire batch
- **BatchPolicy**: configurable batching strategy with Builder pattern and sensible defaults (32 msgs, 4MB, 5s). A flush triggers when any of `maxBatchSize`, `maxBatchBytes`, or `maxWaitTime` is reached
- **BatchConsumeService**: Standard mode implementation with precise batch splitting (`extractBatch` respects both count and bytes limits simultaneously) and a forward-progress guarantee (an oversized single message still flushes)
- **FifoBatchConsumeService**: FIFO mode via inheritance, with whole-batch retry semantics and a single-batch-in-flight constraint. On failure, the entire batch is retried until max attempts are exhausted, then forwarded to the DLQ
- **ConsumeService** enhanced: added `close()` lifecycle method, protected constructor for batch services (no NOOP listener hack), and accessor methods
- **Config validation** in builder: `maxCacheMessageCount >= maxBatchSize`, `maxCacheMessageSizeInBytes >= maxBatchBytes`
- **Comprehensive tests** using awaitility: 34 new test cases covering Standard/FIFO modes, retry, DLQ, concurrency, forward-progress, corrupted message handling, and Builder validation
- **BatchPushConsumerExample** demonstrating usage

### Key Design Decisions

| Decision | Choice | Rationale |
|----------|--------|-----------|
| Batch result type | Single `ConsumeResult` per batch | Simple API; entire batch succeeds or fails together |
| Buffer scope | Global (mixed across ProcessQueues) | No PQ-based grouping; messages from all sources batch together |
| FIFO retry | Whole-batch retry | Preserves ordering guarantee; only one batch in-flight at a time |
| Class hierarchy | `FifoBatchConsumeService extends BatchConsumeService` | Clean separation via inheritance instead of if-else flag |
| Oversized messages | Forward-progress guarantee | Single message > maxBatchBytes still flushes as batch-of-one |

## Test plan

- [x] `BatchPolicyTest` (15 cases): constructor validation, Builder defaults, Builder custom values, invalid params
- [x] `BatchConsumeTaskTest` (6 cases): success, failure, exception, null return, unmodifiable list, batch size
- [x] `BatchConsumeServiceTest` (13 cases): maxBatchSize flush, maxBatchBytes flush, maxWaitTime flush, Standard failure, FIFO success, FIFO retry+DLQ, FIFO single-batch-in-flight, graceful shutdown, concurrent PQ submission, corrupted message discard, precise batch splitting, forward-progress, FIFO corrupted discard
- [x] Full `mvn -B package` passes locally (JDK 11, 223 tests, 0 failures)

🤖 Generated with [Claude Code](https://claude.com/claude-code)
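
For reviewers who want a concrete picture of the flush triggers and the `extractBatch` splitting described in the summary, here is a minimal sketch. It is not the code in this PR: the class name `BatchSplitSketch`, the method signatures, and the modeling of each message as its body size in bytes are all assumptions made for illustration. Only the three limits, their defaults, and the batch-of-one behavior for oversized messages come from the description above.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only; names and signatures are hypothetical, not the PR's actual code.
final class BatchSplitSketch {
    // Defaults stated in the PR summary: 32 messages, 4 MB, 5 s.
    static final int DEFAULT_MAX_BATCH_SIZE = 32;
    static final long DEFAULT_MAX_BATCH_BYTES = 4L * 1024 * 1024;
    static final long DEFAULT_MAX_WAIT_MILLIS = 5_000L;

    private BatchSplitSketch() {
    }

    /** Returns true when a non-empty buffer has reached any one of the three limits. */
    static boolean shouldFlush(int bufferedCount, long bufferedBytes, long waitedMillis,
                               int maxBatchSize, long maxBatchBytes, long maxWaitMillis) {
        if (bufferedCount == 0) {
            return false; // nothing to deliver
        }
        return bufferedCount >= maxBatchSize
            || bufferedBytes >= maxBatchBytes
            || waitedMillis >= maxWaitMillis;
    }

    /**
     * Removes and returns the next batch from the head of the buffer.
     * Each buffer element stands in for a message and holds its body size in bytes.
     */
    static List<Integer> extractBatch(Deque<Integer> buffer, int maxBatchSize, long maxBatchBytes) {
        List<Integer> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            int nextSize = buffer.peekFirst();
            // Stop before exceeding the byte limit, unless the batch is still empty:
            // an oversized head message is taken alone so the buffer keeps draining.
            if (!batch.isEmpty() && batchBytes + nextSize > maxBatchBytes) {
                break;
            }
            batch.add(buffer.pollFirst());
            batchBytes += nextSize;
        }
        return batch;
    }
}
```

In this sketch the count limit and the byte limit are checked on every iteration, so a batch never exceeds either one, except for the single-message case where the head message alone is larger than `maxBatchBytes`.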
