jiangmincong opened a new issue #7805:
URL: https://github.com/apache/pulsar/issues/7805


   Why use batchReceive, you can consume data for the first time, but cannot 
consume data afterwards; topic has been producing data
   
   code:
   Consumer<byte[]> consumer
                /*= client.newConsumer()
                .topics(ConfigMsg.subscribeTopicNames)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(ConfigMsg.subscriptionName)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY)
                .subscribe();*/
   = client.newConsumer()
                .topics(ConfigMsg.subscribeTopicNames)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(ConfigMsg.subscriptionName)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                //.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY) // 
默认批次消费策略
                .batchReceivePolicy(
                                BatchReceivePolicy
                                                .builder()
                                                
.maxNumMessages(ConfigMsg.batchReceiveMaxNumMsgs)
                                                
.maxNumBytes(ConfigMsg.batchReceiveMaxNumBytes)
                                                
.timeout(ConfigMsg.batchReceiveTimeout, TimeUnit.MILLISECONDS)
                                                .build()
                )
                .subscribe();
   
   while (true) {
        // Wait for a message
        Messages messages = consumer.batchReceive();
        System.out.println("-------------size: " + messages.size() + 
"------------");
        try {
                Iterator iterator = messages.iterator();
                while(iterator.hasNext()){
                        Message msg = (Message) iterator.next();
                        System.out.printf("Message received: %s \n", new 
String(msg.getData()));
                }
                // Acknowledge the message so that it can be deleted by the 
message broker
                consumer.acknowledge(messages);
        } catch (Exception e) {
                e.printStackTrace();
                consumer.negativeAcknowledge(messages);
        }
   }
   
   
![image](https://user-images.githubusercontent.com/22076140/90087664-e86f8900-dd4f-11ea-975a-48dae058170e.png)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to