This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a5b4146 [Issue 8260] Support reset cursor to a batch index of the
batching message (#8285)
a5b4146 is described below
commit a5b4146ca0f6f333741763ed3a1dda3495dccb65
Author: Renkai <[email protected]>
AuthorDate: Wed Oct 21 19:16:29 2020 +0800
[Issue 8260] Support reset cursor to a batch index of the batching message
(#8285)
### Motivation
Make reset cursor command able to reset to a specific index in batch mode.
### Modifications
Now reset offset command change index info on broker side
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 5 ++
.../apache/pulsar/broker/service/ServerCnx.java | 10 +++-
.../broker/service/SubscriptionSeekTest.java | 62 ++++++++++++++++++++++
.../pulsar/client/impl/BatchMessageAcker.java | 8 +++
.../apache/pulsar/client/impl/ConsumerImpl.java | 24 +++++++--
.../apache/pulsar/common/protocol/Commands.java | 9 ++--
6 files changed, 109 insertions(+), 9 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 2aa02eb..344a1dd 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -992,6 +992,11 @@ public class ManagedCursorImpl implements ManagedCursor {
if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
+ long[] resetWords = newPosition.ackSet;
+ if (resetWords != null) {
+ BitSetRecyclable ackSet =
BitSetRecyclable.create().resetWords(resetWords);
+ batchDeletedIndexes.put(newPosition, ackSet);
+ }
}
PositionImpl oldReadPosition = readPosition;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 505ecf2..09a86d57 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -124,6 +124,8 @@ import
org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -1350,7 +1352,13 @@ public class ServerCnx extends PulsarHandler {
Subscription subscription = consumer.getSubscription();
MessageIdData msgIdData = seek.getMessageId();
- Position position = new PositionImpl(msgIdData.getLedgerId(),
msgIdData.getEntryId());
+ long[] ackSet = null;
+ if (msgIdData.getAckSetCount() > 0) {
+ ackSet =
SafeCollectionUtils.longListToArray(msgIdData.getAckSetList());
+ }
+
+ Position position = new PositionImpl(msgIdData.getLedgerId(),
+ msgIdData.getEntryId(), ackSet);
subscription.resetCursor(position).thenRun(() -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index 0f5986d..31a46cf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -21,23 +21,28 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
+
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -54,6 +59,7 @@ public class SubscriptionSeekTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
super.baseSetup();
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
}
@AfterClass
@@ -117,6 +123,62 @@ public class SubscriptionSeekTest extends BrokerTestBase {
}
@Test
+ public void testSeekForBatch() throws Exception {
+ final String topicName =
"persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .topic(topicName).create();
+
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new
ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture =
producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+ MessageId messageId = futureMessageId.get();
+ messageIds.add(messageId);
+ }
+
+ producer.close();
+
+
+ org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .startMessageIdInclusive()
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+
+ consumer.seek(MessageId.earliest);
+ Message<String> receiveBeforEarliest = consumer.receive();
+ assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
+ consumer.seek(MessageId.latest);
+ Message<String> receiveAfterLatest = consumer.receive(1,
TimeUnit.SECONDS);
+ assertNull(receiveAfterLatest);
+
+ for (MessageId messageId : messageIds) {
+ consumer.seek(messageId);
+ MessageId receiveId = consumer.receive().getMessageId();
+ assertEquals(receiveId, messageId);
+ }
+ }
+
+
+ @Test
public void testConcurrentResetCursor() throws Exception {
final String topicName =
"persistent://prop/use/ns-abc/testConcurrentReset_" +
System.currentTimeMillis();
final String subscriptionName = "test-sub-name";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index e0b8197..a16dcb4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -86,4 +86,12 @@ class BatchMessageAcker {
return prevBatchCumulativelyAcked;
}
+ @Override
+ public String toString() {
+ return "BatchMessageAcker{" +
+ "batchSize=" + batchSize +
+ ", bitSet=" + bitSet +
+ ", prevBatchCumulativelyAcked=" + prevBatchCumulativelyAcked +
+ '}';
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5b6bcfa..c73d2f1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1922,15 +1922,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (!isConnected()) {
return FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker when
seeking the subscription %s of the " +
- "topic %s to the message %s", subscription,
topicName.toString(), messageId.toString())));
+ String.format("The client is not connected to the broker
when seeking the subscription %s of the " +
+ "topic %s to the message %s", subscription,
topicName.toString(), messageId.toString())));
}
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
long requestId = client.newRequestId();
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- ByteBuf seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId());
+ ByteBuf seek = null;
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+ // Initialize ack set
+ BitSetRecyclable ackSet = BitSetRecyclable.create();
+ ackSet.set(0, msgId.getBatchSize());
+ ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+ long[] ackSetArr = ackSet.toLongArray();
+ ackSet.recycle();
+
+ seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+ seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+ }
+
ClientCnx cnx = cnx();
log.info("[{}][{}] Seek subscription to message id {}", topic,
subscription, messageId);
@@ -2470,4 +2484,4 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private static final Logger log =
LoggerFactory.getLogger(ConsumerImpl.class);
-}
\ No newline at end of file
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 5b01e40..3830af8 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -760,17 +760,18 @@ public class Commands {
public static ByteBuf newActiveConsumerChange(long consumerId, boolean
isActive) {
CommandActiveConsumerChange.Builder changeBuilder =
CommandActiveConsumerChange.newBuilder()
.setConsumerId(consumerId)
- .setIsActive(isActive);
+ .setIsActive(isActive);
CommandActiveConsumerChange change = changeBuilder.build();
ByteBuf res = serializeWithSize(
-
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
+
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
changeBuilder.recycle();
change.recycle();
return res;
}
- public static ByteBuf newSeek(long consumerId, long requestId, long
ledgerId, long entryId) {
+ public static ByteBuf newSeek(long consumerId, long requestId,
+ long ledgerId, long entryId, long[] ackSet) {
CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
seekBuilder.setConsumerId(consumerId);
seekBuilder.setRequestId(requestId);
@@ -778,6 +779,8 @@ public class Commands {
MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
messageIdBuilder.setLedgerId(ledgerId);
messageIdBuilder.setEntryId(entryId);
+
messageIdBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(ackSet));
+
MessageIdData messageId = messageIdBuilder.build();
seekBuilder.setMessageId(messageId);