This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 200c5f4a18d [fix] [broker] Replication stopped due to unload topic
failed (#21947)
200c5f4a18d is described below
commit 200c5f4a18d35d51b7cc7cda584b025144ec38ca
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jan 25 17:02:26 2024 +0800
[fix] [broker] Replication stopped due to unload topic failed (#21947)
### Motivation
**Steps to reproduce the issue**
- Enable replication.
- Send `10` messages to the local cluster then close the producer.
- Call `pulsar-admin topics unload <topic>` and get an error because
closing the replicator's internal producer fails.
- Since the topic failed to close, we assumed it would keep working as expected,
but replication had stopped.
**Root cause**
- `pulsar-admin topics unload <topic>` waits for all clients (including
`consumers & producers & replicators`) to close successfully, and it fails
if any client cannot be closed.
- The failure to close `replicator.producer` caused the Admin API call to fail;
meanwhile, a scheduled task keeps retrying to close `replicator.producer`, which
causes replication to stop. See
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L209
### Modifications
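Since `replicator.producer.closeAsync()` is retried after it fails, the
topic unload should succeed. To achieve this, `AbstractReplicator` now returns
the chained future, whose failure handler schedules the retry and completes
normally, instead of the raw `producer.closeAsync()` future.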
---
.../pulsar/broker/service/AbstractReplicator.java | 3 +-
.../broker/service/OneWayReplicatorTest.java | 70 +++++++++++++++++++---
2 files changed, 63 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 6dd296d16b5..1b5b2824257 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -194,7 +194,7 @@ public abstract class AbstractReplicator {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = producer.closeAsync();
- future.thenRun(() -> {
+ return future.thenRun(() -> {
STATE_UPDATER.set(this, State.Stopped);
this.producer = null;
// deactivate further read
@@ -209,7 +209,6 @@ public abstract class AbstractReplicator {
brokerService.executor().schedule(this::closeProducerAsync,
waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
- return future;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 8269f40e608..1accd04f491 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -21,15 +21,21 @@ package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -49,6 +55,29 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
super.cleanup();
}
+ private void waitReplicatorStarted(String topicName) { // Awaits until pulsar2 has the topic with at least one producer (presumably the replicator's).
+ Awaitility.await().untilAsserted(() -> {
+ Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
+ assertTrue(topicOptional2.isPresent());
+ PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
+ assertFalse(persistentTopic2.getProducers().isEmpty());
+ });
+ }
+
+ /**
+ * Override "AbstractReplicator.producer" (via reflection) with {@code newProducer} and return
the original value. Only the write is done while holding the replicator's monitor.
+ */
+ private ProducerImpl overrideProducerForReplicator(AbstractReplicator
replicator, ProducerImpl newProducer)
+ throws Exception {
+ Field producerField =
AbstractReplicator.class.getDeclaredField("producer");
+ producerField.setAccessible(true);
+ ProducerImpl originalValue = (ProducerImpl)
producerField.get(replicator);
+ synchronized (replicator) {
+ producerField.set(replicator, newProducer);
+ }
+ return originalValue;
+ }
+
@Test
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
@@ -79,18 +108,13 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
public void testCreateRemoteConsumerFirst() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
- // Wait for replicator started.
- Awaitility.await().untilAsserted(() -> {
- Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
- assertTrue(topicOptional2.isPresent());
- PersistentTopic persistentTopic2 = (PersistentTopic)
topicOptional2.get();
- assertFalse(persistentTopic2.getProducers().isEmpty());
- });
+
// The topic in cluster2 has a replicator created producer(schema
Auto_Produce), but does not have any schema。
// Verify: the consumer of this cluster2 can create successfully.
Consumer<String> consumer2 =
client2.newConsumer(Schema.STRING).topic(topicName).subscriptionName("s1")
.subscribe();;
-
+ // Wait for replicator started.
+ waitReplicatorStarted(topicName);
// cleanup.
producer1.close();
consumer2.close();
@@ -99,4 +123,34 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin2.topics().delete(topicName);
});
}
+
+ @Test
+ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ defaultNamespace + "/tp_");
+ admin1.topics().createNonPartitionedTopic(topicName);
+ // Wait for replicator started.
+ waitReplicatorStarted(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+ PersistentReplicator replicator =
+ (PersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ // Swap in a mocked internal producer whose closeAsync() returns a failed future.
+ ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class);
+
Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new
Exception("mocked ex")));
+ ProducerImpl originalProducer =
overrideProducerForReplicator(replicator, mockProducer);
+ // Verify: since "replicator.producer.closeAsync()" is retried
after it fails, the topic unload should still be
+ // successful.
+ admin1.topics().unload(topicName);
+ // Verify: once the original producer is restored, a later retry of
"replicator.producer.closeAsync()" closes it
+ // successfully, so the replicator ends up disconnected.
+ overrideProducerForReplicator(replicator, originalProducer);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(replicator.isConnected());
+ });
+ // cleanup.
+ cleanupTopics(() -> {
+ admin1.topics().delete(topicName);
+ admin2.topics().delete(topicName);
+ });
+ }
}