jiangmincong opened a new issue #7805:
URL: https://github.com/apache/pulsar/issues/7805
Why use batchReceive, you can consume data for the first time, but cannot
consume data afterwards; topic has been producing data
code:
Consumer<byte[]> consumer
/*= client.newConsumer()
.topics(ConfigMsg.subscribeTopicNames)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(ConfigMsg.subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY)
.subscribe();*/
= client.newConsumer()
.topics(ConfigMsg.subscribeTopicNames)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(ConfigMsg.subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
//.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY) //
默认批次消费策略
.batchReceivePolicy(
BatchReceivePolicy
.builder()
.maxNumMessages(ConfigMsg.batchReceiveMaxNumMsgs)
.maxNumBytes(ConfigMsg.batchReceiveMaxNumBytes)
.timeout(ConfigMsg.batchReceiveTimeout, TimeUnit.MILLISECONDS)
.build()
)
.subscribe();
while (true) {
// Wait for a message
Messages messages = consumer.batchReceive();
System.out.println("-------------size: " + messages.size() +
"------------");
try {
Iterator iterator = messages.iterator();
while(iterator.hasNext()){
Message msg = (Message) iterator.next();
System.out.printf("Message received: %s \n", new
String(msg.getData()));
}
// Acknowledge the message so that it can be deleted by the
message broker
consumer.acknowledge(messages);
} catch (Exception e) {
e.printStackTrace();
consumer.negativeAcknowledge(messages);
}
}

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]