This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 06cedc5465adea243b421046cd6c1fca57e4becf
Author: Penghui Li <[email protected]>
AuthorDate: Fri Aug 19 09:17:48 2022 +0800

    [fix][broker] Fix out of order data replication (#17154)
    
    * [fix][broker] Fix out of order data replication
    
    ### Motivation
    
    Schema replication can break the ordering of replicated data while the
    schema is being fetched from the local cluster.
    
    
https://github.com/apache/pulsar/blob/8a6ecd7d4c9399bb7ce5a224ca854e4a71db79b1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L366-L369
    
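    The method getSchemaInfo() is asynchronous, and the message was only sent
    from its completion callback. A message whose schema is still being fetched
    could therefore be sent after later messages whose schema is already
    available, changing the order in which messages are written.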
    
    ### Modification
    
    Added a new replicator state, `fetchSchemaInProgress`, which indicates that
    the replicator has detected a new schema and needs to fetch its schema info
    from the local cluster. While the schema is being fetched, the replicator
    pauses data replication, and it resumes once the fetch has completed.
    
    (cherry picked from commit 39c1ee1aebc06c85b7dcb203b8cfa16fe035ae27)
---
 .../service/persistent/PersistentReplicator.java   | 50 ++++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      | 50 ++++++++++++++++------
 2 files changed, 79 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 953300e823f..2e7dbb2fbf1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -105,6 +105,8 @@ public class PersistentReplicator extends AbstractReplicator
 
     private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
+    private volatile boolean fetchSchemaInProgress = false;
+
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, 
String localCluster, String remoteCluster,
                                 BrokerService brokerService, PulsarClientImpl 
replicationClient)
             throws PulsarServerException {
@@ -220,6 +222,11 @@ public class PersistentReplicator extends 
AbstractReplicator
     }
 
     protected void readMoreEntries() {
+        if (fetchSchemaInProgress) {
+            log.info("[{}][{} -> {}] Skip the reading due to new detected 
schema",
+                    topicName, localCluster, remoteCluster);
+            return;
+        }
         int availablePermits = getAvailablePermits();
 
         if (availablePermits > 0) {
@@ -290,8 +297,15 @@ public class PersistentReplicator extends 
AbstractReplicator
             // This flag is set to true when we skip atleast one local message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
+            boolean skipRemainingMessages = false;
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
+                // Skip the messages since the replicator need to fetch the 
schema info to replicate the schema to the
+                // remote cluster. Rewind the cursor first and continue the 
message read after fetched the schema.
+                if (skipRemainingMessages) {
+                    entry.release();
+                    continue;
+                }
                 int length = entry.getLength();
                 ByteBuf headersAndPayload = entry.getDataBuffer();
                 MessageImpl msg;
@@ -364,16 +378,34 @@ public class PersistentReplicator extends 
AbstractReplicator
 
                 headersAndPayload.retain();
 
-                getSchemaInfo(msg).thenAccept(schemaInfo -> {
-                    msg.setSchemaInfoForReplicator(schemaInfo);
+                CompletableFuture<SchemaInfo> schemaFuture = 
getSchemaInfo(msg);
+                if (!schemaFuture.isDone() || 
schemaFuture.isCompletedExceptionally()) {
+                    entry.release();
+                    headersAndPayload.release();
+                    msg.recycle();
+                    // Mark the replicator is fetching the schema for now and 
rewind the cursor
+                    // and trigger the next read after complete the schema 
fetching.
+                    fetchSchemaInProgress = true;
+                    skipRemainingMessages = true;
+                    cursor.cancelPendingReadRequest();
+                    log.info("[{}][{} -> {}] Pause the data replication due to 
new detected schema", topicName,
+                            localCluster, remoteCluster);
+                    schemaFuture.whenComplete((__, e) -> {
+                       if (e != null) {
+                           log.warn("[{}][{} -> {}] Failed to get schema from 
local cluster, will try in the next loop",
+                                   topicName, localCluster, remoteCluster, e);
+                       }
+                       log.info("[{}][{} -> {}] Resume the data replication 
after the schema fetching done", topicName,
+                               localCluster, remoteCluster);
+                       cursor.rewind();
+                       fetchSchemaInProgress = false;
+                       readMoreEntries();
+                    });
+                } else {
+                    msg.setSchemaInfoForReplicator(schemaFuture.get());
                     producer.sendAsync(msg, ProducerSendCallback.create(this, 
entry, msg));
-                }).exceptionally(ex -> {
-                    log.error("[{}][{} -> {}] Failed to get schema from local 
cluster", topicName,
-                            localCluster, remoteCluster, ex);
-                    return null;
-                });
-
-                atLeastOneMessageSentForReplication = true;
+                    atLeastOneMessageSentForReplication = true;
+                }
             }
         } catch (Exception e) {
             log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, 
localCluster, remoteCluster, e.getMessage(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 21ca713aa30..72d04d95397 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -80,6 +80,7 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -383,8 +384,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
         consumer3.receive(1);
     }
 
-    @Test
+    @Test(invocationCount = 5)
     public void testReplicationWithSchema() throws Exception {
+        config1.setBrokerDeduplicationEnabled(true);
+        config2.setBrokerDeduplicationEnabled(true);
+        config3.setBrokerDeduplicationEnabled(true);
         PulsarClient client1 = pulsar1.getClient();
         PulsarClient client2 = pulsar2.getClient();
         PulsarClient client3 = pulsar3.getClient();
@@ -394,17 +398,29 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer = 
client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer1 = 
client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> producer2 = 
client1.newProducer(Schema.AVRO(Schemas.PersonThree.class))
                 .topic(topic.toString())
+                .enableBatching(false)
                 .create();
 
         admin1.topics().createSubscription(topic.toString(), subName, 
MessageId.earliest);
         admin2.topics().createSubscription(topic.toString(), subName, 
MessageId.earliest);
         admin3.topics().createSubscription(topic.toString(), subName, 
MessageId.earliest);
 
+        final int totalMessages = 1000;
 
-        for (int i = 0; i < 10; i++) {
-            producer.send(new Schemas.PersonOne(i));
+        for (int i = 0; i < totalMessages / 2; i++) {
+            producer1.sendAsync(new Schemas.PersonOne(i));
+        }
+
+        for (int i = 500; i < totalMessages; i++) {
+            producer2.sendAsync(new Schemas.PersonThree(i, "name-" + i));
         }
 
         Awaitility.await().untilAsserted(() -> {
@@ -414,29 +430,39 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer1 = 
client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer1 = 
client1.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer2 = 
client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer2 = 
client2.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer3 = 
client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer3 = 
client3.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 10; i++) {
-            Message<Schemas.PersonOne> msg1 = consumer1.receive();
-            Message<Schemas.PersonOne> msg2 = consumer2.receive();
-            Message<Schemas.PersonOne> msg3 = consumer3.receive();
+        int lastId = -1;
+        for (int i = 0; i < totalMessages; i++) {
+            Message<GenericRecord> msg1 = consumer1.receive();
+            Message<GenericRecord> msg2 = consumer2.receive();
+            Message<GenericRecord> msg3 = consumer3.receive();
             assertTrue(msg1 != null && msg2 != null && msg3 != null);
-            assertTrue(msg1.getValue().equals(msg2.getValue()) && 
msg2.getValue().equals(msg3.getValue()));
+            GenericRecord record1 = msg1.getValue();
+            GenericRecord record2 = msg2.getValue();
+            GenericRecord record3 = msg3.getValue();
+            int id1 = (int) record1.getField("id");
+            int id2 = (int) record2.getField("id");
+            int id3 = (int) record3.getField("id");
+            log.info("Received ids, id1: {}, id2: {}, id3: {}, lastId: {}", 
id1, id2, id3, lastId);
+            assertTrue(id1 == id2 && id2 == id3);
+            assertTrue(id1 > lastId);
+            lastId = id1;
             consumer1.acknowledge(msg1);
             consumer2.acknowledge(msg2);
             consumer3.acknowledge(msg3);

Reply via email to