This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c1ffd5866fc [fix](journal) ensure txns are matched with the master before replaying (#28192) (#28537)
c1ffd5866fc is described below
commit c1ffd5866fc52b7d122a149d0e18aa685bc7931e
Author: walter <[email protected]>
AuthorDate: Mon Dec 18 15:00:39 2023 +0800
[fix](journal) ensure txns are matched with the master before replaying (#28192) (#28537)
---
.../apache/doris/journal/bdbje/BDBJEJournal.java | 72 +++++++++++++++++++---
1 file changed, 62 insertions(+), 10 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 70a22970b13..d805bf1d5a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -36,11 +36,15 @@ import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.RollbackException;
+import com.sleepycat.je.rep.TimeConsistencyPolicy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -49,6 +53,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/*
@@ -129,10 +134,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
// id is the key
long id = nextJournalId.getAndIncrement();
- Long idLong = id;
- DatabaseEntry theKey = new DatabaseEntry();
- TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
- idBinding.objectToEntry(idLong, theKey);
+ DatabaseEntry theKey = idToKey(id);
// entity is the value
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
@@ -208,6 +210,13 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
return id;
}
+ private static DatabaseEntry idToKey(Long id) {
+ DatabaseEntry theKey = new DatabaseEntry();
+ TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
+ idBinding.objectToEntry(id, theKey);
+ return theKey;
+ }
+
@Override
public JournalEntity read(long journalId) {
List<Long> dbNames = getDatabaseNames();
@@ -229,7 +238,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
}
JournalEntity ret = null;
- Long key = new Long(journalId);
+ Long key = journalId;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> myBinding = TupleBinding.getPrimitiveBinding(Long.class);
myBinding.objectToEntry(key, theKey);
@@ -275,7 +284,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (dbNames == null) {
return ret;
}
- if (dbNames.size() == 0) {
+ if (dbNames.isEmpty()) {
return ret;
}
@@ -283,9 +292,52 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
String dbName = dbNames.get(index).toString();
long dbNumberName = dbNames.get(index);
Database database = bdbEnvironment.openDatabase(dbName);
- ret = dbNumberName + database.count() - 1;
+ if (!isReplicaTxnAreMatched(database, dbNumberName)) {
+ LOG.warn("The current replica hasn't synced up with the master, current db name: {}", dbNumberName);
+ if (index != 0) {
+ // Because roll journal occurs after write, the previous write must have
+ // been replicated to the majority, so it can be guaranteed that the database
+ // will not be rollback.
+ return dbNumberName - 1;
+ }
+ return -1;
+ }
+ return dbNumberName + database.count() - 1;
+ }
- return ret;
+ // Whether the replica txns are matched with the master.
+ //
+ // BDBJE could throw InsufficientAcksException during post commit, at that time the
+ // log has persisted in disk. When the replica is restarted, we need to ensure that
+ // before replaying the journals, sync up txns with the new master in the cluster and
+ // rollback the txns that have been persisted but have not committed to the majority.
+ //
+ // See org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for details.
+ private boolean isReplicaTxnAreMatched(Database database, Long id) {
+ // The time lag is set to Integer.MAX_VALUE if the replica haven't synced up
+ // with the master. By allowing a very large lag, we can detect whether the
+ // replica has synced up with the master.
+ TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy(
+ 1, TimeUnit.DAYS, 1, TimeUnit.MINUTES);
+ Transaction txn = null;
+ try {
+ TransactionConfig cfg = new TransactionConfig()
+ .setReadOnly(true)
+ .setReadCommitted(true)
+ .setConsistencyPolicy(consistencyPolicy);
+
+ txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg);
+
+ DatabaseEntry key = idToKey(id);
+ database.get(txn, key, null, LockMode.READ_COMMITTED);
+ return true;
+ } catch (ReplicaConsistencyException e) {
+ return false;
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ }
+ }
}
@Override
@@ -298,7 +350,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (dbNames == null) {
return ret;
}
- if (dbNames.size() == 0) {
+ if (dbNames.isEmpty()) {
return ret;
}
@@ -354,7 +406,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
LOG.error("fail to get dbNames while open bdbje journal. will exit");
System.exit(-1);
}
- if (dbNames.size() == 0) {
+ if (dbNames.isEmpty()) {
/*
* This is the very first time to open. Usually, we will open a new database
* named "1".
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]