This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch 2.10.4/19384 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4b98829939c8083ed5e1090d7996288931fba059 Author: Tao Jiuming <[email protected]> AuthorDate: Mon Feb 6 15:32:59 2023 +0800 [fix] Close TransactionBuffer when create persistent topic timeout (#19384) (cherry picked from commit 8eb7ee1e9e96d4686e452320cdaba92d5eca7b4f) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ---- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 +-- .../pulsar/broker/service/BrokerService.java | 7 ++++ .../apache/pulsar/broker/service/ServerCnx.java | 3 +- .../broker/admin/IncrementPartitionsTest.java | 3 +- .../pulsar/broker/transaction/TransactionTest.java | 1 + .../buffer/TopicTransactionBufferTest.java | 38 ++++++++++++++++++++++ 7 files changed, 51 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 09322382a46..71948f547c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -204,12 +204,6 @@ public class ManagedCursorImpl implements ManagedCursor { // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. private volatile boolean isActive = false; - // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. - private volatile boolean isActive = false; - - // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. - private volatile boolean isActive = false; - class MarkDeleteEntry { final PositionImpl newPosition; final MarkDeleteCallback callback; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a05b6e451ac..162240b44f0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -215,9 +215,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); - private volatile LedgerHandle currentLedger; + protected volatile LedgerHandle currentLedger; private long currentLedgerEntries = 0; - private long currentLedgerSize = 0; + protected long currentLedgerSize = 0; private long lastLedgerCreatedTimestamp = 0; private long lastLedgerCreationFailureTimestamp = 0; private long lastLedgerCreationInitiationTimestamp = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 61eff24a78f..0ea95995f4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1457,8 +1457,15 @@ public class BrokerService implements Closeable { - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); if (topicFuture.isCompletedExceptionally()) { + // Check create persistent topic timeout. log.warn("{} future is already completed with failure {}, closing the" + " topic", topic, FutureUtil.getException(topicFuture)); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers() .whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1f08db86aaf..6cc89dfe030 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -134,7 +134,6 @@ import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -1261,7 +1260,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { // Check whether the producer will publish encrypted messages or not if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted - && !SystemTopicNames.isSystemTopic(topicName)) { + && !topic.getBrokerService().isSystemTopic(topicName)) { String msg = String.format("Encryption is required in %s", topicName); log.warn("[{}] {}", remoteAddress, msg); if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java index e156678a394..6d1444fa3a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -136,7 +136,8 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20); assertEquals(admin.topics().getSubscriptions( - TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1")); + TopicName.get(partitionedTopicName).getPartition(15).toString()), + Lists.newArrayList("sub-1")); TopicStats stats = admin.topics() .getStats(TopicName.get(partitionedTopicName).getPartition(15).toString()); Map<String, String> subscriptionProperties = stats.getSubscriptions() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index a2c6e7dbafc..2da7502dc25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -39,6 +39,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.Timeout; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 1cf190056af..ddb9af94f56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -42,8 +43,11 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -138,4 +142,38 @@ public class TopicTransactionBufferTest extends TransactionTestBase { Assert.assertEquals(ttb.getState(), expectState); } + + @Test + public void testCloseTransactionBufferWhenTimeout() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference<PersistentTopic> reference = new AtomicReference<>(); + pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10); + long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1); + + Mockito + .doAnswer(inv -> { + Thread.sleep(topicLoadTimeout); + PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod(); + reference.set(persistentTopic); + return persistentTopic; + }) + .when(brokerService) + .newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService)); + + CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true); + + Awaitility.waitAtMost(20, TimeUnit.SECONDS) + .pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + Assert.assertTrue(f.isCompletedExceptionally()); + } + }
