This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 57b0ca403a6 [fix] [log] Do not print warn log when concurrently 
publishing and switching ledgers (#23209)
57b0ca403a6 is described below

commit 57b0ca403a66b805745aed2f9c1813ce937b433b
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 22 11:13:55 2024 +0800

    [fix] [log] Do not print warn log when concurrently publishing and 
switching ledgers (#23209)
    
    (cherry picked from commit 0a5cb51a2f010d6771ae0ae0fd259d002cca20da)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  5 +-
 ...impleProducerConsumerMLInitializeDelayTest.java | 70 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 3a97fa32701..d780e16f98b 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1681,8 +1681,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 if (existsOp.ledger != null) {
                     existsOp = 
existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
                 } else {
-                    // This scenario should not happen.
-                    log.warn("[{}] An OpAddEntry's ledger is empty.", name);
+                    // It may happen when the following operations execute at 
the same time, so it is expected.
+                    // - Adding entry.
+                    // - Switching ledger.
                     
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
                 }
                 existsOp.setLedger(currentLedger);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
index ab4e063ae3d..7c7665a5bd3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
@@ -20,12 +20,19 @@ package org.apache.pulsar.client.api;
 
 import com.carrotsearch.hppc.ObjectSet;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.awaitility.reflect.WhiteboxImpl;
@@ -105,4 +112,67 @@ public class SimpleProducerConsumerMLInitializeDelayTest 
extends ProducerConsume
         // cleanup.
         client.close();
     }
+
+    @Test(timeOut = 30 * 1000)
+    public void testConcurrentlyOfPublishAndSwitchLedger() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscription, 
MessageId.earliest);
+        // Make ledger switches faster.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerConfig config = 
persistentTopic.getManagedLedger().getConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        // Inject a delay for switching ledgers, so publishing requests will 
be push in to the pending queue.
+        AtomicInteger delayTimes = new AtomicInteger();
+        mockZooKeeper.delay(10, (op, s) -> {
+            if (op.toString().equals("SET") && 
s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
+                return delayTimes.incrementAndGet() == 1;
+            }
+            return false;
+        });
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
+                .create();
+        List<CompletableFuture<MessageId>> sendRequests = new ArrayList<>();
+        List<String> msgsSent = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            String msg = i + "";
+            sendRequests.add(producer.sendAsync(i + ""));
+            msgsSent.add(msg);
+        }
+        // Verify:
+        // - All messages were sent.
+        // - The order of messages are correct.
+        Set<String> msgIds = new LinkedHashSet<>();
+        MessageIdImpl previousMsgId = null;
+        for (CompletableFuture<MessageId> msgId : sendRequests) {
+            Assert.assertNotNull(msgId.join());
+            MessageIdImpl messageIdImpl = (MessageIdImpl) msgId.join();
+            if (previousMsgId != null) {
+                Assert.assertTrue(messageIdImpl.compareTo(previousMsgId) > 0);
+            }
+            msgIds.add(String.format("%s:%s", messageIdImpl.getLedgerId(), 
messageIdImpl.getEntryId()));
+            previousMsgId = messageIdImpl;
+        }
+        Assert.assertEquals(msgIds.size(), 100);
+        log.info("messages were sent: {}", msgIds.toString());
+        List<String> msgsReceived = new ArrayList<>();
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionName(subscription).subscribe();
+        while (true) {
+            Message<String> receivedMsg = consumer.receive(2, 
TimeUnit.SECONDS);
+            if (receivedMsg == null) {
+                break;
+            }
+            msgsReceived.add(receivedMsg.getValue());
+        }
+        Assert.assertEquals(msgsReceived, msgsSent);
+
+        // cleanup.
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName);
+    }
 }

Reply via email to