startjava commented on issue #17983:
URL: https://github.com/apache/pulsar/issues/17983#issuecomment-1274494445
pulsar 2.10.1 version 。
```
package com.ghy.www.cumulativeacknowledgement.consumer.listener;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;
@Component
public class MyMessageListener1 implements MessageListener {
@Override
public void received(Consumer consumer, Message msg) {
try {
String messageString = new String(msg.getData());
System.out.println(messageString);
if (messageString.equals("acknowledge消息1")) {
consumer.acknowledge(msg);
System.out.println(messageString + "执行了acknowledge");
}
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}
```
```
package com.ghy.www.cumulativeacknowledgement.consumer.listener;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.stereotype.Component;
@Component
public class MyMessageListener2 implements MessageListener {
@Override
public void received(Consumer consumer, Message msg) {
try {
String messageString = new String(msg.getData());
System.out.println(messageString);
if (messageString.equals("acknowledgeCumulative消息1")) {
consumer.acknowledgeCumulative(msg);
System.out.println(messageString +
"执行了acknowledgeCumulative");
}
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}
```
```
@RequestMapping("getUnAckMsgCount")
public String getUnAckMsgCount() throws PulsarClientException,
PulsarAdminException {
{
List<? extends ConsumerStats> consumers =
pulsarAdmin.topics().getStats(myTopic1).getSubscriptions().get(myTopic1_subscriptionName).getConsumers();
for (int i = 0; i < consumers.size(); i++) {
ConsumerStats consumerStats = consumers.get(i);
System.out.println(myTopic1 + " getConsumerName=" +
consumerStats.getConsumerName() + " getUnackedMessages=" +
consumerStats.getUnackedMessages());
}
}
System.out.println();
{
List<? extends ConsumerStats> consumers =
pulsarAdmin.topics().getStats(myTopic2).getSubscriptions().get(myTopic2_subscriptionName).getConsumers();
for (int i = 0; i < consumers.size(); i++) {
ConsumerStats consumerStats = consumers.get(i);
System.out.println(myTopic2 + " getConsumerName=" +
consumerStats.getConsumerName() + " getUnackedMessages=" +
consumerStats.getUnackedMessages());
}
}
return "成功获得";
}
```
run url "getUnAckMsgCount" , print info :
myTopic1 getConsumerName=消费者1 getUnackedMessages=0
myTopic2 getConsumerName=消费者2 getUnackedMessages=0
alway value is 0 .
i where error ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]