KKcorps commented on code in PR #8538:
URL: https://github.com/apache/pinot/pull/8538#discussion_r851301660
##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java:
##########
@@ -273,13 +269,31 @@ public void testPartitionLevelConsumerBatchMessages()
}
}
- public MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) {
+ private static MessageBatch consumeMessageBatch(PartitionGroupConsumer consumer, MessageId startMsgId,
+ MessageId endMsgId, int expectedMsgCount)
+ throws TimeoutException {
+ int retryCount = 0;
+ MessageBatch messageBatch = null;
+ while (retryCount < DEFAULT_RETRY_COUNT) {
+ retryCount++;
+ messageBatch = consumer.fetchMessages(new MessageIdStreamOffset(startMsgId),
Review Comment:
This method was added to handle the case where messages are published
in batches, which changes the format of the MessageIds in Pulsar.
The only way to publish in batches is to use the async producer. In sync mode,
each message is flushed as soon as it is received.
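
To illustrate the ID difference, here is a minimal stdlib-only sketch. It does not use the Pulsar client: `BatchMessageIdSketch`, `Id`, `publishOneByOne` and `publishBatched` are hypothetical names, and the ID is simplified to ledger, entry and batch index (the partition index is omitted). The assumption it models is that a message flushed on its own gets its own entry, while messages flushed together share an entry and are told apart by a batch index.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchMessageIdSketch {

    /** Simplified stand-in for a message ID; batchIndex is -1 when the message was not batched. */
    public static final class Id {
        final long ledgerId;
        final long entryId;
        final int batchIndex;

        Id(long ledgerId, long entryId, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
        }

        @Override
        public String toString() {
            String base = ledgerId + ":" + entryId;
            // Only batched messages carry the extra batch-index component.
            return batchIndex < 0 ? base : base + ":" + batchIndex;
        }
    }

    /** Models sync publishing: each message is flushed alone, so each one gets its own entry. */
    public static List<Id> publishOneByOne(long ledgerId, int messageCount) {
        List<Id> ids = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            ids.add(new Id(ledgerId, i, -1));
        }
        return ids;
    }

    /** Models batched publishing: up to batchSize messages share an entry and differ by batch index. */
    public static List<Id> publishBatched(long ledgerId, int messageCount, int batchSize) {
        List<Id> ids = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            ids.add(new Id(ledgerId, i / batchSize, i % batchSize));
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(publishOneByOne(7, 3));   // prints [7:0, 7:1, 7:2]
        System.out.println(publishBatched(7, 5, 2)); // prints [7:0:0, 7:0:1, 7:1:0, 7:1:1, 7:2:0]
    }
}
```

In the batched case several IDs share the same entry, so a test that addresses messages purely by entry would miss the batch-index component; that is the situation the comment above describes.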