This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 0be2fb9b9cc [fix][broker] Release orphan replicator after topic closed (#20582)
0be2fb9b9cc is described below

commit 0be2fb9b9cc7d37a8687296db407d3c63cc750f3
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jun 16 10:39:28 2023 +0800

    [fix][broker] Release orphan replicator after topic closed (#20582)
---
 .../pulsar/broker/service/AbstractReplicator.java  |  39 +++++-
 .../nonpersistent/NonPersistentReplicator.java     |   2 +-
 .../service/persistent/PersistentReplicator.java   |   2 +-
 .../broker/service/AbstractReplicatorTest.java     | 146 +++++++++++++++++++++
 4 files changed, 183 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 18e2e2d16c3..a142c804001 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -44,6 +45,7 @@ public abstract class AbstractReplicator {
     protected final String remoteCluster;
     protected final PulsarClientImpl replicationClient;
     protected final PulsarClientImpl client;
+    protected final Topic localTopic;
 
     protected volatile ProducerImpl producer;
     public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
@@ -64,11 +66,12 @@ public abstract class AbstractReplicator {
         Stopped, Starting, Started, Stopping
     }
 
-    public AbstractReplicator(String topicName, String replicatorPrefix, 
String localCluster, String remoteCluster,
+    public AbstractReplicator(Topic localTopic, String replicatorPrefix, 
String localCluster, String remoteCluster,
                               BrokerService brokerService, PulsarClientImpl 
replicationClient)
             throws PulsarServerException {
         this.brokerService = brokerService;
-        this.topicName = topicName;
+        this.localTopic = localTopic;
+        this.topicName = localTopic.getName();
         this.replicatorPrefix = replicatorPrefix;
         this.localCluster = localCluster.intern();
         this.remoteCluster = remoteCluster.intern();
@@ -111,7 +114,8 @@ public abstract class AbstractReplicator {
                         topicName, localCluster, remoteCluster, waitTimeMs / 
1000.0);
             }
             // BackOff before retrying
-            brokerService.executor().schedule(this::startProducer, waitTimeMs, 
TimeUnit.MILLISECONDS);
+            
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer,
+                    waitTimeMs, TimeUnit.MILLISECONDS);
             return;
         }
         State state = STATE_UPDATER.get(this);
@@ -139,7 +143,8 @@ public abstract class AbstractReplicator {
                         localCluster, remoteCluster, ex.getMessage(), 
waitTimeMs / 1000.0);
 
                 // BackOff before retrying
-                brokerService.executor().schedule(this::startProducer, 
waitTimeMs, TimeUnit.MILLISECONDS);
+                
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer,
+                        waitTimeMs, TimeUnit.MILLISECONDS);
             } else {
                 log.warn("[{}][{} -> {}] Failed to create remote producer. 
Replicator state: {}", topicName,
                         localCluster, remoteCluster, STATE_UPDATER.get(this), 
ex);
@@ -149,6 +154,32 @@ public abstract class AbstractReplicator {
 
     }
 
+    protected void checkTopicActiveAndRetryStartProducer() {
+        isLocalTopicActive().thenAccept(isTopicActive -> {
+            if (isTopicActive) {
+                startProducer();
+            }
+        }).exceptionally(ex -> {
+            log.warn("[{}] Stop retry to create producer due to topic load 
fail. Replicator state: {}",
+                    String.format("%s%s%s", 
getReplicatorName(replicatorPrefix, localCluster),
+                            REPL_PRODUCER_NAME_DELIMITER, remoteCluster), 
STATE_UPDATER.get(this), ex);
+            return null;
+        });
+    }
+
+    protected CompletableFuture<Boolean> isLocalTopicActive() {
+        CompletableFuture<Optional<Topic>> topicFuture = 
brokerService.getTopics().get(topicName);
+        if (topicFuture == null){
+            return CompletableFuture.completedFuture(false);
+        }
+        return topicFuture.thenApplyAsync(optional -> {
+            if (optional.isEmpty()) {
+                return false;
+            }
+            return optional.get() == localTopic;
+        }, brokerService.executor());
+    }
+
     protected synchronized CompletableFuture<Void> closeProducerAsync() {
         if (producer == null) {
             STATE_UPDATER.set(this, State.Stopped);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index b863e9eb3c2..40d19e8176d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -50,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
 
     public NonPersistentReplicator(NonPersistentTopic topic, String 
localCluster, String remoteCluster,
             BrokerService brokerService, PulsarClientImpl replicationClient) 
throws PulsarServerException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, 
remoteCluster, brokerService,
+        super(topic, topic.getReplicatorPrefix(), localCluster, remoteCluster, 
brokerService,
                 replicationClient);
 
         producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0471f12f3c9..75ea294329e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -110,7 +110,7 @@ public class PersistentReplicator extends AbstractReplicator
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, 
String localCluster, String remoteCluster,
                                 BrokerService brokerService, PulsarClientImpl 
replicationClient)
             throws PulsarServerException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, 
remoteCluster, brokerService,
+        super(topic, topic.getReplicatorPrefix(), localCluster, remoteCluster, 
brokerService,
                 replicationClient);
         this.topic = topic;
         this.cursor = cursor;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
new file mode 100644
index 00000000000..30be3c55c59
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.util.internal.DefaultPriorityQueue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "broker")
+public class AbstractReplicatorTest {
+
+    @Test
+    public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
+        final String localCluster = "localCluster";
+        final String remoteCluster = "remoteCluster";
+        final String topicName = "remoteTopicName";
+        final String replicatorPrefix = "pulsar.repl";
+        final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
+        // Mock services.
+        final ServiceConfiguration pulsarConfig = 
mock(ServiceConfiguration.class);
+        final PulsarService pulsar = mock(PulsarService.class);
+        final BrokerService broker = mock(BrokerService.class);
+        final Topic localTopic = mock(Topic.class);
+        final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+        final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+        final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+        final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
+        when(broker.executor()).thenReturn(eventLoopGroup);
+        when(broker.getTopics()).thenReturn(topics);
+        
when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
+        when(broker.pulsar()).thenReturn(pulsar);
+        when(pulsar.getClient()).thenReturn(localClient);
+        when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
+        when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
+        when(localTopic.getName()).thenReturn(topicName);
+        when(producerBuilder.topic(any())).thenReturn(producerBuilder);
+        
when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
+        
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+        when(producerBuilder.sendTimeout(anyInt(), 
any())).thenReturn(producerBuilder);
+        
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+        
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        // Mock create producer fail.
+        when(producerBuilder.create()).thenThrow(new RuntimeException("mocked 
ex"));
+        when(producerBuilder.createAsync())
+                .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("mocked ex")));
+        // Make race condition: "retry start producer" and "close replicator".
+        final ReplicatorInTest replicator = new ReplicatorInTest(localTopic, 
remoteCluster, topicName,
+                replicatorPrefix, broker, remoteClient);
+        replicator.startProducer();
+        replicator.disconnect();
+
+        // Verify task will done.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            AtomicInteger taskCounter = new AtomicInteger();
+            CountDownLatch checkTaskFinished = new CountDownLatch(1);
+            eventLoopGroup.execute(() -> {
+                synchronized (replicator) {
+                    LinkedBlockingQueue taskQueue = 
WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue");
+                    DefaultPriorityQueue scheduledTaskQueue =
+                            WhiteboxImpl.getInternalState(eventLoopGroup, 
"scheduledTaskQueue");
+                    taskCounter.set(taskQueue.size() + 
scheduledTaskQueue.size());
+                    checkTaskFinished.countDown();
+                }
+            });
+            checkTaskFinished.await();
+            Assert.assertEquals(taskCounter.get(), 0);
+        });
+    }
+
+    private static class ReplicatorInTest extends AbstractReplicator {
+
+        public ReplicatorInTest(Topic localTopic, String remoteCluster, String 
remoteTopicName,
+                                String replicatorPrefix, BrokerService 
brokerService,
+                                PulsarClientImpl replicationClient) throws 
PulsarServerException {
+            super(localTopic, remoteCluster, remoteTopicName, 
replicatorPrefix, brokerService,
+                    replicationClient);
+        }
+
+        protected String getProducerName() {
+            return "pulsar.repl.producer";
+        }
+
+        @Override
+        protected void readEntries(Producer<byte[]> producer) {
+
+        }
+
+        @Override
+        protected Position getReplicatorReadPosition() {
+            return PositionImpl.EARLIEST;
+        }
+
+        @Override
+        protected long getNumberOfEntriesInBacklog() {
+            return 0;
+        }
+
+        @Override
+        protected void disableReplicatorRead() {
+
+        }
+    }
+}

Reply via email to