yakir-Yang commented on issue #12059:
URL: https://github.com/apache/pulsar/issues/12059#issuecomment-920709586
Here's my simple code that can reproduce this problem quickly.
You can check the unacked messages through pulsar-admin tools
```
root@pulsar-node-102135:/pulsar# ./bin/pulsar-admin topics stats
persistent://edge-sit-message-center/default/subscriber_1415926654114594816
{
"count" : 0,
....
"subscriptions" : {
"kk-subscription" : {
......
"unackedMessages" : 4842,
"consumers" : [ {
"unackedMessages" : 4842,
} ],
"isDurable" : false,
}
},
```
```
package com.mycompany.app;
import org.apache.commons.cli.*;
import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//import org.json.JSONException;
//import org.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
/**
* Hello world!
*
*/
public class App
{
static CommandLine commandLine = null;
static long pkts = 0;
public static void main( String[] args ) {
CommandLineParser commandLineParser = new DefaultParser();
Options options = new Options();
options.addOption("addr", true, "Pulsar Server 地址");
options.addOption("topic", true, "主题名称");
try {
commandLine = commandLineParser.parse(options, args);
} catch (ParseException e) {
System.out.println("---- exception");
System.out.println(e);
return;
}
new PulsarThread().start();
new MetricThread().start();
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
}
static private class PulsarThread extends Thread {
public PulsarThread() {
}
public void run() {
try {
PulsarClient client =
PulsarClient.builder().serviceUrl(commandLine.getOptionValue("addr")).build();
Consumer<byte[]> consumer = client.newConsumer()
.topic(commandLine.getOptionValue("topic"))
.consumerName("javatest")
.subscriptionName("kk-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.subscriptionMode(SubscriptionMode.NonDurable)
.subscribe();
while (true) {
Message<byte[]> message = consumer.receive();
if (message == null) {
continue;
}
// consumer.acknowledge(message);
MessageId msgId =
MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(),
"persistent://edge-sit-message-center/default/subscriber_1415926654114594816");
consumer.acknowledgeAsync(msgId);
pkts += 1;
}
} catch (Exception e) {
}
}
}
static private class MetricThread extends Thread {
public void run() {
long last = 0;
while (true) {
System.out.printf("consumer speed: %d pkt/s\n", pkts - last);
last = pkts;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]