This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 3cc9001da99 [fix][broker] If ledger lost, cursor mark delete position
can not forward (#18620)
3cc9001da99 is described below
commit 3cc9001da99078c41df2e541b971ea29ad801de6
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 29 14:46:38 2023 +0800
[fix][broker] If ledger lost, cursor mark delete position can not forward
(#18620)
Motivation:
Configuration `autoSkipNonRecoverableData` is designed to turn this feature
on when partial data loss is acceptable. When a ledger is lost, the broker will
still work. But now we have this problem: if a ledger is lost, consumers and
producers can still work, but the cursor mark-delete position cannot move forward.
Modifications:
- When an unrecoverable ledger is found, remove the records
in `individualDeletedMessages` and `batchDeletedIndexes`.
- When the managed cursor is recovered, check whether there are invalid
records in `individualDeletedMessages` and `batchDeletedIndexes` and print a
warning log.
(cherry picked from commit ab810f4f59dd7d6c8b5313ceb334873a7d2cde31)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 6 +
.../apache/bookkeeper/mledger/ManagedLedger.java | 6 +
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 40 +++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +
.../bookkeeper/mledger/impl/OpReadEntry.java | 4 +
.../LedgerLostAndSkipNonRecoverableTest.java | 295 +++++++++++++++++++++
6 files changed, 358 insertions(+)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 46ca0f14003..3012706891e 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -681,6 +681,12 @@ public interface ManagedCursor {
*/
long getEstimatedSizeSinceMarkDeletePosition();
+ /**
+ * If a ledger is lost, this ledger will be skipped after enabled
"autoSkipNonRecoverableData", and the method is
+ * used to delete information about this ledger in the ManagedCursor.
+ */
+ default void skipNonRecoverableLedger(long ledgerId){}
+
/**
* Returns cursor throttle mark-delete rate.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 497cbeb171c..b8785858661 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -623,6 +623,12 @@ public interface ManagedLedger {
*/
void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
+ /**
+ * If a ledger is lost, this ledger will be skipped after enabled
"autoSkipNonRecoverableData", and the method is
+ * used to delete information about this ledger in the ManagedCursor.
+ */
+ default void skipNonRecoverableLedger(long ledgerId){}
+
/**
* Roll current ledger if it is full.
*/
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 011d3df77f1..636577992cd 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2555,6 +2555,46 @@ public class ManagedCursorImpl implements ManagedCursor {
}
}
+ /**
+ * Manually acknowledge all entries in the lost ledger.
+ * - Since this is an uncommon event, we focus on maintainability. So we
do not modify
+ * {@link #individualDeletedMessages} and {@link #batchDeletedIndexes},
but call
+ * {@link #asyncDelete(Position, AsyncCallbacks.DeleteCallback, Object)}.
+ * - This method is valid regardless of the consumer ACK type.
+ * - If there is a consumer ack request after this event, it will also
work.
+ */
+ @Override
+ public void skipNonRecoverableLedger(final long ledgerId){
+ LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+ if (ledgerInfo == null) {
+ return;
+ }
+ lock.writeLock().lock();
+ log.warn("[{}] [{}] Since the ledger [{}] is lost and the
autoSkipNonRecoverableData is true, this ledger will"
+ + " be auto acknowledge in subscription", ledger.getName(),
name, ledgerId);
+ try {
+ for (int i = 0; i < ledgerInfo.getEntries(); i++) {
+ if (!individualDeletedMessages.contains(ledgerId, i)) {
+ asyncDelete(PositionImpl.get(ledgerId, i), new
AsyncCallbacks.DeleteCallback() {
+ @Override
+ public void deleteComplete(Object ctx) {
+ // ignore.
+ }
+
+ @Override
+ public void deleteFailed(ManagedLedgerException ex,
Object ctx) {
+ // The method internalMarkDelete already handled
the failure operation. We only need to
+ // make sure the memory state is updated.
+ // If the broker crashed, the non-recoverable
ledger will be detected again.
+ }
+ }, null);
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
// //////////////////////////////////////////////////
void startCreatingNewMetadataLedger() {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bcb607cef43..56b2e8f0900 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,6 +1689,13 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+ @Override
+ public void skipNonRecoverableLedger(long ledgerId){
+ for (ManagedCursor managedCursor : cursors) {
+ managedCursor.skipNonRecoverableLedger(ledgerId);
+ }
+ }
+
synchronized void createLedgerAfterClosed() {
if (isNeededCreateNewLedgerAfterCloseLedger()) {
log.info("[{}] Creating a new ledger after closed", name);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a805802e633..b8018db511a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -103,6 +103,7 @@ class OpReadEntry implements ReadEntriesCallback {
// try to find and move to next valid ledger
final Position nexReadPosition =
cursor.getNextLedgerPosition(readPosition.getLedgerId());
// fail callback if it couldn't find next valid ledger
+ Long lostLedger = readPosition.ledgerId;
if (nexReadPosition == null) {
callback.readEntriesFailed(exception, ctx);
cursor.ledger.mbean.recordReadEntriesError();
@@ -110,6 +111,9 @@ class OpReadEntry implements ReadEntriesCallback {
return;
}
updateReadPosition(nexReadPosition);
+ if (lostLedger < nexReadPosition.getLedgerId()) {
+ cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger);
+ }
checkReadCompletion();
} else {
if (!(exception instanceof TooManyRequestsException)) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java
new file mode 100644
index 00000000000..2cab6c5e761
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class LedgerLostAndSkipNonRecoverableTest extends ProducerConsumerBase {
+
+ private static final String DEFAULT_NAMESPACE = "my-property/my-ns";
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ protected void doInitConf() throws Exception {
+ conf.setAutoSkipNonRecoverableData(true);
+ }
+
+ @DataProvider(name = "batchEnabled")
+ public Object[][] batchEnabled(){
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
+
+ @Test(timeOut = 30000, dataProvider = "batchEnabled")
+ public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean
enabledBatch) throws Exception {
+ String topicSimpleName = UUID.randomUUID().toString().replaceAll("-",
"");
+ String subName = UUID.randomUUID().toString().replaceAll("-", "");
+ String topicName = String.format("persistent://%s/%s",
DEFAULT_NAMESPACE, topicSimpleName);
+
+ log.info("create topic and subscription.");
+ Consumer sub = createConsumer(topicName, subName, enabledBatch);
+ sub.redeliverUnacknowledgedMessages();
+ sub.close();
+
+ log.info("send many messages.");
+ int ledgerCount = 3;
+ int messageCountPerLedger = enabledBatch ? 25 : 5;
+ int messageCountPerEntry = enabledBatch ? 5 : 1;
+ List<MessageIdImpl>[] sendMessages =
+ sendManyMessages(topicName, ledgerCount,
messageCountPerLedger, messageCountPerEntry);
+ int sendMessageCount = Arrays.asList(sendMessages).stream()
+ .flatMap(s -> s.stream()).collect(Collectors.toList()).size();
+ log.info("send {} messages", sendMessageCount);
+
+ log.info("make individual ack.");
+ ConsumerAndReceivedMessages consumerAndReceivedMessages1 =
+ waitConsumeAndAllMessages(topicName, subName,
enabledBatch,false);
+ List<MessageIdImpl>[] messageIds =
consumerAndReceivedMessages1.messageIds;
+ Consumer consumer = consumerAndReceivedMessages1.consumer;
+ MessageIdImpl individualPosition =
messageIds[1].get(messageCountPerEntry - 1);
+ MessageIdImpl expectedMarkDeletedPosition =
+ new MessageIdImpl(messageIds[0].get(0).getLedgerId(),
messageIds[0].get(0).getEntryId(), -1);
+ MessageIdImpl lastPosition =
+ new MessageIdImpl(messageIds[2].get(4).getLedgerId(),
messageIds[2].get(4).getEntryId(), -1);
+ consumer.acknowledge(individualPosition);
+ consumer.acknowledge(expectedMarkDeletedPosition);
+ waitPersistentCursorLedger(topicName, subName,
expectedMarkDeletedPosition.getLedgerId(),
+ expectedMarkDeletedPosition.getEntryId());
+ consumer.close();
+
+ log.info("Make lost ledger [{}].", individualPosition.getLedgerId());
+ pulsar.getBrokerService().getTopic(topicName,
false).get().get().close(false);
+ mockBookKeeper.deleteLedger(individualPosition.getLedgerId());
+
+ log.info("send some messages.");
+ sendManyMessages(topicName, 3, messageCountPerEntry);
+
+ log.info("receive all messages then verify mark deleted position");
+ ConsumerAndReceivedMessages consumerAndReceivedMessages2 =
+ waitConsumeAndAllMessages(topicName, subName, enabledBatch,
true);
+ waitMarkDeleteLargeAndEquals(topicName, subName,
lastPosition.getLedgerId(), lastPosition.getEntryId());
+
+ // cleanup
+ consumerAndReceivedMessages2.consumer.close();
+ admin.topics().delete(topicName);
+ }
+
+ private ManagedCursorImpl getCursor(String topicName, String subName)
throws Exception {
+ PersistentSubscription subscription_ =
+ (PersistentSubscription)
pulsar.getBrokerService().getTopic(topicName, false)
+ .get().get().getSubscription(subName);
+ return (ManagedCursorImpl) subscription_.getCursor();
+ }
+
+ private void waitMarkDeleteLargeAndEquals(String topicName, String
subName, final long markDeletedLedgerId,
+ final long markDeletedEntryId)
throws Exception {
+ Awaitility.await().atMost(Duration.ofSeconds(45)).untilAsserted(() -> {
+ Position persistentMarkDeletedPosition = getCursor(topicName,
subName).getMarkDeletedPosition();
+ log.info("markDeletedPosition {}:{}, expected {}:{}",
persistentMarkDeletedPosition.getLedgerId(),
+ persistentMarkDeletedPosition.getEntryId(),
markDeletedLedgerId, markDeletedEntryId);
+ Assert.assertTrue(persistentMarkDeletedPosition.getLedgerId() >=
markDeletedLedgerId);
+ if (persistentMarkDeletedPosition.getLedgerId() >
markDeletedLedgerId){
+ return;
+ }
+ Assert.assertTrue(persistentMarkDeletedPosition.getEntryId() >=
markDeletedEntryId);
+ });
+ }
+
+ private void waitPersistentCursorLedger(String topicName, String subName,
final long markDeletedLedgerId,
+ final long markDeletedEntryId)
throws Exception {
+ Awaitility.await().untilAsserted(() -> {
+ Position persistentMarkDeletedPosition = getCursor(topicName,
subName).getPersistentMarkDeletedPosition();
+ Assert.assertEquals(persistentMarkDeletedPosition.getLedgerId(),
markDeletedLedgerId);
+ Assert.assertEquals(persistentMarkDeletedPosition.getEntryId(),
markDeletedEntryId);
+ });
+ }
+
+ private List<MessageIdImpl>[] sendManyMessages(String topicName, int
ledgerCount, int messageCountPerLedger,
+ int messageCountPerEntry)
throws Exception {
+ List<MessageIdImpl>[] messageIds = new List[ledgerCount];
+ for (int i = 0; i < ledgerCount; i++){
+ admin.topics().unload(topicName);
+ if (messageCountPerEntry == 1) {
+ messageIds[i] = sendManyMessages(topicName,
messageCountPerLedger);
+ } else {
+ messageIds[i] = sendManyBatchedMessages(topicName,
messageCountPerEntry,
+ messageCountPerLedger / messageCountPerEntry);
+ }
+ }
+ return messageIds;
+ }
+
+ private List<MessageIdImpl> sendManyMessages(String topicName, int
messageCountPerLedger,
+ int messageCountPerEntry)
throws Exception {
+ if (messageCountPerEntry == 1) {
+ return sendManyMessages(topicName, messageCountPerLedger);
+ } else {
+ return sendManyBatchedMessages(topicName, messageCountPerEntry,
+ messageCountPerLedger / messageCountPerEntry);
+ }
+ }
+
+ private List<MessageIdImpl> sendManyMessages(String topicName, int
msgCount) throws Exception {
+ List<MessageIdImpl> messageIdList = new ArrayList<>();
+ final Producer<String> producer =
pulsarClient.newProducer(Schema.JSON(String.class))
+ .topic(topicName)
+ .enableBatching(false)
+ .create();
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < msgCount; i++){
+ String messageSuffix = String.format("%s-%s", timestamp, i);
+ MessageIdImpl messageIdSent = (MessageIdImpl) producer.newMessage()
+ .key(String.format("Key-%s", messageSuffix))
+ .value(String.format("Msg-%s", messageSuffix))
+ .send();
+ messageIdList.add(messageIdSent);
+ }
+ producer.close();
+ return messageIdList;
+ }
+
+ private List<MessageIdImpl> sendManyBatchedMessages(String topicName, int
msgCountPerEntry, int entryCount)
+ throws Exception {
+ Producer<String> producer =
pulsarClient.newProducer(Schema.JSON(String.class))
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.SECONDS)
+ .batchingMaxMessages(Integer.MAX_VALUE)
+ .create();
+ List<CompletableFuture<MessageId>> messageIdFutures = new
ArrayList<>();
+ for (int i = 0; i < entryCount; i++){
+ for (int j = 0; j < msgCountPerEntry; j++){
+ CompletableFuture<MessageId> messageIdFuture =
+
producer.newMessage().value(String.format("entry-seq[%s], batch_index[%s]", i,
j)).sendAsync();
+ messageIdFutures.add(messageIdFuture);
+ }
+ producer.flush();
+ }
+ producer.close();
+ FutureUtil.waitForAll(messageIdFutures).get();
+ return messageIdFutures.stream().map(f ->
(MessageIdImpl)f.join()).collect(Collectors.toList());
+ }
+
+ private ConsumerAndReceivedMessages waitConsumeAndAllMessages(String
topicName, String subName,
+ final boolean
enabledBatch,
+ boolean ack)
throws Exception {
+ List<MessageIdImpl> messageIds = new ArrayList<>();
+ final Consumer consumer = createConsumer(topicName, subName,
enabledBatch);
+ while (true){
+ Message message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message != null){
+ messageIds.add((MessageIdImpl) message.getMessageId());
+ if (ack) {
+ consumer.acknowledge(message);
+ }
+ } else {
+ break;
+ }
+ }
+ log.info("receive {} messages", messageIds.size());
+ return new ConsumerAndReceivedMessages(consumer,
sortMessageId(messageIds, enabledBatch));
+ }
+
+ @AllArgsConstructor
+ private static class ConsumerAndReceivedMessages {
+ private Consumer consumer;
+ private List<MessageIdImpl>[] messageIds;
+ }
+
+ private List<MessageIdImpl>[] sortMessageId(List<MessageIdImpl>
messageIds, boolean enabledBatch){
+ Map<Long, List<MessageIdImpl>> map =
messageIds.stream().collect(Collectors.groupingBy(v -> v.getLedgerId()));
+ TreeMap<Long, List<MessageIdImpl>> sortedMap = new TreeMap<>(map);
+ List<MessageIdImpl>[] res = new List[sortedMap.size()];
+ Iterator<Map.Entry<Long, List<MessageIdImpl>>> iterator =
sortedMap.entrySet().iterator();
+ for (int i = 0; i < sortedMap.size(); i++){
+ res[i] = iterator.next().getValue();
+ }
+ for (List<MessageIdImpl> list : res){
+ list.sort((m1, m2) -> {
+ if (enabledBatch){
+ BatchMessageIdImpl mb1 = (BatchMessageIdImpl) m1;
+ BatchMessageIdImpl mb2 = (BatchMessageIdImpl) m2;
+ return (int) (mb1.getLedgerId() * 1000000 +
mb1.getEntryId() * 1000 + mb1.getBatchIndex() -
+ mb2.getLedgerId() * 1000000 + mb2.getEntryId() *
1000 + mb2.getBatchIndex());
+ }
+ return (int) (m1.getLedgerId() * 1000 + m1.getEntryId() -
+ m2.getLedgerId() * 1000 + m2.getEntryId());
+ });
+ }
+ return res;
+ }
+
+ private Consumer<String> createConsumer(String topicName, String subName,
boolean enabledBatch) throws Exception {
+ final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.JSON(String.class))
+ .subscriptionType(SubscriptionType.Failover)
+ .isAckReceiptEnabled(true)
+ .enableBatchIndexAcknowledgment(enabledBatch)
+ .receiverQueueSize(1000)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .subscribe();
+ return consumer;
+ }
+}