This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 348865cf94b [fix] [broker] Counter of pending send messages in
Replicator incorrect if schema future not complete (#19242)
348865cf94b is described below
commit 348865cf94b918b336bb9cfd1cde22843001139d
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 17 15:23:45 2023 +0800
[fix] [broker] Counter of pending send messages in Replicator incorrect if
schema future not complete (#19242)
(cherry picked from commit 3ba6fa8f2f31e7805aad5cb03252ad5c519c10c2)
---
.../service/persistent/PersistentReplicator.java | 5 +-
.../pulsar/broker/service/ReplicatorTest.java | 102 +++++++++++++++++++++
2 files changed, 104 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2e7dbb2fbf1..86a46a5c9c2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -369,9 +369,6 @@ public class PersistentReplicator extends AbstractReplicator
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(1, entry.getLength()));
- // Increment pending messages for messages produced locally
- PENDING_MESSAGES_UPDATER.incrementAndGet(this);
-
msgOut.recordEvent(headersAndPayload.readableBytes());
msg.setReplicatedFrom(localCluster);
@@ -403,6 +400,8 @@ public class PersistentReplicator extends AbstractReplicator
});
} else {
msg.setSchemaInfoForReplicator(schemaFuture.get());
+ // Increment pending messages for messages produced locally
+ PENDING_MESSAGES_UPDATER.incrementAndGet(this);
producer.sendAsync(msg, ProducerSendCallback.create(this,
entry, msg));
atLeastOneMessageSentForReplication = true;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d4d7c9d5e13..156b906c4d4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -96,10 +97,12 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -486,6 +489,105 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertEquals(consumer3.receive().getValue().getNativeObject(), data);
}
+ @Test
+ public void testCounterOfPendingMessagesCorrect() throws Exception {
+ // Init replicator and send many messages.
+ PulsarClient client1 = pulsar1.getClient();
+ final TopicName topic = TopicName
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/testReplicationWithGetSchemaError"));
+ final String subName = "my-sub";
+ @Cleanup
+ Consumer<GenericRecord> consumer =
client1.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic.toString())
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscribe();
+ @Cleanup
+ Producer<Schemas.PersonOne> producer =
client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+ .topic(topic.toString())
+ .enableBatching(false)
+ .create();
+ for (int i = 0; i < 20; i++) {
+ producer.send(new Schemas.PersonOne(i));
+ }
+
+ // Verify "pendingMessages" still is correct even if error occurs.
+ PersistentReplicator replicator = ensureReplicatorCreated(topic,
pulsar1);
+ waitReplicateFinish(topic, admin1);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals((int) WhiteboxImpl.getInternalState(replicator,
"pendingMessages"), 0);
+ });
+ }
+
+ @Test
+ public void testReplicationWillNotStuckByIncompleteSchemaFuture() throws
Exception {
+ int originalReplicationProducerQueueSize =
pulsar1.getConfiguration().getReplicationProducerQueueSize();
+ pulsar1.getConfiguration().setReplicationProducerQueueSize(5);
+ // Init replicator and send many messages.
+ PulsarClient client1 = pulsar1.getClient();
+ final TopicName topic = TopicName
+
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/testReplicationWithGetSchemaError"));
+ admin1.namespaces().setSchemaCompatibilityStrategy("pulsar/ns1",
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+ final String subName = "sub";
+ @Cleanup
+ Consumer<GenericRecord> consumer =
client1.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic.toString())
+ .subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscribe();
+ Thread sendTask = new Thread(() -> {
+ for (int i = 0; i < 50; i++) {
+ try {
+ Schema schema =
Schema.JSON(SchemaDefinition.builder().withJsonDef(String.format(
+ "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"Test_Pojo\","
+ + "\"namespace\":
\"org.apache.pulsar.schema.compatibility\","
+ + "\"fields\": [{"
+ + "\"name\": \"prop_%s\","
+ + "\"type\": [\"null\", \"string\"],"
+ + "\"default\": null"
+ + "}]"
+ + "}", i)).build());
+ Producer producer = client1
+ .newProducer(schema)
+ .topic(topic.toString())
+ .create();
+ producer.send(String.valueOf(i).toString().getBytes());
+ pulsar1.getBrokerService().checkReplicationPolicies();
+ producer.close();
+ Thread.sleep(100);
+ } catch (Exception e){
+ }
+ }
+ });
+ sendTask.start();
+
+ // Verify the replicate task can finish.
+ ensureReplicatorCreated(topic, pulsar1);
+ sendTask.join();
+ waitReplicateFinish(topic, admin1);
+
+ // cleanup
+
pulsar1.getConfiguration().setReplicationProducerQueueSize(originalReplicationProducerQueueSize);
+ }
+
+ private static void waitReplicateFinish(TopicName topicName, PulsarAdmin
admin){
+ Awaitility.await().untilAsserted(() -> {
+ for (Map.Entry<String, ? extends ReplicatorStats> subStats :
+ admin.topics().getStats(topicName.toString(), true, false,
false).getReplication().entrySet()){
+ assertTrue(subStats.getValue().getReplicationBacklog() == 0,
"replication task finished");
+ }
+ });
+ }
+
+ private static PersistentReplicator ensureReplicatorCreated(TopicName
topicName, PulsarService pulsar) {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(topicName.toString(), false).join().get();
+ Awaitility.await().until(() ->
!persistentTopic.getReplicators().isEmpty());
+ return (PersistentReplicator)
persistentTopic.getReplicators().values().iterator().next();
+ }
+
@Test
public void testReplicationOverrides() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");