This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 49f6a9f6be2 [fix] [broker] Replication stopped due to unload topic
failed (#21947)
49f6a9f6be2 is described below
commit 49f6a9f6be27a9b785f4ced22e65600b0dfc4379
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jan 25 17:02:26 2024 +0800
[fix] [broker] Replication stopped due to unload topic failed (#21947)
### Motivation
**Steps to reproduce the issue**
- Enable replication.
- Send `10` messages to the local cluster then close the producer.
- Call `pulsar-admin topics unload <topic>`; it returns an error because closing the replicator's internal producer fails.
- Because the topic failed to close, we assumed it would keep working as expected, but replication had actually stopped.
**Root cause**
- `pulsar-admin topics unload <topic>` waits for all clients (including `consumers & producers & replicators`) to close, and fails if any of them cannot be closed.
- Closing `replicator.producer` failed, which made the Admin API call fail. However, a scheduled task keeps retrying to close `replicator.producer`; once a retry succeeds, the replicator is stopped and replication halts, even though the topic is still loaded. See
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L209
### Modifications
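Since `replicator.producer.closeAsync()` is retried after a failure, the replicator now returns a future that completes normally even when the first close attempt fails (the failure only schedules a retry), so the topic unload succeeds.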
---
.../pulsar/broker/service/AbstractReplicator.java | 3 +-
.../broker/service/OneWayReplicatorTest.java | 70 +++++++++++++++++++---
2 files changed, 63 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 9509515e1e8..50408b50632 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -185,7 +185,7 @@ public abstract class AbstractReplicator {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = producer.closeAsync();
- future.thenRun(() -> {
+ return future.thenRun(() -> {
STATE_UPDATER.set(this, State.Stopped);
this.producer = null;
// deactivate further read
@@ -200,7 +200,6 @@ public abstract class AbstractReplicator {
brokerService.executor().schedule(this::closeProducerAsync,
waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
- return future;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 73a8aca13a9..a6240775321 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
-import org.junit.Assert;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -51,6 +57,29 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
super.cleanup();
}
+ private void waitReplicatorStarted(String topicName) {
+ Awaitility.await().untilAsserted(() -> {
+ Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
+ assertTrue(topicOptional2.isPresent());
+ PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
+ assertFalse(persistentTopic2.getProducers().isEmpty());
+ });
+ }
+
+ /**
+ * Override "AbstractReplicator.producer" by {@code newProducer} and return
the original value.
+ */
+ private ProducerImpl overrideProducerForReplicator(AbstractReplicator
replicator, ProducerImpl newProducer)
+ throws Exception {
+ Field producerField =
AbstractReplicator.class.getDeclaredField("producer");
+ producerField.setAccessible(true);
+ ProducerImpl originalValue = (ProducerImpl)
producerField.get(replicator);
+ synchronized (replicator) {
+ producerField.set(replicator, newProducer);
+ }
+ return originalValue;
+ }
+
@Test
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
@@ -86,18 +115,13 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
public void testCreateRemoteConsumerFirst() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
- // Wait for replicator started.
- Awaitility.await().untilAsserted(() -> {
- Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
- assertTrue(topicOptional2.isPresent());
- PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
- assertFalse(persistentTopic2.getProducers().isEmpty());
- });
+
// The topic in cluster2 has a replicator created producer(schema
Auto_Produce), but does not have any schema。
// Verify: the consumer of this cluster2 can create successfully.
Consumer<String> consumer2 =
client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1")
.subscribe();;
-
+ // Wait for replicator started.
+ waitReplicatorStarted(topicName);
// cleanup.
producer1.close();
consumer2.close();
@@ -106,4 +130,34 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin2.topics().delete(topicName);
});
}
+
+ @Test
+ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ admin1.topics().createNonPartitionedTopic(topicName);
+ // Wait for replicator started.
+ waitReplicatorStarted(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentReplicator replicator =
+ (PersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ // Mock an error when calling "replicator.disconnect()"
+ ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class);
+
Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new
Exception("mocked ex")));
+ ProducerImpl originalProducer =
overrideProducerForReplicator(replicator, mockProducer);
+ // Verify: since the "replicator.producer.closeAsync()" will retry
after it failed, the topic unload should be
+ // successful.
+ admin1.topics().unload(topicName);
+ // Verify: After "replicator.producer.closeAsync()" retry again, the
"replicator.producer" will be closed
+ // successful.
+ overrideProducerForReplicator(replicator, originalProducer);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(replicator.isConnected());
+ });
+ // cleanup.
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ });
+ }
}