codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059286759
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
+ // Key is the ledger id and the entry id, entry is the acker that
represents which single messages are acknowledged
+ private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker>
batchMessageToAcker =
Review Comment:
I have tried them locally.
### The first case uses the message instance to ack the message and seeks to
the earliest position after acked half of the messages.
```java
@Test
public void testAckAfterSeek() throws Exception {
final String topic = "persistent://prop/use/ns-abc/testAckAfterSeek-" +
UUID.randomUUID();
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.create();
final int messages = 10;
for (int i = 0; i < messages; i++) {
producer.sendAsync("New message - " + i);
}
producer.flush();
// Only ack the first 5 messages
for (int i = 0; i < 5; i++) {
Message<String> received = consumer.receive();
log.info("[x] Received message: {}", received.getValue());
consumer.acknowledge(received);
}
consumer.seek(MessageId.earliest);
for (int i = 0; i < messages; i++) {
Message<String> received = consumer.receive();
log.info("[x] Received message: {}", received.getValue());
if (i > 4) {
consumer.acknowledge(received);
}
}
// Waiting for the message ack command send to the broker.
Thread.sleep(3000);
PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
log.info("The topic internal stats : {}", Json.pretty(stats));
}
```
The output:
```
2022-12-30T16:03:49,319 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1500] - [/127.0.0.1:59957]
Created new producer:
Producer{topic=PersistentTopic{topic=persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68},
client=/127.0.0.1:59957, producerName=test-0-0, producerId=0}
2022-12-30T16:03:49,325 - INFO - [pulsar-client-io-37-5:ProducerImpl@1707]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68]
[test-0-0] Created producer on cnx [id: 0xbe2e8d16, L:/127.0.0.1:59957 -
R:localhost/127.0.0.1:59949]
2022-12-30T16:03:49,401 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 0
2022-12-30T16:03:49,403 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 1
2022-12-30T16:03:49,403 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 2
2022-12-30T16:03:49,403 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 3
2022-12-30T16:03:49,403 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 4
2022-12-30T16:03:49,405 - INFO - [main:ConsumerImpl@2144] -
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test]
Seeking subscription to the message -1:-1:-1
2022-12-30T16:03:49,406 - INFO - [pulsar-io-6-2:Consumer@385] -
Disconnecting consumer:
Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68,
name=test}, consumerId=0, consumerName=78011, address=/127.0.0.1:59957}
2022-12-30T16:03:49,407 - INFO -
[pulsar-io-6-2:PersistentDispatcherMultipleConsumers@193] - Removed consumer
Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68,
name=test}, consumerId=0, consumerName=78011, address=/127.0.0.1:59957} with
pending 1 acks
2022-12-30T16:03:49,408 - INFO -
[pulsar-io-6-2:PersistentDispatcherMultipleConsumers@200] -
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68
/ test] All consumers removed. Subscription is disconnected
2022-12-30T16:03:49,408 - INFO - [pulsar-io-6-2:PersistentSubscription@783]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test]
Successfully disconnected consumers from subscription, proceeding with cursor
reset
2022-12-30T16:03:49,410 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@1227] -
[prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68]
Initiate reset readPosition to 3:-1 on cursor test
2022-12-30T16:03:49,411 - INFO - [pulsar-client-io-37-5:ClientCnx@796] -
[localhost/127.0.0.1:59949] Broker notification of Closed consumer: 0
2022-12-30T16:03:49,411 - INFO -
[pulsar-client-io-37-5:ConnectionHandler@144] -
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68]
[test] Closed connection [id: 0xbe2e8d16, L:/127.0.0.1:59957 -
R:localhost/127.0.0.1:59949] -- Will try again in 0.1 s
2022-12-30T16:03:49,415 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger
7
2022-12-30T16:03:49,425 - INFO -
[bookkeeper-ml-scheduler-OrderedScheduler-0-0:ManagedCursorImpl$30@3049] -
[prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68]
Updated cursor test with ledger id 7 md-position=3:-1 rd-position=3:1
2022-12-30T16:03:49,433 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl$9@1274] -
[prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68]
reset readPosition to 3:-1 before current read readPosition 3:1 on cursor test
2022-12-30T16:03:49,433 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1770] - [/127.0.0.1:59957]
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test]
Reset subscription to message id -1:-1
2022-12-30T16:03:49,433 - INFO - [pulsar-client-io-37-5:ConsumerImpl@2147]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test]
Successfully reset subscription to the message -1:-1:-1
2022-12-30T16:03:49,512 - INFO - [pulsar-timer-76-1:ConnectionHandler@148]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68]
[test] Reconnecting after timeout
2022-12-30T16:03:49,517 - INFO -
[pulsar-web-32-14:Slf4jRequestLogWriter@62] - 127.0.0.1 - -
[30/Dec/2022:16:03:49 +0800] "GET
/lookup/v2/destination/persistent/prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68
HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 4
2022-12-30T16:03:49,517 - INFO - [pulsar-client-io-37-3:ConsumerImpl@780] -
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test]
Subscribing to topic on cnx [id: 0xbe2e8d16, L:/127.0.0.1:59957 -
R:localhost/127.0.0.1:59949], consumerId 0
2022-12-30T16:03:49,518 - INFO - [pulsar-io-6-2:ServerCnx@1072] -
[/127.0.0.1:59957] Subscribing on topic
persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68
/ test
2022-12-30T16:03:49,530 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@2456] -
[prop/use/ns-abc/persistent/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68-test]
Rewind from 3:-1 to 3:0
2022-12-30T16:03:49,530 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1156] - [/127.0.0.1:59957]
Created subscription on topic
persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68
/ test
2022-12-30T16:03:49,531 - INFO - [pulsar-client-io-37-5:ConsumerImpl@914] -
[persistent://prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68][test]
Subscribed to topic on localhost/127.0.0.1:59949 -- consumer: 0
2022-12-30T16:03:49,541 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 0
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 1
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 2
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 3
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 4
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 5
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 6
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 7
2022-12-30T16:03:49,542 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 8
2022-12-30T16:03:49,543 - INFO - [main:SubscriptionSeekTest@823] - [x]
Received message: New message - 9
2022-12-30T16:03:52,575 - INFO -
[pulsar-web-32-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - -
[30/Dec/2022:16:03:52 +0800] "GET
/admin/persistent/prop/use/ns-abc/testAckAfterSeek-bfe62332-e92e-4aa1-8387-92ea0ffa6e68/internalStats?metadata=false
HTTP/1.1" 200 966 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 20
2022-12-30T16:03:52,596 - INFO - [main:SubscriptionSeekTest@833] - The
topic internal stats : {
"entriesAddedCounter" : 1,
"numberOfEntries" : 1,
"totalSize" : 275,
"currentLedgerEntries" : 1,
"currentLedgerSize" : 275,
"lastLedgerCreatedTimestamp" : "2022-12-30T16:03:49.057+08:00",
"waitingCursorsCount" : 2,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "3:0",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 3,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"test" : {
"markDeletePosition" : "3:-1",
"readPosition" : "3:1",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 7,
"cursorLedgerLastEntry" : 1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2022-12-30T16:03:49.184+08:00",
"state" : "Open",
"active" : true,
"numberOfEntriesSinceFirstNotAckedMessage" : 2,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : true,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
```
### The second case uses the deserialized message ID to ack the message and
seeks to the earliest position after acked half of the messages.
```java
@Test
public void testAckAfterSeek() throws Exception {
final String topic =
"persistent://prop/use/ns-abc/testAckAfterSeek-" + UUID.randomUUID();
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.batchingMaxPublishDelay(Long.MAX_VALUE,
TimeUnit.MILLISECONDS)
.create();
final int messages = 10;
for (int i = 0; i < messages; i++) {
producer.sendAsync("New message - " + i);
}
producer.flush();
// Only ack the first 5 messages
for (int i = 0; i < 5; i++) {
Message<String> received = consumer.receive();
log.info("[x] Received message: {}", received.getValue());
MessageId deserialized =
MessageId.fromByteArray(received.getMessageId().toByteArray());
consumer.acknowledge(deserialized);
}
consumer.seek(MessageId.earliest);
for (int i = 0; i < messages; i++) {
Message<String> received = consumer.receive();
log.info("[x] Received message: {}", received.getValue());
if (i > 4) {
MessageId deserialized =
MessageId.fromByteArray(received.getMessageId().toByteArray());
consumer.acknowledge(deserialized);
}
}
// Waiting for the message ack command send to the broker.
Thread.sleep(3000);
PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
log.info("The topic internal stats : {}", Json.pretty(stats));
}
```
The output:
```
2022-12-30T16:11:32,926 - INFO - [pulsar-client-io-37-5:ProducerImpl@1707]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633]
[test-0-0] Created producer on cnx [id: 0x18c45e74, L:/127.0.0.1:60303 -
R:localhost/127.0.0.1:60295]
2022-12-30T16:11:32,996 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 0
2022-12-30T16:11:32,999 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 1
2022-12-30T16:11:32,999 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 2
2022-12-30T16:11:33,000 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 3
2022-12-30T16:11:33,000 - INFO - [main:SubscriptionSeekTest@815] - [x]
Received message: New message - 4
2022-12-30T16:11:33,002 - INFO - [main:ConsumerImpl@2248] -
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test]
Seeking subscription to the message -1:-1:-1
2022-12-30T16:11:33,003 - INFO - [pulsar-io-6-2:Consumer@385] -
Disconnecting consumer:
Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633,
name=test}, consumerId=0, consumerName=aec48, address=/127.0.0.1:60303}
2022-12-30T16:11:33,005 - INFO -
[pulsar-io-6-2:PersistentDispatcherMultipleConsumers@193] - Removed consumer
Consumer{subscription=PersistentSubscription{topic=persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633,
name=test}, consumerId=0, consumerName=aec48, address=/127.0.0.1:60303} with
pending 1 acks
2022-12-30T16:11:33,006 - INFO -
[pulsar-io-6-2:PersistentDispatcherMultipleConsumers@200] -
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633
/ test] All consumers removed. Subscription is disconnected
2022-12-30T16:11:33,006 - INFO - [pulsar-io-6-2:PersistentSubscription@783]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test]
Successfully disconnected consumers from subscription, proceeding with cursor
reset
2022-12-30T16:11:33,008 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@1227] -
[prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633]
Initiate reset readPosition to 3:-1 on cursor test
2022-12-30T16:11:33,009 - INFO - [pulsar-client-io-37-5:ClientCnx@796] -
[localhost/127.0.0.1:60295] Broker notification of Closed consumer: 0
2022-12-30T16:11:33,009 - INFO -
[pulsar-client-io-37-5:ConnectionHandler@144] -
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633]
[test] Closed connection [id: 0x18c45e74, L:/127.0.0.1:60303 -
R:localhost/127.0.0.1:60295] -- Will try again in 0.1 s
2022-12-30T16:11:33,014 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:PulsarMockBookKeeper@122] - Creating ledger
7
2022-12-30T16:11:33,025 - INFO -
[bookkeeper-ml-scheduler-OrderedScheduler-11-0:ManagedCursorImpl$30@3049] -
[prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633]
Updated cursor test with ledger id 7 md-position=3:-1 rd-position=3:1
2022-12-30T16:11:33,032 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl$9@1274] -
[prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633]
reset readPosition to 3:-1 before current read readPosition 3:1 on cursor test
2022-12-30T16:11:33,032 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1770] - [/127.0.0.1:60303]
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test]
Reset subscription to message id -1:-1
2022-12-30T16:11:33,033 - INFO - [pulsar-client-io-37-5:ConsumerImpl@2251]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test]
Successfully reset subscription to the message -1:-1:-1
2022-12-30T16:11:33,111 - INFO - [pulsar-timer-76-1:ConnectionHandler@148]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633]
[test] Reconnecting after timeout
2022-12-30T16:11:33,118 - INFO -
[pulsar-web-32-14:Slf4jRequestLogWriter@62] - 127.0.0.1 - -
[30/Dec/2022:16:11:33 +0800] "GET
/lookup/v2/destination/persistent/prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633
HTTP/1.1" 200 217 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 6
2022-12-30T16:11:33,118 - INFO - [pulsar-client-io-37-3:ConsumerImpl@884] -
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test]
Subscribing to topic on cnx [id: 0x18c45e74, L:/127.0.0.1:60303 -
R:localhost/127.0.0.1:60295], consumerId 0
2022-12-30T16:11:33,119 - INFO - [pulsar-io-6-2:ServerCnx@1072] -
[/127.0.0.1:60303] Subscribing on topic
persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633
/ test
2022-12-30T16:11:33,133 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ManagedCursorImpl@2456] -
[prop/use/ns-abc/persistent/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633-test]
Rewind from 3:-1 to 3:0
2022-12-30T16:11:33,134 - INFO -
[mock-pulsar-bk-OrderedExecutor-0-0:ServerCnx@1156] - [/127.0.0.1:60303]
Created subscription on topic
persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633
/ test
2022-12-30T16:11:33,135 - INFO - [pulsar-client-io-37-5:ConsumerImpl@1018]
-
[persistent://prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633][test]
Subscribed to topic on localhost/127.0.0.1:60295 -- consumer: 0
2022-12-30T16:11:33,145 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 0
2022-12-30T16:11:33,145 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 1
2022-12-30T16:11:33,145 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 2
2022-12-30T16:11:33,145 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 3
2022-12-30T16:11:33,145 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 4
2022-12-30T16:11:33,146 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 5
2022-12-30T16:11:33,146 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 6
2022-12-30T16:11:33,146 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 7
2022-12-30T16:11:33,147 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 8
2022-12-30T16:11:33,147 - INFO - [main:SubscriptionSeekTest@824] - [x]
Received message: New message - 9
2022-12-30T16:11:36,182 - INFO -
[pulsar-web-32-16:Slf4jRequestLogWriter@62] - 127.0.0.1 - -
[30/Dec/2022:16:11:36 +0800] "GET
/admin/persistent/prop/use/ns-abc/testAckAfterSeek-667ef06b-e0a6-43d7-8b71-3ab14fb2a633/internalStats?metadata=false
HTTP/1.1" 200 965 "-" "Pulsar-Java-v2.12.0-SNAPSHOT" 23
2022-12-30T16:11:36,209 - INFO - [main:SubscriptionSeekTest@835] - The
topic internal stats : {
"entriesAddedCounter" : 1,
"numberOfEntries" : 1,
"totalSize" : 275,
"currentLedgerEntries" : 1,
"currentLedgerSize" : 275,
"lastLedgerCreatedTimestamp" : "2022-12-30T16:11:32.628+08:00",
"waitingCursorsCount" : 1,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "3:0",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 3,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"test" : {
"markDeletePosition" : "3:0",
"readPosition" : "3:1",
"waitingReadOp" : true,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 1,
"cursorLedger" : 7,
"cursorLedgerLastEntry" : 2,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2022-12-30T16:11:32.772+08:00",
"state" : "Open",
"active" : true,
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : true,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
```
You will see that the batch will not be acknowledged by using the message
instance but be acknowledged by using the deserialized message ID (from the
topic internal stats output). Hope I didn't miss any details 😁
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]