This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c936abd2a3 [fix](fe) when bdbje is adding a follower, master write ops may fail. (#10376)
c936abd2a3 is described below
commit c936abd2a37b2dd4213c0240b32df98ddd4b4017
Author: Lei Zhang <[email protected]>
AuthorDate: Wed Jul 6 10:29:16 2022 +0800
[fix](fe) when bdbje is adding a follower, master write ops may fail. (#10376)
---
.../java/org/apache/doris/catalog/Catalog.java | 13 ++++
.../src/main/java/org/apache/doris/ha/BDBHA.java | 71 ++++++++++++++++++----
.../apache/doris/journal/bdbje/BDBJEJournal.java | 39 ++++++++----
.../java/org/apache/doris/persist/EditLog.java | 39 +++++++-----
.../java/org/apache/doris/system/Frontend.java | 29 +++++----
.../java/org/apache/doris/system/HeartbeatMgr.java | 2 +-
6 files changed, 140 insertions(+), 53 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 2798b2b36e..4020ffa72f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -2470,6 +2470,7 @@ public class Catalog {
if (role == FrontendNodeType.FOLLOWER || role ==
FrontendNodeType.REPLICA) {
bdbha.addHelperSocket(host, editLogPort);
helperNodes.add(Pair.create(host, editLogPort));
+ bdbha.addUnReadyElectableNode(nodeName, getFollowerCount());
}
bdbha.removeConflictNodeIfExist(host, editLogPort);
editLog.logAddFrontend(fe);
@@ -2499,6 +2500,8 @@ public class Catalog {
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() ==
FrontendNodeType.REPLICA) {
haProtocol.removeElectableNode(fe.getNodeName());
helperNodes.remove(Pair.create(host, port));
+ BDBHA ha = (BDBHA) haProtocol;
+ // Use the dropped frontend's own name, exactly as removeElectableNode() does above.
+ // A bare `nodeName` does not refer to `fe` here (NOTE(review): presumably it is this
+ // Catalog's own node name). Passing it would leave the dropped follower in the unready
+ // set forever and keep the electable group size override too small.
+ ha.removeUnReadyElectableNode(fe.getNodeName(), getFollowerCount());
}
editLog.logRemoveFrontend(fe);
} finally {
@@ -4931,4 +4934,14 @@ public class Catalog {
sb.append("\nCOMMENT
'").append(table.getComment(true)).append("'");
}
}
+
+ public int getFollowerCount() {
+ int count = 0;
+ for (Frontend fe : frontends.values()) {
+ if (fe.getRole() == FrontendNodeType.FOLLOWER) {
+ count++;
+ }
+ }
+ return count;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
index 22d15373d8..02f110b40a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBHA.java
@@ -1,17 +1,17 @@
// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
+// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
+// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
+// with the License. You may obtain a copy of the License at
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
+// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
@@ -28,7 +28,9 @@ import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.MemberNotFoundException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationGroup;
+import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
@@ -37,16 +39,30 @@ import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class BDBHA implements HAProtocol {
private static final Logger LOG = LogManager.getLogger(BDBHA.class);
- private BDBEnvironment environment;
- private String nodeName;
+ private final BDBEnvironment environment;
+ private final String nodeName;
private static final int RETRY_TIME = 3;
+    // An unready electable node is a follower that is joining the cluster but has not yet
+    // caught up. Once its handshake succeeds, the joined node is put into the electable group,
+    // but it may take some time for it to replicate the historical data. Until that historical
+    // replication completes, the node does not respond to replication of new data. If the
+    // master then cannot collect a quorum of responses, the write operation fails.
+    // So we record such nodes here and set the BDB electable group size override to
+    // (number of followers - number of unready electable nodes).
+ private final Set<String> unReadyElectableNodes = new HashSet<>();
+
public BDBHA(BDBEnvironment env, String nodeName) {
this.environment = env;
this.nodeName = nodeName;
@@ -124,7 +140,8 @@ public class BDBHA implements HAProtocol {
if (leaderIncluded) {
ret.add(replicationNode.getSocketAddress());
} else {
- if
(!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) {
+ if (!replicationNode.getName()
+
.equals(replicationGroupAdmin.getMasterNodeName())) {
ret.add(replicationNode.getSocketAddress());
}
}
@@ -204,17 +221,20 @@ public class BDBHA implements HAProtocol {
return true;
}
- // When new Follower FE is added to the cluster, it should also be added
to the helper sockets in
+ // When new Follower FE is added to the cluster, it should also be added
to the
+ // helper sockets in
// ReplicationGroupAdmin, in order to fix the following case:
// 1. A Observer starts with helper of master FE.
// 2. Master FE is dead, new Master is elected.
// 3. Observer's helper sockets only contains the info of the dead master
FE.
- // So when you try to get frontends' info from this Observer, it will
throw the Exception:
- // "Could not determine master from helpers at:[/dead master FE
host:port]"
+ // So when you try to get frontends' info from this Observer, it will
throw the
+ // Exception:
+ // "Could not determine master from helpers at:[/dead master FE host:port]"
public void addHelperSocket(String ip, Integer port) {
ReplicationGroupAdmin replicationGroupAdmin =
environment.getReplicationGroupAdmin();
- Set<InetSocketAddress> helperSockets =
Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
- InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port);
+ Set<InetSocketAddress> helperSockets =
+ Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
+ InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port);
if (!helperSockets.contains(newHelperSocket)) {
helperSockets.add(newHelperSocket);
environment.setNewReplicationGroupAdmin(helperSockets);
@@ -240,4 +260,29 @@ public class BDBHA implements HAProtocol {
removeElectableNode(conflictNode);
}
}
+
+    /**
+     * Records {@code nodeName} as an electable node that has joined but is not ready yet, and
+     * lowers the BDB electable group size override accordingly. This lets the master still
+     * reach a write quorum while the new node replicates historical data.
+     *
+     * @param nodeName name of the joining follower
+     * @param totalFollowerCount number of followers currently registered
+     */
+    public synchronized void addUnReadyElectableNode(String nodeName, int totalFollowerCount) {
+        unReadyElectableNodes.add(nodeName);
+        applyElectableGroupSizeOverride(totalFollowerCount);
+    }
+
+    /**
+     * Removes {@code nodeName} from the unready electable nodes and recomputes the BDB electable
+     * group size override. The override is cleared once no unready nodes remain. Removing a name
+     * that is not tracked only re-applies the override for the current counts.
+     *
+     * @param nodeName name of the follower that became ready or was dropped
+     * @param totalFollowerCount number of followers currently registered
+     */
+    public synchronized void removeUnReadyElectableNode(String nodeName, int totalFollowerCount) {
+        unReadyElectableNodes.remove(nodeName);
+        applyElectableGroupSizeOverride(totalFollowerCount);
+    }
+
+    // Pushes the override derived from unReadyElectableNodes to the replicated environment, if open.
+    // Only called from the synchronized methods above, so the set is read under this object's lock.
+    private void applyElectableGroupSizeOverride(int totalFollowerCount) {
+        ReplicatedEnvironment replicatedEnvironment = environment.getReplicatedEnvironment();
+        if (replicatedEnvironment == null) {
+            return;
+        }
+        int override;
+        if (unReadyElectableNodes.isEmpty()) {
+            // Setting ElectableGroupSizeOverride to 0 means remove this config,
+            // and bdb will use the normal electable group size.
+            override = 0;
+        } else {
+            override = totalFollowerCount - unReadyElectableNodes.size();
+            if (override < 1) {
+                // The unready set and the follower count disagree (e.g. a tracked node is not
+                // counted as a follower). A value of 0 would silently clear the override, and a
+                // negative value is presumably rejected by BDB JE, so keep at least one node.
+                LOG.warn("invalid electable group size override {} (followers: {}, unready nodes: {}),"
+                        + " use 1 instead", override, totalFollowerCount, unReadyElectableNodes);
+                override = 1;
+            }
+        }
+        replicatedEnvironment.setRepMutableConfig(
+                new ReplicationMutableConfig().setElectableGroupSizeOverride(override));
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 4d787f866b..4974ec0f85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -163,7 +163,8 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
if (op == OperationType.OP_TIMESTAMP) {
/*
* Do not exit if the write operation is OP_TIMESTAMP.
- * If all the followers exit except master, master should
continue provide query service.
+ * If all the followers exit except master, master should
continue provide query
+ * service.
* To prevent master exit, we should exempt OP_TIMESTAMP write
*/
nextJournalId.set(id);
@@ -307,7 +308,8 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
}
}
- // Open a new journal database or get last existing one as current
journal database
+ // Open a new journal database or get last existing one as current
journal
+ // database
List<Long> dbNames = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
@@ -319,10 +321,11 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
}
if (dbNames.size() == 0) {
/*
- * This is the very first time to open. Usually, we will
open a new database named "1".
- * But when we start cluster with an image file copied
from other cluster,
- * here we should open database with name image max
journal id + 1.
- * (default
Catalog.getServingCatalog().getReplayedJournalId() is 0)
+ * This is the very first time to open. Usually, we will
open a new database
+ * named "1".
+ * But when we start cluster with an image file copied
from other cluster,
+ * here we should open database with name image max
journal id + 1.
+ * (default
Catalog.getServingCatalog().getReplayedJournalId() is 0)
*/
String dbName =
Long.toString(Catalog.getServingCatalog().getReplayedJournalId() + 1);
LOG.info("the very first time to open bdb, dbname is {}",
dbName);
@@ -344,8 +347,10 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
private void reSetupBdbEnvironment(InsufficientLogException
insufficientLogEx) {
LOG.warn("catch insufficient log exception. will recover and try
again.", insufficientLogEx);
- // Copy the missing log files from a member of the replication group
who owns the files
- // ATTN: here we use `getServingCatalog()`, because only serving
catalog has helper nodes.
+ // Copy the missing log files from a member of the replication group
who owns
+ // the files
+ // ATTN: here we use `getServingCatalog()`, because only serving
catalog has
+ // helper nodes.
Pair<String, Integer> helperNode =
Catalog.getServingCatalog().getHelperNode();
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
@@ -413,19 +418,23 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
return null;
}
- // Open a new journal database or get last existing one as current
journal database
- List<Long> dbNames = null;
+ // Open a new journal database or get last existing one as current
journal
+ // database
+ List<Long> dbNames = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
dbNames = bdbEnvironment.getDatabaseNames();
break;
} catch (InsufficientLogException insufficientLogEx) {
/*
- * If this is not a checkpoint thread, which means this maybe
the FE startup thread,
- * or a replay thread. We will reopen bdbEnvironment for these
2 cases to get valid log
+ * If this is not a checkpoint thread, which means this maybe
the FE startup
+ * thread,
+ * or a replay thread. We will reopen bdbEnvironment for these
2 cases to get
+ * valid log
* from helper nodes.
*
- * The checkpoint thread will only run on Master FE. And
Master FE should not encounter
+ * The checkpoint thread will only run on Master FE. And
Master FE should not
+ * encounter
* these exception. So if it happens, throw exception out.
*/
if (!Catalog.isCheckpointThread()) {
@@ -446,4 +455,8 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
return dbNames;
}
+
+    /**
+     * Gives callers direct access to the BDB environment this journal uses to open and list its
+     * databases. The field itself is returned; no copy is made.
+     */
+    public BDBEnvironment getBDBEnvironment() {
+        return bdbEnvironment;
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 0871855d20..3e0b531970 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -466,7 +466,7 @@ public class EditLog {
int version = Integer.parseInt(versionString);
if (version > FeConstants.meta_version) {
LOG.error("meta data version is out of date, image:
{}. meta: {}."
- + "please update
FeConstants.meta_version and restart.",
+ + "please update FeConstants.meta_version and
restart.",
MetaContext.get().getMetaVersion(),
FeConstants.meta_version);
System.exit(-1);
}
@@ -546,8 +546,8 @@ public class EditLog {
break;
}
case OperationType.OP_BATCH_REMOVE_TXNS: {
- final BatchRemoveTransactionsOperation operation
- = (BatchRemoveTransactionsOperation)
journal.getData();
+ final BatchRemoveTransactionsOperation operation =
(BatchRemoveTransactionsOperation) journal
+ .getData();
Catalog.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
break;
}
@@ -647,8 +647,8 @@ public class EditLog {
break;
}
case OperationType.OP_CREATE_LOAD_JOB: {
- org.apache.doris.load.loadv2.LoadJob loadJob =
- (org.apache.doris.load.loadv2.LoadJob)
journal.getData();
+ org.apache.doris.load.loadv2.LoadJob loadJob =
(org.apache.doris.load.loadv2.LoadJob) journal
+ .getData();
catalog.getLoadManager().replayCreateLoadJob(loadJob);
break;
}
@@ -742,8 +742,8 @@ public class EditLog {
break;
}
case OperationType.OP_REPLACE_TEMP_PARTITION: {
- ReplacePartitionOperationLog replaceTempPartitionLog
- = (ReplacePartitionOperationLog) journal.getData();
+ ReplacePartitionOperationLog replaceTempPartitionLog =
(ReplacePartitionOperationLog) journal
+ .getData();
catalog.replayReplaceTempPartition(replaceTempPartitionLog);
break;
}
@@ -858,13 +858,19 @@ public class EditLog {
* for a table that no longer exists.
* 1. Thread 1: get TableA object
* 2. Thread 2: lock db and drop table and record edit log of the
dropped TableA
- * 3. Thread 1: lock table, modify table and record edit log of
the modified TableA
+ * 3. Thread 1: lock table, modify table and record edit log of
the modified
+ * TableA
* **The modified edit log is after the dropped edit log**
- * Because the table has been dropped, the olapTable in here is
null when the modified edit log is replayed.
- * So in this case, we will ignore the edit log of the modified
table after the table is dropped.
- * This could make the meta inconsistent, for example, an edit log
on a dropped table is ignored, but
- * this table is restored later, so there may be an inconsistent
situation between master and followers. We
- * log a warning here to debug when happens. This could happen to
other meta like DB.
+ * Because the table has been dropped, the olapTable in here is
null when the
+ * modified edit log is replayed.
+ * So in this case, we will ignore the edit log of the modified
table after the
+ * table is dropped.
+ * This could make the meta inconsistent, for example, an edit log
on a dropped
+ * table is ignored, but
+ * this table is restored later, so there may be an inconsistent
situation
+ * between master and followers. We
+ * log a warning here to debug when happens. This could happen to
other meta
+ * like DB.
*/
LOG.warn("[INCONSISTENT META] replay failed {}: {}", journal,
e.getMessage(), e);
} catch (Exception e) {
@@ -911,7 +917,8 @@ public class EditLog {
try {
journal.write(op, writable);
} catch (Throwable t) {
- // Throwable contains all Exception and Error, such as IOException
and OutOfMemoryError
+ // Throwable contains all Exception and Error, such as IOException
and
+ // OutOfMemoryError
LOG.error("Fatal Error : write stream Exception", t);
System.exit(-1);
}
@@ -1461,4 +1468,8 @@ public class EditLog {
public void logDatasourceLog(short id, CatalogLog log) {
logEdit(id, log);
}
+
+    /** Returns the {@link Journal} instance this edit log writes operations to. */
+    public Journal getJournal() {
+        return journal;
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
index bcad0012c4..1dd498beb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java
@@ -1,24 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
+// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
+// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
+// with the License. You may obtain a copy of the License at
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
+// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
+import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
@@ -42,8 +44,7 @@ public class Frontend implements Writable {
private boolean isAlive = false;
- public Frontend() {
- }
+    // Deliberately empty no-arg constructor. NOTE(review): presumably needed to create an
+    // instance before its fields are read back as a Writable — confirm.
+ public Frontend() {}
public Frontend(FrontendNodeType role, String nodeName, String host, int
editLogPort) {
this.role = role;
@@ -97,14 +98,18 @@ public class Frontend implements Writable {
}
/**
- * handle Frontend's heartbeat response.
- * Because the replayed journal id is very likely to be changed at each
heartbeat response,
- * so we simple return true if the heartbeat status is OK.
- * But if heartbeat status is BAD, only return true if it is the first
time to transfer from alive to dead.
+     * Handle a Frontend's heartbeat response. Because the replayed journal id is very likely to
+     * change with every heartbeat response, we simply return true if the heartbeat status is OK.
+     * But if the heartbeat status is BAD, only return true if it is the first time to transfer
+     * from alive to dead.
+     * When a frontend turns from dead to alive and {@code isReplay} is false, it is also removed
+     * from the BDB HA set of unready electable nodes.
*/
- public boolean handleHbResponse(FrontendHbResponse hbResponse) {
+ public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean
isReplay) {
boolean isChanged = false;
if (hbResponse.getStatus() == HbStatus.OK) {
+ if (!isAlive && !isReplay) {
+ BDBHA bdbha = (BDBHA)
Catalog.getCurrentCatalog().getHaProtocol();
+ bdbha.removeUnReadyElectableNode(nodeName,
Catalog.getCurrentCatalog().getFollowerCount());
+ }
isAlive = true;
version = hbResponse.getVersion();
queryPort = hbResponse.getQueryPort();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 855e95c0f1..395aba00c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -157,7 +157,7 @@ public class HeartbeatMgr extends MasterDaemon {
FrontendHbResponse hbResponse = (FrontendHbResponse) response;
Frontend fe =
Catalog.getCurrentCatalog().getFeByName(hbResponse.getName());
if (fe != null) {
- return fe.handleHbResponse(hbResponse);
+ return fe.handleHbResponse(hbResponse, isReplay);
}
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]