codelipenghui commented on a change in pull request #8285:
URL: https://github.com/apache/pulsar/pull/8285#discussion_r508438788
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1350,7 +1351,11 @@ protected void handleSeek(CommandSeek seek) {
Subscription subscription = consumer.getSubscription();
MessageIdData msgIdData = seek.getMessageId();
- Position position = new PositionImpl(msgIdData.getLedgerId(),
msgIdData.getEntryId());
+ long[] ackSet = msgIdData.getAckSetList().stream().mapToLong(x ->
x).toArray();
+ if (ackSet == null) ackSet = new long[0];
+
+ Position position = new PositionImpl(msgIdData.getLedgerId(),
+ msgIdData.getEntryId(), ackSet);
Review comment:
It's better to check msgIdData.hasAckSetList() rather than creating a
long[] first.
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -760,24 +760,30 @@ public static ByteBuf newUnsubscribe(long consumerId,
long requestId) {
public static ByteBuf newActiveConsumerChange(long consumerId, boolean
isActive) {
CommandActiveConsumerChange.Builder changeBuilder =
CommandActiveConsumerChange.newBuilder()
.setConsumerId(consumerId)
- .setIsActive(isActive);
+ .setIsActive(isActive);
CommandActiveConsumerChange change = changeBuilder.build();
ByteBuf res = serializeWithSize(
-
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
+
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
changeBuilder.recycle();
change.recycle();
return res;
}
- public static ByteBuf newSeek(long consumerId, long requestId, long
ledgerId, long entryId) {
+ public static ByteBuf newSeek(long consumerId, long requestId,
+ long ledgerId, long entryId, long[] ackSet) {
CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
seekBuilder.setConsumerId(consumerId);
seekBuilder.setRequestId(requestId);
MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
messageIdBuilder.setLedgerId(ledgerId);
messageIdBuilder.setEntryId(entryId);
+
+ for (long l : ackSet) {
+ messageIdBuilder.addAckSet(l);
+ }
Review comment:
```suggestion
messageIdBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet));
```
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -116,6 +122,93 @@ public void testSeek() throws Exception {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
}
+ @Test
+ public void testSeekForBatch() throws Exception {
+ final String topicName =
"persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .topic(topicName).create();
+
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new
ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture =
producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ futureMessageIds.forEach(future -> {
+ MessageId messageId = null;
+ try {
+ messageId = future.get();
+ messageIds.add(messageId);
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ });
+ producer.flush();
+ producer.close();
+
+
+ org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+
+ MessageId resetId = messageIds.get(4);
+ consumer.seek(resetId);
+ // Wait for consumer to reconnect
+ Thread.sleep(500);
+
+ Message<String> nextMessage = consumer.receive();
+ MessageId nextId = nextMessage.getMessageId();
+ consumer.acknowledge(nextId);
+ // For non-durable we are going to restart from the next entry
Review comment:
The consumer uses a durable cursor, not a non-durable cursor.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]