This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 83a29ecc0332a9bc65a8e4474c27955accb1359f Author: congbo <[email protected]> AuthorDate: Wed Jun 23 23:07:18 2021 +0800 [Transaction] Fix broker init transaction related topic. (#11022) (cherry picked from commit e092fb33699c227c110a9c707ff6fcfedfd1ee7a) --- .../org/apache/pulsar/broker/PulsarService.java | 14 +- .../service/persistent/PersistentSubscription.java | 7 +- .../broker/service/persistent/PersistentTopic.java | 7 +- .../pulsar/broker/transaction/TransactionTest.java | 142 +++++++++++++++++++++ 4 files changed, 157 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index cbd3eb3..0d01968 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -108,6 +109,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.validator.MultipleListenerValidator; import org.apache.pulsar.broker.web.WebService; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -122,6 +124,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -990,7 +993,7 @@ public class PulsarService implements AutoCloseable { for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) { try { TopicName topicName = TopicName.get(topic); - if (bundle.includes(topicName)) { + if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) { CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic); if (future != null) { persistentTopics.add(future); @@ -1584,4 +1587,13 @@ public class PulsarService implements AutoCloseable { return workerConfig; } + + private static boolean isTransactionSystemTopic(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) + || topic.startsWith(TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()) + || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 16f2259..316ac10 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -59,7 +59,6 @@ import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled; import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.transaction.TxnID; @@ -77,7 +76,6 @@ import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,10 +139,7 @@ public class PersistentSubscription implements Subscription { this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() - && !checkTopicIsEventsNames(TopicName.get(topicName)) - && !topicName.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()) - && !topicName.startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX) - && !topicName.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) { + && !checkTopicIsEventsNames(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8f7f356..3d5e10d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -107,7 +107,6 @@ import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; -import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; @@ -149,7 +148,6 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicImpl; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.utils.StatsOutputStream; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -317,10 +315,7 @@ public class PersistentTopic extends AbstractTopic checkReplicatedSubscriptionControllerState(); TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() - && !checkTopicIsEventsNames(topicName) - && !topicName.getEncodedLocalName().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()) - && !topicName.getEncodedLocalName().startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX) - && !topicName.getEncodedLocalName().endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) { + && !checkTopicIsEventsNames(topicName)) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this, transactionCompletableFuture); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java new file mode 100644 index 0000000..ce8b3ae --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction; + +import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import com.google.common.collect.Sets; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Pulsar client transaction test. + */ +@Slf4j +@Test(groups = "broker") +public class TransactionTest extends TransactionTestBase { + + private static final String TENANT = "tnx"; + private static final String NAMESPACE1 = TENANT + "/ns1"; + + @BeforeMethod + protected void setup() throws Exception { + this.setBrokerCount(1); + this.internalSetup(); + + String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); + String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1]; + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); + admin.tenants().createTenant(TENANT, + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NAMESPACE1); + + admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + pulsarClient.close(); + pulsarClient = PulsarClient.builder() + .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()) + .statsInterval(0, TimeUnit.SECONDS) + .enableTransaction(true) + .build(); + } + + @Test + public void brokerNotInitTxnManagedLedgerTopic() throws Exception { + String subName = "test"; + + String topicName = TopicName.get(NAMESPACE1 + "/test").toString(); + + + @Cleanup + Consumer<byte[]> consumer = getConsumer(topicName, subName); + + consumer.close(); + + Awaitility.await().until(() -> { + try { + pulsarClient.newTransaction() + .withTransactionTimeout(30, TimeUnit.SECONDS).build().get(); + } catch (Exception e) { + return false; + } + return true; + }); + + admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.namespaces().unload(NAMESPACE1); + + @Cleanup + Consumer<byte[]> consumer1 = getConsumer(topicName, subName); + + Awaitility.await().until(() -> { + try { + pulsarClient.newTransaction() + .withTransactionTimeout(30, TimeUnit.SECONDS).build().get(); + } catch (Exception e) { + return false; + } + return true; + }); + + ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = + getPulsarServiceList().get(0).getBrokerService().getTopics(); + + Assert.assertNull(topics.get(TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0)); + Assert.assertNull(topics.get(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString())); + Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName))); + } + + + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + public Consumer<byte[]> getConsumer(String topicName, String subName) throws PulsarClientException { + return pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + } +} \ No newline at end of file
