This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 04d502946ca [fix] [broker] Counter of pending send messages in 
Replicator incorrect if schema future not complete (#19242)
04d502946ca is described below

commit 04d502946ca20d1ce8d1d3cb4c3edbfd0a45b790
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jan 17 15:23:45 2023 +0800

    [fix] [broker] Counter of pending send messages in Replicator incorrect if 
schema future not complete (#19242)
    
    (cherry picked from commit 3ba6fa8f2f31e7805aad5cb03252ad5c519c10c2)
---
 .../service/persistent/PersistentReplicator.java   |   5 +-
 .../pulsar/broker/service/ReplicatorTest.java      | 114 +++++++++++++++++++--
 .../pulsar/broker/service/ReplicatorTestBase.java  |  11 +-
 3 files changed, 111 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index c10b70df6a8..9ea4891874e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -368,9 +368,6 @@ public class PersistentReplicator extends AbstractReplicator
 
                 dispatchRateLimiter.ifPresent(rateLimiter -> 
rateLimiter.tryDispatchPermit(1, entry.getLength()));
 
-                // Increment pending messages for messages produced locally
-                PENDING_MESSAGES_UPDATER.incrementAndGet(this);
-
                 msgOut.recordEvent(headersAndPayload.readableBytes());
 
                 msg.setReplicatedFrom(localCluster);
@@ -402,6 +399,8 @@ public class PersistentReplicator extends AbstractReplicator
                     });
                 } else {
                     msg.setSchemaInfoForReplicator(schemaFuture.get());
+                    // Increment pending messages for messages produced locally
+                    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                     producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg));
                     atLeastOneMessageSentForReplication = true;
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index b20083e84a4..e03a0c1acb9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.collect.Sets;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
 import io.netty.buffer.ByteBuf;
-
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
@@ -52,9 +50,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -63,6 +59,7 @@ import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedE
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -81,7 +78,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -93,10 +90,12 @@ import 
org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -482,6 +481,105 @@ public class ReplicatorTest extends ReplicatorTestBase {
         assertEquals(consumer3.receive().getValue().getNativeObject(), data);
     }
 
+    @Test
+    public void testCounterOfPendingMessagesCorrect() throws Exception {
+        // Init replicator and send many messages.
+        PulsarClient client1 = pulsar1.getClient();
+        final TopicName topic = TopicName
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/testReplicationWithGetSchemaError"));
+        final String subName = "my-sub";
+        @Cleanup
+        Consumer<GenericRecord> consumer = 
client1.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic.toString())
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscribe();
+        @Cleanup
+        Producer<Schemas.PersonOne> producer = 
client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+        for (int i = 0; i < 20; i++) {
+            producer.send(new Schemas.PersonOne(i));
+        }
+
+        // Verify "pendingMessages" still is correct even if error occurs.
+        PersistentReplicator replicator = ensureReplicatorCreated(topic, 
pulsar1);
+        waitReplicateFinish(topic, admin1);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals((int) WhiteboxImpl.getInternalState(replicator, 
"pendingMessages"), 0);
+        });
+    }
+
+    @Test
+    public void testReplicationWillNotStuckByIncompleteSchemaFuture() throws 
Exception {
+        int originalReplicationProducerQueueSize = 
pulsar1.getConfiguration().getReplicationProducerQueueSize();
+        pulsar1.getConfiguration().setReplicationProducerQueueSize(5);
+        // Init replicator and send many messages.
+        PulsarClient client1 = pulsar1.getClient();
+        final TopicName topic = TopicName
+                
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/testReplicationWithGetSchemaError"));
+        admin1.namespaces().setSchemaCompatibilityStrategy("pulsar/ns1", 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        final String subName = "sub";
+        @Cleanup
+        Consumer<GenericRecord> consumer = 
client1.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topic.toString())
+                .subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscribe();
+        Thread sendTask = new Thread(() -> {
+            for (int i = 0; i < 50; i++) {
+                try {
+                    Schema schema = 
Schema.JSON(SchemaDefinition.builder().withJsonDef(String.format(
+                            "{"
+                                + "\"type\": \"record\","
+                                + "\"name\": \"Test_Pojo\","
+                                + "\"namespace\": 
\"org.apache.pulsar.schema.compatibility\","
+                                + "\"fields\": [{"
+                                    + "\"name\": \"prop_%s\","
+                                    + "\"type\": [\"null\", \"string\"],"
+                                    + "\"default\": null"
+                                + "}]"
+                            + "}", i)).build());
+                    Producer producer = client1
+                            .newProducer(schema)
+                            .topic(topic.toString())
+                            .create();
+                    producer.send(String.valueOf(i).toString().getBytes());
+                    pulsar1.getBrokerService().checkReplicationPolicies();
+                    producer.close();
+                    Thread.sleep(100);
+                } catch (Exception e){
+                }
+            }
+        });
+        sendTask.start();
+
+        // Verify the replicate task can finish.
+        ensureReplicatorCreated(topic, pulsar1);
+        sendTask.join();
+        waitReplicateFinish(topic, admin1);
+
+        // cleanup
+        
pulsar1.getConfiguration().setReplicationProducerQueueSize(originalReplicationProducerQueueSize);
+    }
+
+    private static void waitReplicateFinish(TopicName topicName, PulsarAdmin 
admin){
+        Awaitility.await().untilAsserted(() -> {
+            for (Map.Entry<String, ? extends ReplicatorStats> subStats :
+                    admin.topics().getStats(topicName.toString(), true, 
false).getReplication().entrySet()){
+                assertTrue(subStats.getValue().getReplicationBacklog() == 0, 
"replication task finished");
+            }
+        });
+    }
+
+    private static PersistentReplicator ensureReplicatorCreated(TopicName 
topicName, PulsarService pulsar) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName.toString(), false).join().get();
+        Awaitility.await().until(() -> 
!persistentTopic.getReplicators().isEmpty());
+        return (PersistentReplicator) 
persistentTopic.getReplicators().values().iterator().next();
+    }
+
     @Test
     public void testReplicationOverrides() throws Exception {
         log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 147711d4676..880c835742c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -18,14 +18,11 @@
  */
 package org.apache.pulsar.broker.service;
 
-import com.google.common.io.Resources;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
-
 import com.google.common.collect.Sets;
-
+import com.google.common.io.Resources;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.net.URL;
 import java.util.Optional;
 import java.util.Set;
@@ -34,11 +31,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -49,8 +43,9 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.tests.TestRetrySupport;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.slf4j.Logger;

Reply via email to