This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 49f6a9f6be2 [fix] [broker] Replication stopped due to unload topic 
failed (#21947)
49f6a9f6be2 is described below

commit 49f6a9f6be27a9b785f4ced22e65600b0dfc4379
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jan 25 17:02:26 2024 +0800

    [fix] [broker] Replication stopped due to unload topic failed (#21947)
    
    ### Motivation
    
    **Steps to reproduce the issue**
    - Enable replication.
    - Send `10` messages to the local cluster then close the producer.
    - Call `pulsar-admin topics unload <topic>`; it returns an error because 
closing the replicator's internal producer fails.
    - Since closing the topic failed, the topic is expected to keep working as 
before, but replication has stopped.
    
    **Root cause**
    - `pulsar-admin topics unload <topic>` waits for all clients (including 
`consumers & producers & replicators`) to close successfully, and it fails 
if any of them cannot be closed.
    - A failure to close `replicator.producer` makes the Admin API call fail, 
but a scheduled task keeps retrying to close `replicator.producer`, and that 
retry is what stops replication. See 
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L209
    
    ### Modifications
    
    Since "replicator.producer.closeAsync()" is retried after it fails, the 
topic unload should succeed.
---
 .../pulsar/broker/service/AbstractReplicator.java  |  3 +-
 .../broker/service/OneWayReplicatorTest.java       | 70 +++++++++++++++++++---
 2 files changed, 63 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 9509515e1e8..50408b50632 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -185,7 +185,7 @@ public abstract class AbstractReplicator {
             return CompletableFuture.completedFuture(null);
         }
         CompletableFuture<Void> future = producer.closeAsync();
-        future.thenRun(() -> {
+        return future.thenRun(() -> {
             STATE_UPDATER.set(this, State.Stopped);
             this.producer = null;
             // deactivate further read
@@ -200,7 +200,6 @@ public abstract class AbstractReplicator {
             brokerService.executor().schedule(this::closeProducerAsync, 
waitTimeMs, TimeUnit.MILLISECONDS);
             return null;
         });
-        return future;
     }
 
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 73a8aca13a9..a6240775321 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -21,17 +21,23 @@ package org.apache.pulsar.broker.service;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.junit.Assert;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -51,6 +57,29 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         super.cleanup();
     }
 
+    private void waitReplicatorStarted(String topicName) {
+        Awaitility.await().untilAsserted(() -> { // re-run until both assertions below pass
+            Optional<Topic> topicOptional2 = 
pulsar2.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional2.isPresent());
+            PersistentTopic persistentTopic2 = (PersistentTopic) 
topicOptional2.get();
+            assertFalse(persistentTopic2.getProducers().isEmpty()); // presumably the replicator's producer
+        });
+    }
+
+    /**
+     * Replace the "AbstractReplicator.producer" field with {@code newProducer} 
via reflection and return the previous value.
+     */
+    private ProducerImpl overrideProducerForReplicator(AbstractReplicator 
replicator, ProducerImpl newProducer)
+            throws Exception {
+        Field producerField = 
AbstractReplicator.class.getDeclaredField("producer");
+        producerField.setAccessible(true);
+        ProducerImpl originalValue = (ProducerImpl) 
producerField.get(replicator);
+        synchronized (replicator) {
+            producerField.set(replicator, newProducer);
+        }
+        return originalValue;
+    }
+
     @Test
     public void testReplicatorProducerStatInTopic() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
@@ -86,18 +115,13 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
     public void testCreateRemoteConsumerFirst() throws Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
         Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).create();
-        // Wait for replicator started.
-        Awaitility.await().untilAsserted(() -> {
-            Optional<Topic> topicOptional2 = 
pulsar2.getBrokerService().getTopic(topicName, false).get();
-            assertTrue(topicOptional2.isPresent());
-            PersistentTopic persistentTopic2 = (PersistentTopic) 
topicOptional2.get();
-            assertFalse(persistentTopic2.getProducers().isEmpty());
-        });
+
         // The topic in cluster2 has a replicator created producer(schema 
Auto_Produce), but does not have any schema。
         // Verify: the consumer of this cluster2 can create successfully.
         Consumer<String> consumer2 = 
client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1")
                 .subscribe();;
-
+        // Wait for replicator started.
+        waitReplicatorStarted(topicName);
         // cleanup.
         producer1.close();
         consumer2.close();
@@ -106,4 +130,34 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws 
Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ defaultNamespace + "/tp_");
+        admin1.topics().createNonPartitionedTopic(topicName);
+        // Wait for the replicator to start.
+        waitReplicatorStarted(topicName);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentReplicator replicator =
+                (PersistentReplicator) 
persistentTopic.getReplicators().values().iterator().next();
+        // Mock a producer whose "closeAsync()" fails, and install it in the replicator.
+        
Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new
 Exception("mocked ex")));
+        ProducerImpl originalProducer = 
overrideProducerForReplicator(replicator, mockProducer);
+        // Verify: since "replicator.producer.closeAsync()" is retried after 
it fails, the topic unload should still
+        // succeed.
+        admin1.topics().unload(topicName);
+        // Restore the original producer so that the retried close can succeed, then 
verify that the replicator
+        // ends up disconnected.
+        overrideProducerForReplicator(replicator, originalProducer);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(replicator.isConnected());
+        });
+        // cleanup.
+        cleanupTopics(() -> {
+            admin1.topics().delete(topicName);
+            admin2.topics().delete(topicName);
+        });
+    }
 }

Reply via email to