This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 3cc9001da99 [fix][broker] If ledger lost, cursor mark delete position 
can not forward (#18620)
3cc9001da99 is described below

commit 3cc9001da99078c41df2e541b971ea29ad801de6
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 29 14:46:38 2023 +0800

    [fix][broker] If ledger lost, cursor mark delete position can not forward 
(#18620)
    
    Motivation:
    The configuration `autoSkipNonRecoverableData` is meant to be turned on 
when partial data loss is acceptable: when a ledger is lost, the broker keeps 
working. However, there is currently a problem: if a ledger is lost, consumers 
and producers keep working, but the cursor's mark-delete position cannot move 
forward.
    
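    Modifications:
    - When an unrecoverable ledger is found, acknowledge all of its entries in 
every cursor of the managed ledger (through `asyncDelete`, rather than editing 
`individualDeletedMessages` and `batchDeletedIndexes` directly), so that the 
mark-delete position can move past the lost ledger.
    - When the managed cursor is recovered, check whether 
`individualDeletedMessages` and `batchDeletedIndexes` contain invalid records, 
and print a warning log if they do.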
    
    (cherry picked from commit ab810f4f59dd7d6c8b5313ceb334873a7d2cde31)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   6 +
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   6 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  40 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   7 +
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   4 +
 .../LedgerLostAndSkipNonRecoverableTest.java       | 295 +++++++++++++++++++++
 6 files changed, 358 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 46ca0f14003..3012706891e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -681,6 +681,12 @@ public interface ManagedCursor {
      */
     long getEstimatedSizeSinceMarkDeletePosition();
 
+    /**
+     * Drops the information this cursor holds about a ledger that has been lost and is being skipped
+     * because "autoSkipNonRecoverableData" is enabled.
+     *
+     * <p>The default implementation does nothing.
+     *
+     * @param ledgerId id of the lost ledger
+     */
+    default void skipNonRecoverableLedger(long ledgerId) {
+        // No-op by default.
+    }
+
     /**
      * Returns cursor throttle mark-delete rate.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 497cbeb171c..b8785858661 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -623,6 +623,12 @@ public interface ManagedLedger {
      */
     void trimConsumedLedgersInBackground(CompletableFuture<?> promise);
 
+    /**
+     * Notifies this managed ledger that the ledger {@code ledgerId} is lost and is being skipped because
+     * "autoSkipNonRecoverableData" is enabled, so that the information its cursors hold about that ledger
+     * can be removed.
+     *
+     * <p>The default implementation does nothing.
+     *
+     * @param ledgerId id of the lost ledger
+     */
+    default void skipNonRecoverableLedger(long ledgerId) {
+        // No-op by default.
+    }
+
     /**
      * Roll current ledger if it is full.
      */
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 011d3df77f1..636577992cd 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2555,6 +2555,46 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
     }
 
+    /**
+     * Acknowledges every entry of a lost ledger on behalf of this cursor, so that the mark-delete position
+     * is not blocked by entries that can never be read again.
+     * - Since this is an uncommon event, we focus on maintainability. So we do not modify
+     *   {@link #individualDeletedMessages} and {@link #batchDeletedIndexes} directly, but acknowledge the
+     *   entries through {@link #asyncDelete(Iterable, AsyncCallbacks.DeleteCallback, Object)}, in a single
+     *   call for the whole ledger rather than one call per entry.
+     * - This method is valid regardless of the consumer ACK type.
+     * - If there is a consumer ack request after this event, it will also work.
+     *
+     * @param ledgerId id of the lost ledger; nothing happens if the managed ledger has no info for it
+     */
+    @Override
+    public void skipNonRecoverableLedger(final long ledgerId) {
+        LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+        if (ledgerInfo == null) {
+            return;
+        }
+        log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
+                + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
+        // Acquire the lock immediately before the try block so it is always released.
+        lock.writeLock().lock();
+        try {
+            final java.util.List<Position> positionsToDelete = new java.util.ArrayList<>();
+            // Use a long index so the counter cannot overflow (and loop forever) if the ledger's entry
+            // count exceeds Integer.MAX_VALUE.
+            for (long entryId = 0; entryId < ledgerInfo.getEntries(); entryId++) {
+                if (!individualDeletedMessages.contains(ledgerId, entryId)) {
+                    positionsToDelete.add(PositionImpl.get(ledgerId, entryId));
+                }
+            }
+            if (positionsToDelete.isEmpty()) {
+                return;
+            }
+            asyncDelete(positionsToDelete, new AsyncCallbacks.DeleteCallback() {
+                @Override
+                public void deleteComplete(Object ctx) {
+                    // ignore.
+                }
+
+                @Override
+                public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                    // The method internalMarkDelete already handled the failure operation. We only need to
+                    // make sure the memory state is updated.
+                    // If the broker crashed, the non-recoverable ledger will be detected again.
+                    log.warn("[{}] [{}] Failed to auto acknowledge the entries of the lost ledger [{}]",
+                            ledger.getName(), name, ledgerId, ex);
+                }
+            }, null);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     // //////////////////////////////////////////////////
 
     void startCreatingNewMetadataLedger() {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bcb607cef43..56b2e8f0900 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,6 +1689,13 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    /**
+     * Forwards the notification that ledger {@code ledgerId} is lost to every cursor of this managed ledger,
+     * letting each cursor drop what it holds about that ledger.
+     *
+     * @param ledgerId id of the lost ledger
+     */
+    @Override
+    public void skipNonRecoverableLedger(long ledgerId) {
+        cursors.forEach(cursor -> cursor.skipNonRecoverableLedger(ledgerId));
+    }
+
     synchronized void createLedgerAfterClosed() {
         if (isNeededCreateNewLedgerAfterCloseLedger()) {
             log.info("[{}] Creating a new ledger after closed", name);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index a805802e633..b8018db511a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -103,6 +103,7 @@ class OpReadEntry implements ReadEntriesCallback {
             // try to find and move to next valid ledger
             final Position nexReadPosition = 
cursor.getNextLedgerPosition(readPosition.getLedgerId());
             // fail callback if it couldn't find next valid ledger
+            Long lostLedger = readPosition.ledgerId;
             if (nexReadPosition == null) {
                 callback.readEntriesFailed(exception, ctx);
                 cursor.ledger.mbean.recordReadEntriesError();
@@ -110,6 +111,9 @@ class OpReadEntry implements ReadEntriesCallback {
                 return;
             }
             updateReadPosition(nexReadPosition);
+            if (lostLedger < nexReadPosition.getLedgerId()) {
+                cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger);
+            }
             checkReadCompletion();
         } else {
             if (!(exception instanceof TooManyRequestsException)) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java
new file mode 100644
index 00000000000..2cab6c5e761
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/LedgerLostAndSkipNonRecoverableTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class LedgerLostAndSkipNonRecoverableTest extends ProducerConsumerBase {
+
+    private static final String DEFAULT_NAMESPACE = "my-property/my-ns";
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Applies the base test configuration, then enables {@code autoSkipNonRecoverableData}, the feature under test.
+     */
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        conf.setAutoSkipNonRecoverableData(true);
+    }
+
+    @DataProvider(name = "batchEnabled")
+    public Object[][] batchEnabled() {
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    /**
+     * Loses the ledger that holds an individually acknowledged message and verifies that, once every message is
+     * consumed and acknowledged, the mark-delete position still moves past the lost ledger.
+     */
+    @Test(timeOut = 30000, dataProvider = "batchEnabled")
+    public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean enabledBatch) throws Exception {
+        String topicSimpleName = UUID.randomUUID().toString().replaceAll("-", "");
+        String subName = UUID.randomUUID().toString().replaceAll("-", "");
+        String topicName = String.format("persistent://%s/%s", DEFAULT_NAMESPACE, topicSimpleName);
+
+        log.info("create topic and subscription.");
+        Consumer<String> sub = createConsumer(topicName, subName, enabledBatch);
+        sub.redeliverUnacknowledgedMessages();
+        sub.close();
+
+        log.info("send many messages.");
+        int ledgerCount = 3;
+        int messageCountPerLedger = enabledBatch ? 25 : 5;
+        int messageCountPerEntry = enabledBatch ? 5 : 1;
+        List<MessageIdImpl>[] sendMessages =
+                sendManyMessages(topicName, ledgerCount, messageCountPerLedger, messageCountPerEntry);
+        int sendMessageCount = Arrays.stream(sendMessages).mapToInt(List::size).sum();
+        log.info("send {} messages", sendMessageCount);
+
+        log.info("make individual ack.");
+        ConsumerAndReceivedMessages consumerAndReceivedMessages1 =
+                waitConsumeAndAllMessages(topicName, subName, enabledBatch, false);
+        List<MessageIdImpl>[] messageIds = consumerAndReceivedMessages1.messageIds;
+        Consumer<String> consumer = consumerAndReceivedMessages1.consumer;
+        MessageIdImpl individualPosition = messageIds[1].get(messageCountPerEntry - 1);
+        MessageIdImpl expectedMarkDeletedPosition =
+                new MessageIdImpl(messageIds[0].get(0).getLedgerId(), messageIds[0].get(0).getEntryId(), -1);
+        // The mark-delete position must eventually reach the last message of the third ledger.
+        MessageIdImpl lastMessageOfThirdLedger = messageIds[2].get(messageIds[2].size() - 1);
+        MessageIdImpl lastPosition = new MessageIdImpl(lastMessageOfThirdLedger.getLedgerId(),
+                lastMessageOfThirdLedger.getEntryId(), -1);
+        consumer.acknowledge(individualPosition);
+        consumer.acknowledge(expectedMarkDeletedPosition);
+        waitPersistentCursorLedger(topicName, subName, expectedMarkDeletedPosition.getLedgerId(),
+                expectedMarkDeletedPosition.getEntryId());
+        consumer.close();
+
+        log.info("Make lost ledger [{}].", individualPosition.getLedgerId());
+        pulsar.getBrokerService().getTopic(topicName, false).get().get().close(false);
+        mockBookKeeper.deleteLedger(individualPosition.getLedgerId());
+
+        log.info("send some messages.");
+        // Send three entries. A plain message count of 3 would send zero entries in batch mode (3 / 5 == 0).
+        sendManyMessages(topicName, 3 * messageCountPerEntry, messageCountPerEntry);
+
+        log.info("receive all messages then verify mark deleted position");
+        ConsumerAndReceivedMessages consumerAndReceivedMessages2 =
+                waitConsumeAndAllMessages(topicName, subName, enabledBatch, true);
+        waitMarkDeleteLargeAndEquals(topicName, subName, lastPosition.getLedgerId(), lastPosition.getEntryId());
+
+        // cleanup
+        consumerAndReceivedMessages2.consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    /** Returns the managed cursor backing subscription {@code subName} of the existing topic {@code topicName}. */
+    private ManagedCursorImpl getCursor(String topicName, String subName) throws Exception {
+        PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService()
+                .getTopic(topicName, false).get().get().getSubscription(subName);
+        return (ManagedCursorImpl) subscription.getCursor();
+    }
+
+    /**
+     * Waits until the cursor's mark-delete position (as returned by {@code getMarkDeletedPosition()}) is at or
+     * beyond {@code markDeletedLedgerId:markDeletedEntryId}.
+     */
+    private void waitMarkDeleteLargeAndEquals(String topicName, String subName, final long markDeletedLedgerId,
+                                              final long markDeletedEntryId) throws Exception {
+        Awaitility.await().atMost(Duration.ofSeconds(45)).untilAsserted(() -> {
+            Position markDeletedPosition = getCursor(topicName, subName).getMarkDeletedPosition();
+            log.info("markDeletedPosition {}:{}, expected {}:{}", markDeletedPosition.getLedgerId(),
+                    markDeletedPosition.getEntryId(), markDeletedLedgerId, markDeletedEntryId);
+            Assert.assertTrue(markDeletedPosition.getLedgerId() >= markDeletedLedgerId);
+            if (markDeletedPosition.getLedgerId() > markDeletedLedgerId) {
+                return;
+            }
+            Assert.assertTrue(markDeletedPosition.getEntryId() >= markDeletedEntryId);
+        });
+    }
+
+    /**
+     * Waits until the cursor's persistent mark-delete position equals
+     * {@code markDeletedLedgerId:markDeletedEntryId}.
+     */
+    private void waitPersistentCursorLedger(String topicName, String subName, final long markDeletedLedgerId,
+                                            final long markDeletedEntryId) throws Exception {
+        Awaitility.await().untilAsserted(() -> {
+            Position persistentMarkDeletedPosition = getCursor(topicName, subName).getPersistentMarkDeletedPosition();
+            Assert.assertEquals(persistentMarkDeletedPosition.getLedgerId(), markDeletedLedgerId);
+            Assert.assertEquals(persistentMarkDeletedPosition.getEntryId(), markDeletedEntryId);
+        });
+    }
+
+    /**
+     * Sends {@code messageCountPerLedger} messages in each of {@code ledgerCount} rounds. The topic is unloaded
+     * before every round; the test relies on this to place each round in its own ledger.
+     *
+     * @return the sent message ids, one list per round
+     */
+    @SuppressWarnings("unchecked")
+    private List<MessageIdImpl>[] sendManyMessages(String topicName, int ledgerCount, int messageCountPerLedger,
+                                                   int messageCountPerEntry) throws Exception {
+        List<MessageIdImpl>[] messageIds = new List[ledgerCount];
+        for (int i = 0; i < ledgerCount; i++) {
+            admin.topics().unload(topicName);
+            messageIds[i] = sendManyMessages(topicName, messageCountPerLedger, messageCountPerEntry);
+        }
+        return messageIds;
+    }
+
+    /**
+     * Sends {@code messageCountPerLedger} messages. When {@code messageCountPerEntry} is 1 they are not batched;
+     * otherwise they are batched {@code messageCountPerEntry} per entry, and any remainder of
+     * {@code messageCountPerLedger / messageCountPerEntry} is not sent.
+     */
+    private List<MessageIdImpl> sendManyMessages(String topicName, int messageCountPerLedger,
+                                                 int messageCountPerEntry) throws Exception {
+        if (messageCountPerEntry == 1) {
+            return sendManyMessages(topicName, messageCountPerLedger);
+        }
+        return sendManyBatchedMessages(topicName, messageCountPerEntry, messageCountPerLedger / messageCountPerEntry);
+    }
+
+    /** Sends {@code msgCount} keyed, non-batched messages synchronously and returns their ids in send order. */
+    private List<MessageIdImpl> sendManyMessages(String topicName, int msgCount) throws Exception {
+        List<MessageIdImpl> messageIdList = new ArrayList<>();
+        try (Producer<String> producer = pulsarClient.newProducer(Schema.JSON(String.class))
+                .topic(topicName)
+                .enableBatching(false)
+                .create()) {
+            long timestamp = System.currentTimeMillis();
+            for (int i = 0; i < msgCount; i++) {
+                String messageSuffix = String.format("%s-%s", timestamp, i);
+                MessageIdImpl messageIdSent = (MessageIdImpl) producer.newMessage()
+                        .key(String.format("Key-%s", messageSuffix))
+                        .value(String.format("Msg-%s", messageSuffix))
+                        .send();
+                messageIdList.add(messageIdSent);
+            }
+        }
+        return messageIdList;
+    }
+
+    /**
+     * Sends {@code entryCount} batches of {@code msgCountPerEntry} messages each. The producer is flushed after
+     * every batch, and the batching limits are set so high that only the flush publishes a batch.
+     */
+    private List<MessageIdImpl> sendManyBatchedMessages(String topicName, int msgCountPerEntry, int entryCount)
+            throws Exception {
+        List<CompletableFuture<MessageId>> messageIdFutures = new ArrayList<>();
+        try (Producer<String> producer = pulsarClient.newProducer(Schema.JSON(String.class))
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.SECONDS)
+                .batchingMaxMessages(Integer.MAX_VALUE)
+                .create()) {
+            for (int i = 0; i < entryCount; i++) {
+                for (int j = 0; j < msgCountPerEntry; j++) {
+                    messageIdFutures.add(producer.newMessage()
+                            .value(String.format("entry-seq[%s], batch_index[%s]", i, j))
+                            .sendAsync());
+                }
+                producer.flush();
+            }
+        }
+        FutureUtil.waitForAll(messageIdFutures).get();
+        return messageIdFutures.stream().map(f -> (MessageIdImpl) f.join()).collect(Collectors.toList());
+    }
+
+    /**
+     * Subscribes and receives messages until none arrives within 5 seconds, acknowledging each one if {@code ack}.
+     *
+     * @return the still-open consumer and the received message ids, grouped per ledger in ascending ledger order
+     */
+    private ConsumerAndReceivedMessages waitConsumeAndAllMessages(String topicName, String subName,
+                                                                  final boolean enabledBatch,
+                                                                  boolean ack) throws Exception {
+        List<MessageIdImpl> messageIds = new ArrayList<>();
+        final Consumer<String> consumer = createConsumer(topicName, subName, enabledBatch);
+        Message<String> message;
+        while ((message = consumer.receive(5, TimeUnit.SECONDS)) != null) {
+            messageIds.add((MessageIdImpl) message.getMessageId());
+            if (ack) {
+                consumer.acknowledge(message);
+            }
+        }
+        log.info("receive {} messages", messageIds.size());
+        return new ConsumerAndReceivedMessages(consumer, sortMessageId(messageIds, enabledBatch));
+    }
+
+    /** A consumer together with the message ids it received, grouped per ledger. */
+    @AllArgsConstructor
+    private static class ConsumerAndReceivedMessages {
+        private final Consumer<String> consumer;
+        private final List<MessageIdImpl>[] messageIds;
+    }
+
+    /**
+     * Groups message ids by ledger (ascending ledger id) and sorts each group by position. When
+     * {@code enabledBatch} is set, every id must be a {@link BatchMessageIdImpl} and ties are broken by batch index.
+     */
+    @SuppressWarnings("unchecked")
+    private List<MessageIdImpl>[] sortMessageId(List<MessageIdImpl> messageIds, boolean enabledBatch) {
+        Map<Long, List<MessageIdImpl>> map =
+                messageIds.stream().collect(Collectors.groupingBy(MessageIdImpl::getLedgerId));
+        TreeMap<Long, List<MessageIdImpl>> sortedMap = new TreeMap<>(map);
+        // Compare field by field: subtracting packed values and casting to int can overflow and
+        // violate the Comparator contract.
+        Comparator<MessageIdImpl> byPosition = Comparator.comparingLong(MessageIdImpl::getLedgerId)
+                .thenComparingLong(MessageIdImpl::getEntryId);
+        if (enabledBatch) {
+            byPosition = byPosition.thenComparingInt(m -> ((BatchMessageIdImpl) m).getBatchIndex());
+        }
+        List<MessageIdImpl>[] res = new List[sortedMap.size()];
+        Iterator<Map.Entry<Long, List<MessageIdImpl>>> iterator = sortedMap.entrySet().iterator();
+        for (int i = 0; i < res.length; i++) {
+            res[i] = iterator.next().getValue();
+            res[i].sort(byPosition);
+        }
+        return res;
+    }
+
+    /** Creates a failover consumer with ack receipts enabled, and batch-index ack iff {@code enabledBatch}. */
+    private Consumer<String> createConsumer(String topicName, String subName, boolean enabledBatch) throws Exception {
+        return pulsarClient.newConsumer(Schema.JSON(String.class))
+                .subscriptionType(SubscriptionType.Failover)
+                .isAckReceiptEnabled(true)
+                .enableBatchIndexAcknowledgment(enabledBatch)
+                .receiverQueueSize(1000)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscribe();
+    }
+}

Reply via email to