This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 887f8ec398fad0062fa04b945099afbb888b929e Author: Rajan Dhabalia <[email protected]> AuthorDate: Thu Jun 17 23:07:34 2021 -0700 [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847) ### Motivation When the broker takes a long time to load a topic and the topic future times out before the load completes, the broker keeps multiple instances of the same topic open, doesn't clean up the timed-out topic, fails to create the replicator producer on the successfully created topic with the error `repl-producer is already connected to topic`, and builds up a replication backlog. ``` 19:16:10.107 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger myProp/global/myNs/persistent/myTopic : 9:17:10.953 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [myProp/global/myNs/persistent/myTopic] Successfully initialize managed ledger : 19:17:10.065 [pulsar-io-23-30] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.196.133.62:47278] Failed to create topic persistent://myProp/global/myNs/myTopic, producerId=382 : 19:17:10.954 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator : 19:17:10.955 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled : 19:17:51.532 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled : 19:17:51.530 [pulsar-ordered-OrderedExecutor-5-0] INFO org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator : 07:25:51.377 [pulsar-io-23-5] ERROR 
org.apache.pulsar.client.impl.ProducerImpl - [persistent://myProp/global/myNs/myTopic] [pulsar.repl.west1] Failed to create producer: Producer with name 'pulsarrepl.west1' is already connected to topic ``` ### Modification - Stopped replicator for failed and timed-out topic - Clean up failed topic ### Result - Successfully create replicator producer for the topic and avoid creating replication backlog (cherry picked from commit 1447e6b1061babedc08901c44f16164bb4c4e2df) --- .../pulsar/broker/service/BrokerService.java | 14 +++- .../pulsar/broker/service/ReplicatorTest.java | 80 ++++++++++++++++++++++ .../org/apache/pulsar/common/util/FutureUtil.java | 31 +++++++++ 3 files changed, 122 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 575738a..9c9d482 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -970,7 +970,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { - CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline(); + CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.futureWithDeadline(executor()); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -1233,8 +1233,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); - addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(Optional.of(persistentTopic)); + if (topicFuture.isCompletedExceptionally()) { + log.warn("{} future is already 
completed with failure {}, closing the topic", + topic, FutureUtil.getException(topicFuture)); + persistentTopic.stopReplProducers().whenComplete((v, exception) -> { + topics.remove(topic, topicFuture); + }); + } else { + addTopicToStatsMaps(topicName, persistentTopic); + topicFuture.complete(Optional.of(persistentTopic)); + } }).exceptionally((ex) -> { log.warn( "Replication or dedup check failed." diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index e5219d5..d261e85 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -34,11 +36,13 @@ import io.netty.buffer.ByteBuf; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.List; +import java.util.Optional; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -52,6 +56,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; @@ -65,6 +70,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -1041,6 +1047,80 @@ public class ReplicatorTest extends ReplicatorTestBase { nonPersistentProducer2.close(); } + @Test + public void testCleanupTopic() throws Exception { + + final String cluster1 = pulsar1.getConfig().getClusterName(); + final String cluster2 = pulsar2.getConfig().getClusterName(); + final String namespace = "pulsar/ns-" + System.nanoTime(); + final String topicName = "persistent://" + namespace + "/cleanTopic"; + final String topicMlName = namespace + "/persistent/cleanTopic"; + admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); + + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + long topicLoadTimeoutSeconds = 3; + config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + + ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory() + .getManagedLedgerFactory(); + Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + ledgersField.setAccessible(true); + ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = 
(ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField + .get(mlFactory); + CompletableFuture<ManagedLedgerImpl> mlFuture = new CompletableFuture<>(); + ledgers.put(topicMlName, mlFuture); + + try { + Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); + fail("consumer should fail due to topic loading failure"); + } catch (Exception e) { + // Ok + } + + CompletableFuture<Optional<Topic>> topicFuture = null; + for (int i = 0; i < 5; i++) { + topicFuture = pulsar1.getBrokerService().getTopics().get(topicName); + if (topicFuture != null) { + break; + } + Thread.sleep(i * 1000); + } + + try { + topicFuture.get(); + fail("topic creation should fail"); + } catch (Exception e) { + // Ok + } + + final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture; + // timeout topic future should be removed from cache + retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5, + 1000); + + assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName)); + + try { + Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) + .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); + fail("consumer should fail due to topic loading failure"); + } catch (Exception e) { + // Ok + } + + ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2"); + mlFuture.complete(ml); + + Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + .subscriptionType(SubscriptionType.Shared).subscribeAsync() + .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS); + + consumer.close(); + } private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class); } diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 7356950..0c3a0c0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -20,8 +20,10 @@ package org.apache.pulsar.common.util; import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -162,4 +164,33 @@ public class FutureUtil { return this; } } + + public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, Long delay, + TimeUnit unit, Exception exp) { + CompletableFuture<T> future = new CompletableFuture<T>(); + executor.schedule(() -> { + if (!future.isDone()) { + future.completeExceptionally(exp); + } + }, delay, unit); + return future; + } + + public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor) { + return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS, + new TimeoutException("Future didn't finish within deadline")); + } + + public static <T> Optional<Throwable> getException(CompletableFuture<T> future) { + if (future != null && future.isCompletedExceptionally()) { + try { + future.get(); + } catch (InterruptedException e) { + return Optional.ofNullable(e); + } catch (ExecutionException e) { + return Optional.ofNullable(e.getCause()); + } + } + return Optional.empty(); + } }
