This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c1ffd5866fc [fix](journal) ensure txns are matched with the master before replaying (#28192) (#28537)
c1ffd5866fc is described below

commit c1ffd5866fc52b7d122a149d0e18aa685bc7931e
Author: walter <[email protected]>
AuthorDate: Mon Dec 18 15:00:39 2023 +0800

    [fix](journal) ensure txns are matched with the master before replaying (#28192) (#28537)
---
 .../apache/doris/journal/bdbje/BDBJEJournal.java   | 72 +++++++++++++++++++---
 1 file changed, 62 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 70a22970b13..d805bf1d5a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -36,11 +36,15 @@ import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.DatabaseNotFoundException;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
 import com.sleepycat.je.rep.InsufficientLogException;
 import com.sleepycat.je.rep.NetworkRestore;
 import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicaConsistencyException;
 import com.sleepycat.je.rep.ReplicaWriteException;
 import com.sleepycat.je.rep.RollbackException;
+import com.sleepycat.je.rep.TimeConsistencyPolicy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -49,6 +53,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /*
@@ -129,10 +134,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
 
         // id is the key
         long id = nextJournalId.getAndIncrement();
-        Long idLong = id;
-        DatabaseEntry theKey = new DatabaseEntry();
-        TupleBinding<Long> idBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
-        idBinding.objectToEntry(idLong, theKey);
+        DatabaseEntry theKey = idToKey(id);
 
         // entity is the value
         DataOutputBuffer buffer = new 
DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
@@ -208,6 +210,13 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
         return id;
     }
 
+    private static DatabaseEntry idToKey(Long id) {
+        DatabaseEntry theKey = new DatabaseEntry();
+        TupleBinding<Long> idBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
+        idBinding.objectToEntry(id, theKey);
+        return theKey;
+    }
+
     @Override
     public JournalEntity read(long journalId) {
         List<Long> dbNames = getDatabaseNames();
@@ -229,7 +238,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
         }
 
         JournalEntity ret = null;
-        Long key = new Long(journalId);
+        Long key = journalId;
         DatabaseEntry theKey = new DatabaseEntry();
         TupleBinding<Long> myBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
         myBinding.objectToEntry(key, theKey);
@@ -275,7 +284,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
         if (dbNames == null) {
             return ret;
         }
-        if (dbNames.size() == 0) {
+        if (dbNames.isEmpty()) {
             return ret;
         }
 
@@ -283,9 +292,52 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
         String dbName = dbNames.get(index).toString();
         long dbNumberName = dbNames.get(index);
         Database database = bdbEnvironment.openDatabase(dbName);
-        ret = dbNumberName + database.count() - 1;
+        if (!isReplicaTxnAreMatched(database, dbNumberName)) {
+            LOG.warn("The current replica hasn't synced up with the master, 
current db name: {}", dbNumberName);
+            if (index != 0) {
+                // Because roll journal occurs after write, the previous write must have
+                // been replicated to the majority, so it can be guaranteed that the database
+                // will not be rollback.
+                return dbNumberName - 1;
+            }
+            return -1;
+        }
+        return dbNumberName + database.count() - 1;
+    }
 
-        return ret;
+    // Whether the replica txns are matched with the master.
+    //
+    // BDBJE could throw InsufficientAcksException during post commit, at that time the
+    // log has persisted in disk. When the replica is restarted, we need to ensure that
+    // before replaying the journals, sync up txns with the new master in the cluster and
+    // rollback the txns that have been persisted but have not committed to the majority.
+    //
+    // See org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for details.
+    private boolean isReplicaTxnAreMatched(Database database, Long id) {
+        // The time lag is set to Integer.MAX_VALUE if the replica haven't synced up
+        // with the master. By allowing a very large lag, we can detect whether the
+        // replica has synced up with the master.
+        TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy(
+                1, TimeUnit.DAYS, 1, TimeUnit.MINUTES);
+        Transaction txn = null;
+        try {
+            TransactionConfig cfg = new TransactionConfig()
+                    .setReadOnly(true)
+                    .setReadCommitted(true)
+                    .setConsistencyPolicy(consistencyPolicy);
+
+            txn = 
bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg);
+
+            DatabaseEntry key = idToKey(id);
+            database.get(txn, key, null, LockMode.READ_COMMITTED);
+            return true;
+        } catch (ReplicaConsistencyException e) {
+            return false;
+        } finally {
+            if (txn != null) {
+                txn.abort();
+            }
+        }
     }
 
     @Override
@@ -298,7 +350,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
         if (dbNames == null) {
             return ret;
         }
-        if (dbNames.size() == 0) {
+        if (dbNames.isEmpty()) {
             return ret;
         }
 
@@ -354,7 +406,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
                     LOG.error("fail to get dbNames while open bdbje journal. 
will exit");
                     System.exit(-1);
                 }
-                if (dbNames.size() == 0) {
+                if (dbNames.isEmpty()) {
                     /*
                      * This is the very first time to open. Usually, we will open a new database
                      * named "1".


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to