This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d293a2b5fcfe44c78fc40022ab40e11387e2fd4
Author: Penghui Li <[email protected]>
AuthorDate: Fri Aug 19 09:17:48 2022 +0800

    [fix][broker] Fix out of order data replication (#17154)
    
    * [fix][broker] Fix out of order data replication
    
    Schema replication can break the data replication order while the schema is
    being fetched from the local cluster.
    
    
https://github.com/apache/pulsar/blob/8a6ecd7d4c9399bb7ce5a224ca854e4a71db79b1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L366-L369
    
    The method getSchemaInfo() is asynchronous, so its completion callbacks can run
    out of order and reverse the order in which messages are written to the remote
    cluster (illustrated in the ordering sketch after the commit message).
    
    Added a new replicator state `fetchSchemaInProgress`, set when the replicator
    detects a new schema and needs to fetch the schema info from the local cluster.
    While the schema is being fetched, the replicator pauses data replication and
    resumes it once the fetch completes (see the gate sketch after the commit message).
    
    (cherry picked from commit 39c1ee1aebc06c85b7dcb203b8cfa16fe035ae27)
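    A minimal sketch of the ordering problem described above, using hypothetical
    message names A and B (not the actual broker code): each send is chained onto its
    own schema future, so a message whose schema is already cached can overtake an
    earlier message whose schema is still being fetched.

        import java.util.concurrent.CompletableFuture;

        public class OutOfOrderSketch {
            public static void main(String[] args) {
                // Schema for A is still being fetched; schema for B is already cached.
                CompletableFuture<String> schemaForA = new CompletableFuture<>();
                CompletableFuture<String> schemaForB = CompletableFuture.completedFuture("cached");

                // Entries are read in order A, B, but each send waits on its own future.
                schemaForA.thenAccept(schema -> System.out.println("send A (" + schema + ")"));
                schemaForB.thenAccept(schema -> System.out.println("send B (" + schema + ")")); // runs first

                // A's fetch completes later, so A is sent after B: out of order on the remote cluster.
                schemaForA.complete("fetched");
            }
        }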
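    A simplified sketch of the pause/resume gate introduced by this change (the class
    and the onNewSchemaDetected() method are illustrative, not the real broker code):
    reads are skipped while a schema fetch is in flight and resume once the fetch
    completes.

        import java.util.concurrent.CompletableFuture;

        class ReplicatorGateSketch {
            private volatile boolean fetchSchemaInProgress = false;

            void readMoreEntries() {
                if (fetchSchemaInProgress) {
                    return; // paused: a schema fetch is still in progress
                }
                // ... read entries and replicate them to the remote cluster ...
            }

            // Called when an entry carries a schema that is not yet known locally.
            void onNewSchemaDetected(CompletableFuture<Object> schemaFuture) {
                fetchSchemaInProgress = true; // pause further reads
                schemaFuture.whenComplete((schemaInfo, error) -> {
                    // In the real change the cursor is also rewound here so the
                    // skipped entries are re-read in their original order.
                    fetchSchemaInProgress = false;
                    readMoreEntries(); // resume replication
                });
            }
        }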
---
 .../service/persistent/PersistentReplicator.java   | 50 +++++++++++++++++----
 .../pulsar/broker/service/ReplicatorTest.java      | 51 +++++++++++++++++-----
 2 files changed, 80 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 1db685ccbec..c10b70df6a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -104,6 +104,8 @@ public class PersistentReplicator extends AbstractReplicator
 
     private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
+    private volatile boolean fetchSchemaInProgress = false;
+
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
                                 BrokerService brokerService, PulsarClientImpl replicationClient)
             throws PulsarServerException {
@@ -219,6 +221,11 @@ public class PersistentReplicator extends AbstractReplicator
     }
 
     protected void readMoreEntries() {
+        if (fetchSchemaInProgress) {
+            log.info("[{}][{} -> {}] Skip the reading due to new detected schema",
+                    topicName, localCluster, remoteCluster);
+            return;
+        }
         int availablePermits = getAvailablePermits();
 
         if (availablePermits > 0) {
@@ -289,8 +296,15 @@ public class PersistentReplicator extends AbstractReplicator
             // This flag is set to true when we skip atleast one local message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
+            boolean skipRemainingMessages = false;
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
+                // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
+                // remote cluster. Rewind the cursor first and continue the message read after fetched the schema.
+                if (skipRemainingMessages) {
+                    entry.release();
+                    continue;
+                }
                 int length = entry.getLength();
                 ByteBuf headersAndPayload = entry.getDataBuffer();
                 MessageImpl msg;
@@ -363,16 +377,34 @@ public class PersistentReplicator extends AbstractReplicator
 
                 headersAndPayload.retain();
 
-                getSchemaInfo(msg).thenAccept(schemaInfo -> {
-                    msg.setSchemaInfoForReplicator(schemaInfo);
+                CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
+                if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
+                    entry.release();
+                    headersAndPayload.release();
+                    msg.recycle();
+                    // Mark the replicator is fetching the schema for now and rewind the cursor
+                    // and trigger the next read after complete the schema fetching.
+                    fetchSchemaInProgress = true;
+                    skipRemainingMessages = true;
+                    cursor.cancelPendingReadRequest();
+                    log.info("[{}][{} -> {}] Pause the data replication due to 
new detected schema", topicName,
+                            localCluster, remoteCluster);
+                    schemaFuture.whenComplete((__, e) -> {
+                       if (e != null) {
+                           log.warn("[{}][{} -> {}] Failed to get schema from 
local cluster, will try in the next loop",
+                                   topicName, localCluster, remoteCluster, e);
+                       }
+                       log.info("[{}][{} -> {}] Resume the data replication 
after the schema fetching done", topicName,
+                               localCluster, remoteCluster);
+                       cursor.rewind();
+                       fetchSchemaInProgress = false;
+                       readMoreEntries();
+                    });
+                } else {
+                    msg.setSchemaInfoForReplicator(schemaFuture.get());
                     producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
-                }).exceptionally(ex -> {
-                    log.error("[{}][{} -> {}] Failed to get schema from local 
cluster", topicName,
-                            localCluster, remoteCluster, ex);
-                    return null;
-                });
-
-                atLeastOneMessageSentForReplication = true;
+                    atLeastOneMessageSentForReplication = true;
+                }
             }
         } catch (Exception e) {
             log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, 
localCluster, remoteCluster, e.getMessage(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index e78539f985d..9cb6ec45487 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -78,6 +78,8 @@ import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -379,8 +381,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
         consumer3.receive(1);
     }
 
-    @Test
+    @Test(invocationCount = 5)
     public void testReplicationWithSchema() throws Exception {
+        config1.setBrokerDeduplicationEnabled(true);
+        config2.setBrokerDeduplicationEnabled(true);
+        config3.setBrokerDeduplicationEnabled(true);
         PulsarClient client1 = pulsar1.getClient();
         PulsarClient client2 = pulsar2.getClient();
         PulsarClient client3 = pulsar3.getClient();
@@ -390,17 +395,29 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final String subName = "my-sub";
 
         @Cleanup
-        Producer<Schemas.PersonOne> producer = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+        Producer<Schemas.PersonOne> producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class))
+                .topic(topic.toString())
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Producer<Schemas.PersonThree> producer2 = client1.newProducer(Schema.AVRO(Schemas.PersonThree.class))
                 .topic(topic.toString())
+                .enableBatching(false)
                 .create();
 
         admin1.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
         admin2.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
         admin3.topics().createSubscription(topic.toString(), subName, MessageId.earliest);
 
+        final int totalMessages = 1000;
 
-        for (int i = 0; i < 10; i++) {
-            producer.send(new Schemas.PersonOne(i));
+        for (int i = 0; i < totalMessages / 2; i++) {
+            producer1.sendAsync(new Schemas.PersonOne(i));
+        }
+
+        for (int i = 500; i < totalMessages; i++) {
+            producer2.sendAsync(new Schemas.PersonThree(i, "name-" + i));
         }
 
         Awaitility.await().untilAsserted(() -> {
@@ -410,29 +427,39 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer1 = client1.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
         @Cleanup
-        Consumer<Schemas.PersonOne> consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
+        Consumer<GenericRecord> consumer3 = client3.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic.toString())
                 .subscriptionName(subName)
                 .subscribe();
 
-        for (int i = 0; i < 10; i++) {
-            Message<Schemas.PersonOne> msg1 = consumer1.receive();
-            Message<Schemas.PersonOne> msg2 = consumer2.receive();
-            Message<Schemas.PersonOne> msg3 = consumer3.receive();
+        int lastId = -1;
+        for (int i = 0; i < totalMessages; i++) {
+            Message<GenericRecord> msg1 = consumer1.receive();
+            Message<GenericRecord> msg2 = consumer2.receive();
+            Message<GenericRecord> msg3 = consumer3.receive();
             assertTrue(msg1 != null && msg2 != null && msg3 != null);
-            assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue()));
+            GenericRecord record1 = msg1.getValue();
+            GenericRecord record2 = msg2.getValue();
+            GenericRecord record3 = msg3.getValue();
+            int id1 = (int) record1.getField("id");
+            int id2 = (int) record2.getField("id");
+            int id3 = (int) record3.getField("id");
+            log.info("Received ids, id1: {}, id2: {}, id3: {}, lastId: {}", 
id1, id2, id3, lastId);
+            assertTrue(id1 == id2 && id2 == id3);
+            assertTrue(id1 > lastId);
+            lastId = id1;
             consumer1.acknowledge(msg1);
             consumer2.acknowledge(msg2);
             consumer3.acknowledge(msg3);
