This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f7c540e7987 [fix] [log] Do not print warn log when concurrently
publishing and switching ledgers (#23209)
f7c540e7987 is described below
commit f7c540e79873dff6dc447dcc0f3b814000743ec3
Author: fengyubiao <[email protected]>
AuthorDate: Thu Aug 22 11:13:55 2024 +0800
[fix] [log] Do not print warn log when concurrently publishing and
switching ledgers (#23209)
(cherry picked from commit 0a5cb51a2f010d6771ae0ae0fd259d002cca20da)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 +-
...impleProducerConsumerMLInitializeDelayTest.java | 70 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 78e844a2457..3f327152586 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1679,8 +1679,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (existsOp.ledger != null) {
existsOp =
existsOp.duplicateAndClose(currentLedgerTimeoutTriggered);
} else {
- // This scenario should not happen.
- log.warn("[{}] An OpAddEntry's ledger is empty.", name);
+ // It may happen when the following operations execute at
the same time, so it is expected.
+ // - Adding entry.
+ // - Switching ledger.
existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered);
}
existsOp.setLedger(currentLedger);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
index ab4e063ae3d..7c7665a5bd3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
@@ -20,12 +20,19 @@ package org.apache.pulsar.client.api;
import com.carrotsearch.hppc.ObjectSet;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
@@ -105,4 +112,67 @@ public class SimpleProducerConsumerMLInitializeDelayTest
extends ProducerConsume
// cleanup.
client.close();
}
+
+ @Test(timeOut = 30 * 1000)
+ public void testConcurrentlyOfPublishAndSwitchLedger() throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ final String subscription = "s1";
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subscription,
MessageId.earliest);
+ // Make ledger switches faster.
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ ManagedLedgerConfig config =
persistentTopic.getManagedLedger().getConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ // Inject a delay for switching ledgers, so publishing requests will
be push in to the pending queue.
+ AtomicInteger delayTimes = new AtomicInteger();
+ mockZooKeeper.delay(10, (op, s) -> {
+ if (op.toString().equals("SET") &&
s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) {
+ return delayTimes.incrementAndGet() == 1;
+ }
+ return false;
+ });
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false)
+ .create();
+ List<CompletableFuture<MessageId>> sendRequests = new ArrayList<>();
+ List<String> msgsSent = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ String msg = i + "";
+ sendRequests.add(producer.sendAsync(i + ""));
+ msgsSent.add(msg);
+ }
+ // Verify:
+ // - All messages were sent.
+ // - The order of messages are correct.
+ Set<String> msgIds = new LinkedHashSet<>();
+ MessageIdImpl previousMsgId = null;
+ for (CompletableFuture<MessageId> msgId : sendRequests) {
+ Assert.assertNotNull(msgId.join());
+ MessageIdImpl messageIdImpl = (MessageIdImpl) msgId.join();
+ if (previousMsgId != null) {
+ Assert.assertTrue(messageIdImpl.compareTo(previousMsgId) > 0);
+ }
+ msgIds.add(String.format("%s:%s", messageIdImpl.getLedgerId(),
messageIdImpl.getEntryId()));
+ previousMsgId = messageIdImpl;
+ }
+ Assert.assertEquals(msgIds.size(), 100);
+ log.info("messages were sent: {}", msgIds.toString());
+ List<String> msgsReceived = new ArrayList<>();
+ Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName(subscription).subscribe();
+ while (true) {
+ Message<String> receivedMsg = consumer.receive(2,
TimeUnit.SECONDS);
+ if (receivedMsg == null) {
+ break;
+ }
+ msgsReceived.add(receivedMsg.getValue());
+ }
+ Assert.assertEquals(msgsReceived, msgsSent);
+
+ // cleanup.
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topicName);
+ }
}