This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch 2.10.4/19384
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4b98829939c8083ed5e1090d7996288931fba059
Author: Tao Jiuming <[email protected]>
AuthorDate: Mon Feb 6 15:32:59 2023 +0800

    [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
    
    (cherry picked from commit 8eb7ee1e9e96d4686e452320cdaba92d5eca7b4f)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  6 ----
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +--
 .../pulsar/broker/service/BrokerService.java       |  7 ++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  3 +-
 .../broker/admin/IncrementPartitionsTest.java      |  3 +-
 .../pulsar/broker/transaction/TransactionTest.java |  1 +
 .../buffer/TopicTransactionBufferTest.java         | 38 ++++++++++++++++++++++
 7 files changed, 51 insertions(+), 11 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 09322382a46..71948f547c2 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -204,12 +204,6 @@ public class ManagedCursorImpl implements ManagedCursor {
     // active state cache in ManagedCursor. It should be in sync with the 
state in activeCursors in ManagedLedger.
     private volatile boolean isActive = false;
 
-    // active state cache in ManagedCursor. It should be in sync with the 
state in activeCursors in ManagedLedger.
-    private volatile boolean isActive = false;
-
-    // active state cache in ManagedCursor. It should be in sync with the 
state in activeCursors in ManagedLedger.
-    private volatile boolean isActive = false;
-
     class MarkDeleteEntry {
         final PositionImpl newPosition;
         final MarkDeleteCallback callback;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a05b6e451ac..162240b44f0 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -215,9 +215,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     private final CallbackMutex offloadMutex = new CallbackMutex();
     private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE 
= CompletableFuture
             .completedFuture(PositionImpl.LATEST);
-    private volatile LedgerHandle currentLedger;
+    protected volatile LedgerHandle currentLedger;
     private long currentLedgerEntries = 0;
-    private long currentLedgerSize = 0;
+    protected long currentLedgerSize = 0;
     private long lastLedgerCreatedTimestamp = 0;
     private long lastLedgerCreationFailureTimestamp = 0;
     private long lastLedgerCreationInitiationTimestamp = 0;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 61eff24a78f..0ea95995f4b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1457,8 +1457,15 @@ public class BrokerService implements Closeable {
                                                                         - 
topicCreateTimeMs;
                                             
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                             if 
(topicFuture.isCompletedExceptionally()) {
+                                                // Check create persistent 
topic timeout.
                                                 log.warn("{} future is already 
completed with failure {}, closing the"
                                                         + " topic", topic, 
FutureUtil.getException(topicFuture));
+                                                
persistentTopic.getTransactionBuffer()
+                                                        .closeAsync()
+                                                        .exceptionally(t -> {
+                                                            log.error("[{}] 
Close transactionBuffer failed", topic, t);
+                                                            return null;
+                                                        });
                                                 
persistentTopic.stopReplProducers()
                                                         .whenCompleteAsync((v, 
exception) -> {
                                                             
topics.remove(topic, topicFuture);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1f08db86aaf..6cc89dfe030 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -134,7 +134,6 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -1261,7 +1260,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     // Check whether the producer will publish encrypted 
messages or not
                     if ((topic.isEncryptionRequired() || 
encryptionRequireOnProducer)
                             && !isEncrypted
-                            && !SystemTopicNames.isSystemTopic(topicName)) {
+                            && 
!topic.getBrokerService().isSystemTopic(topicName)) {
                         String msg = String.format("Encryption is required in 
%s", topicName);
                         log.warn("[{}] {}", remoteAddress, msg);
                         if (producerFuture.completeExceptionally(new 
ServerMetadataException(msg))) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index e156678a394..6d1444fa3a4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -136,7 +136,8 @@ public class IncrementPartitionsTest extends 
MockedPulsarServiceBaseTest {
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 20);
 
         assertEquals(admin.topics().getSubscriptions(
-                
TopicName.get(partitionedTopicName).getPartition(15).toString()), 
List.of("sub-1"));
+                
TopicName.get(partitionedTopicName).getPartition(15).toString()),
+                Lists.newArrayList("sub-1"));
         TopicStats stats = admin.topics()
                 
.getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
         Map<String, String> subscriptionProperties = stats.getSubscriptions()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a2c6e7dbafc..2da7502dc25 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -39,6 +39,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 1cf190056af..ddb9af94f56 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -42,8 +43,11 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -138,4 +142,38 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         Assert.assertEquals(ttb.getState(), expectState);
     }
 
+
+    @Test
+    public void testCloseTransactionBufferWhenTimeout() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/test_" + 
UUID.randomUUID();
+        PulsarService pulsar = pulsarServiceList.get(0);
+        BrokerService brokerService0 = pulsar.getBrokerService();
+        BrokerService brokerService = Mockito.spy(brokerService0);
+        AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
+        long topicLoadTimeout = 
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()
 + 1);
+
+        Mockito
+                .doAnswer(inv -> {
+                    Thread.sleep(topicLoadTimeout);
+                    PersistentTopic persistentTopic = (PersistentTopic) 
inv.callRealMethod();
+                    reference.set(persistentTopic);
+                    return persistentTopic;
+                })
+                .when(brokerService)
+                .newPersistentTopic(Mockito.eq(topic), Mockito.any(), 
Mockito.eq(brokerService));
+
+        CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, 
true);
+
+        Awaitility.waitAtMost(20, TimeUnit.SECONDS)
+                .pollInterval(Duration.ofSeconds(2)).until(() -> 
reference.get() != null);
+        PersistentTopic persistentTopic = reference.get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+        TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+        TopicTransactionBufferState.State expectState = 
TopicTransactionBufferState.State.Close;
+        Assert.assertEquals(ttb.getState(), expectState);
+        Assert.assertTrue(f.isCompletedExceptionally());
+    }
+
 }

Reply via email to