This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 04d502946ca [fix] [broker] Counter of pending send messages in Replicator incorrect if schema future not complete (#19242)
04d502946ca is described below
commit 04d502946ca20d1ce8d1d3cb4c3edbfd0a45b790
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 17 15:23:45 2023 +0800
[fix] [broker] Counter of pending send messages in Replicator incorrect if schema future not complete (#19242)
(cherry picked from commit 3ba6fa8f2f31e7805aad5cb03252ad5c519c10c2)
---
.../service/persistent/PersistentReplicator.java | 5 +-
.../pulsar/broker/service/ReplicatorTest.java | 114 +++++++++++++++++++--
.../pulsar/broker/service/ReplicatorTestBase.java | 11 +-
3 files changed, 111 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index c10b70df6a8..9ea4891874e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -368,9 +368,6 @@ public class PersistentReplicator extends AbstractReplicator
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(1, entry.getLength()));
- // Increment pending messages for messages produced locally
- PENDING_MESSAGES_UPDATER.incrementAndGet(this);
-
msgOut.recordEvent(headersAndPayload.readableBytes());
msg.setReplicatedFrom(localCluster);
@@ -402,6 +399,8 @@ public class PersistentReplicator extends AbstractReplicator
});
} else {
msg.setSchemaInfoForReplicator(schemaFuture.get());
+ // Increment pending messages for messages produced locally
+ PENDING_MESSAGES_UPDATER.incrementAndGet(this);
producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg));
atLeastOneMessageSentForReplication = true;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index b20083e84a4..e03a0c1acb9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,26 +18,24 @@
*/
package org.apache.pulsar.broker.service;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.collect.Sets;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
import io.netty.buffer.ByteBuf;
-
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
@@ -52,9 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
import lombok.Cleanup;
-
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -63,6 +59,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -81,7 +78,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -93,10 +90,12 @@ import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -482,6 +481,105 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertEquals(consumer3.receive().getValue().getNativeObject(), data);
}
+    @Test
+    public void testCounterOfPendingMessagesCorrect() throws Exception {
+        // Init replicator and send many messages.
+        PulsarClient client1 = pulsar1.getClient();
+        final TopicName topic = TopicName.get(BrokerTestUtil.newUniqueName(
+                "persistent://pulsar/ns1/testCounterOfPendingMessagesCorrect"));
+        final String subName = "my-sub";
+        @Cleanup
+        Consumer<GenericRecord> consumer = client1.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic.toString())
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscribe();
+        @Cleanup
+        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+        for (int i = 0; i < 20; i++) {
+            producer.send(new Schemas.PersonOne(i));
+        }
+
+        // Once the replication backlog has drained, no send may still be counted as pending:
+        // the replicator's "pendingMessages" counter must have returned to 0.
+        PersistentReplicator replicator = ensureReplicatorCreated(topic, pulsar1);
+        waitReplicateFinish(topic, admin1);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals((int) WhiteboxImpl.getInternalState(replicator, "pendingMessages"), 0);
+        });
+    }
+
+    @Test
+    public void testReplicationWillNotStuckByIncompleteSchemaFuture() throws Exception {
+        final String namespace = "pulsar/ns1";
+        // Both settings changed below are shared with other tests; they are restored in the finally block
+        // so that a failure here does not leak them.
+        final int originalReplicationProducerQueueSize =
+                pulsar1.getConfiguration().getReplicationProducerQueueSize();
+        final SchemaCompatibilityStrategy originalSchemaCompatibilityStrategy =
+                admin1.namespaces().getSchemaCompatibilityStrategy(namespace);
+        pulsar1.getConfiguration().setReplicationProducerQueueSize(5);
+        try {
+            // Init replicator and send many messages.
+            PulsarClient client1 = pulsar1.getClient();
+            final TopicName topic = TopicName.get(BrokerTestUtil.newUniqueName(
+                    "persistent://pulsar/ns1/testReplicationWillNotStuckByIncompleteSchemaFuture"));
+            // Every producer below uses a different JSON schema (field "prop_<i>"); allow all of them.
+            admin1.namespaces().setSchemaCompatibilityStrategy(namespace,
+                    SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+            final String subName = "sub";
+            @Cleanup
+            Consumer<GenericRecord> consumer = client1.newConsumer(Schema.AUTO_CONSUME())
+                    .topic(topic.toString())
+                    .subscriptionName(subName)
+                    .receiverQueueSize(10)
+                    .subscribe();
+            Thread sendTask = new Thread(() -> {
+                for (int i = 0; i < 50; i++) {
+                    try {
+                        Schema<byte[]> schema = Schema.JSON(SchemaDefinition.<byte[]>builder()
+                                .withJsonDef(String.format(
+                                        "{"
+                                        + "\"type\": \"record\","
+                                        + "\"name\": \"Test_Pojo\","
+                                        + "\"namespace\": \"org.apache.pulsar.schema.compatibility\","
+                                        + "\"fields\": [{"
+                                        + "\"name\": \"prop_%s\","
+                                        + "\"type\": [\"null\", \"string\"],"
+                                        + "\"default\": null"
+                                        + "}]"
+                                        + "}", i))
+                                .build());
+                        // try-with-resources: the producer is closed even when send() fails.
+                        try (Producer<byte[]> producer = client1.newProducer(schema)
+                                .topic(topic.toString())
+                                .create()) {
+                            producer.send(String.valueOf(i).getBytes());
+                            pulsar1.getBrokerService().checkReplicationPolicies();
+                        }
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        // Preserve the interrupt and stop sending.
+                        Thread.currentThread().interrupt();
+                        return;
+                    } catch (Exception e) {
+                        // Keep going with the remaining schemas, but do not hide the failure.
+                        log.warn("Failed to send message {} to {}", i, topic, e);
+                    }
+                }
+            });
+            sendTask.start();
+
+            // Verify the replicate task can finish.
+            ensureReplicatorCreated(topic, pulsar1);
+            sendTask.join();
+            waitReplicateFinish(topic, admin1);
+        } finally {
+            pulsar1.getConfiguration().setReplicationProducerQueueSize(originalReplicationProducerQueueSize);
+            admin1.namespaces().setSchemaCompatibilityStrategy(namespace, originalSchemaCompatibilityStrategy);
+        }
+    }
+
+    /**
+     * Waits until every replication entry reported in the stats of {@code topicName} has a
+     * replication backlog of 0.
+     *
+     * @param topicName topic whose replication stats are polled
+     * @param admin admin client used to fetch the topic stats
+     */
+    private static void waitReplicateFinish(TopicName topicName, PulsarAdmin admin) {
+        Awaitility.await().untilAsserted(() -> {
+            for (Map.Entry<String, ? extends ReplicatorStats> replicationStats :
+                    admin.topics().getStats(topicName.toString(), true, false).getReplication().entrySet()) {
+                // assertEquals reports the actual backlog on failure, unlike assertTrue(x == 0).
+                assertEquals(replicationStats.getValue().getReplicationBacklog(), 0L,
+                        "replication backlog for " + replicationStats.getKey() + " has not drained");
+            }
+        });
+    }
+
+    /**
+     * Looks up an existing (not auto-created) topic on {@code pulsar}, waits until it has at least one
+     * replicator and returns the first one.
+     *
+     * @throws IllegalStateException if the broker returns no topic for {@code topicName}
+     */
+    private static PersistentReplicator ensureReplicatorCreated(TopicName topicName, PulsarService pulsar) {
+        PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topicName.toString(), false).join()
+                .orElseThrow(() -> new IllegalStateException(
+                        "Topic " + topicName + " not found (createIfMissing=false)"));
+        Awaitility.await().until(() -> !persistentTopic.getReplicators().isEmpty());
+        return (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
+    }
+
@Test
public void testReplicationOverrides() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 147711d4676..880c835742c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -18,14 +18,11 @@
*/
package org.apache.pulsar.broker.service;
-import com.google.common.io.Resources;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-
import com.google.common.collect.Sets;
-
+import com.google.common.io.Resources;
import io.netty.util.concurrent.DefaultThreadFactory;
-
import java.net.URL;
import java.util.Optional;
import java.util.Set;
@@ -34,11 +31,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -49,8 +43,9 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;