This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 903ff320210 [opt](fe) exit FE when transfer to (non)master failed (#34809) (#35158)
903ff320210 is described below
commit 903ff3202103093eadcf0f0672843eb6719fcb94
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue May 21 22:31:47 2024 +0800
[opt](fe) exit FE when transfer to (non)master failed (#34809) (#35158)
bp #34809
---
be/src/vec/exec/scan/vfile_scanner.cpp | 3 +-
.../main/java/org/apache/doris/catalog/Env.java | 275 +++++++++++----------
.../java/org/apache/doris/service/FeServer.java | 2 +-
3 files changed, 148 insertions(+), 132 deletions(-)
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 398621b08ea..fca2c301c64 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -958,8 +958,7 @@ Status VFileScanner::_get_next_reader() {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
} else if (!init_status.ok()) {
- return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
- init_status.to_string());
+ return Status::InternalError("failed to init reader, err: {}", init_status.to_string());
}
_name_to_col_type.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index d0223e626bc..5dffb93121c 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1444,137 +1444,147 @@ public class Env {
@SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
- // stop replayer
- if (replayer != null) {
- replayer.exit();
- try {
- replayer.join();
- } catch (InterruptedException e) {
- LOG.warn("got exception when stopping the replayer thread", e);
+ try {
+ // stop replayer
+ if (replayer != null) {
+ replayer.exit();
+ try {
+ replayer.join();
+ } catch (InterruptedException e) {
+ LOG.warn("got exception when stopping the replayer thread", e);
+ }
+ replayer = null;
}
- replayer = null;
- }
- // set this after replay thread stopped. to avoid replay thread modify them.
- isReady.set(false);
- canRead.set(false);
+ // set this after replay thread stopped. to avoid replay thread modify them.
+ isReady.set(false);
+ canRead.set(false);
- toMasterProgress = "open editlog";
- editLog.open();
+ toMasterProgress = "open editlog";
+ editLog.open();
- if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
- if (!haProtocol.fencing()) {
- LOG.error("fencing failed. will exit.");
- System.exit(-1);
+ if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
+ if (!haProtocol.fencing()) {
+ LOG.error("fencing failed. will exit.");
+ System.exit(-1);
+ }
}
- }
- toMasterProgress = "replay journal";
- long replayStartTime = System.currentTimeMillis();
- // replay journals. -1 means replay all the journals larger than current journal id.
- replayJournal(-1);
- long replayEndTime = System.currentTimeMillis();
- LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
+ toMasterProgress = "replay journal";
+ long replayStartTime = System.currentTimeMillis();
+ // replay journals. -1 means replay all the journals larger than current journal id.
+ replayJournal(-1);
+ long replayEndTime = System.currentTimeMillis();
+ LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
- checkCurrentNodeExist();
+ checkCurrentNodeExist();
- checkBeExecVersion();
+ checkBeExecVersion();
- toMasterProgress = "roll editlog";
- editLog.rollEditLog();
+ toMasterProgress = "roll editlog";
+ editLog.rollEditLog();
- // Log meta_version
- long journalVersion = MetaContext.get().getMetaVersion();
- if (journalVersion < FeConstants.meta_version) {
- toMasterProgress = "log meta version";
- editLog.logMetaVersion(FeConstants.meta_version);
- MetaContext.get().setMetaVersion(FeConstants.meta_version);
- }
+ // Log meta_version
+ long journalVersion = MetaContext.get().getMetaVersion();
+ if (journalVersion < FeConstants.meta_version) {
+ toMasterProgress = "log meta version";
+ editLog.logMetaVersion(FeConstants.meta_version);
+ MetaContext.get().setMetaVersion(FeConstants.meta_version);
+ }
- // Log the first frontend
- if (isFirstTimeStartUp) {
- // if isFirstTimeStartUp is true, frontends must contains this Node.
- Frontend self = frontends.get(nodeName);
- Preconditions.checkNotNull(self);
- // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
- editLog.logAddFirstFrontend(self);
+ // Log the first frontend
+ if (isFirstTimeStartUp) {
+ // if isFirstTimeStartUp is true, frontends must contains this Node.
+ Frontend self = frontends.get(nodeName);
+ Preconditions.checkNotNull(self);
+ // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
+ editLog.logAddFirstFrontend(self);
- initLowerCaseTableNames();
- // Set initial root password if master FE first time launch.
- auth.setInitialRootPassword(Config.initial_root_password);
- } else {
- if (journalVersion <= FeMetaVersion.VERSION_114) {
- // if journal version is less than 114, which means it is upgraded from version before 2.0.
- // When upgrading from 1.2 to 2.0, we need to make sure that the parallelism of query remain unchanged
- // when switch to pipeline engine, otherwise it may impact the load of entire cluster
- // because the default parallelism of pipeline engine is higher than previous version.
- // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
- int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
- VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
- String.valueOf(newVal));
-
- // similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
- // if the default value has been upgraded
- double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
- VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
- SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
- String.valueOf(newBcFactorVal));
-
- // similar reason as above, need to upgrade enable_nereids_planner to true
- VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
- "true");
- }
- if (journalVersion <= FeMetaVersion.VERSION_123) {
- VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true");
- VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
- SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
- if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
+ initLowerCaseTableNames();
+ // Set initial root password if master FE first time launch.
+ auth.setInitialRootPassword(Config.initial_root_password);
+ } else {
+ if (journalVersion <= FeMetaVersion.VERSION_114) {
+ // if journal version is less than 114, which means it is upgraded from version before 2.0.
+ // When upgrading from 1.2 to 2.0,
+ // we need to make sure that the parallelism of query remain unchanged
+ // when switch to pipeline engine, otherwise it may impact the load of entire cluster
+ // because the default parallelism of pipeline engine is higher than previous version.
+ // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
+ int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
+ VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
+ SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
+ String.valueOf(newVal));
+
+ // similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
+ // if the default value has been upgraded
+ double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
+ VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
+ SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
+ String.valueOf(newBcFactorVal));
+
+ // similar reason as above, need to upgrade enable_nereids_planner to true
+ VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
+ "true");
+ }
+ if (journalVersion <= FeMetaVersion.VERSION_123) {
+ VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML,
+ "true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
- SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
+ SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
+ if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
+ VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
+ SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
+ }
}
}
- }
- getPolicyMgr().createDefaultStoragePolicy();
+ getPolicyMgr().createDefaultStoragePolicy();
- // MUST set master ip before starting checkpoint thread.
- // because checkpoint thread need this info to select non-master FE to push image
+ // MUST set master ip before starting checkpoint thread.
+ // because checkpoint thread need this info to select non-master FE to push image
- toMasterProgress = "log master info";
- this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
- Config.http_port,
- Config.rpc_port);
- editLog.logMasterInfo(masterInfo);
- LOG.info("logMasterInfo:{}", masterInfo);
+ toMasterProgress = "log master info";
+ this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
+ Config.http_port,
+ Config.rpc_port);
+ editLog.logMasterInfo(masterInfo);
+ LOG.info("logMasterInfo:{}", masterInfo);
- // for master, the 'isReady' is set behind.
- // but we are sure that all metadata is replayed if we get here.
- // so no need to check 'isReady' flag in this method
- postProcessAfterMetadataReplayed(false);
+ // for master, the 'isReady' is set behind.
+ // but we are sure that all metadata is replayed if we get here.
+ // so no need to check 'isReady' flag in this method
+ postProcessAfterMetadataReplayed(false);
- insertOverwriteManager.allTaskFail();
+ insertOverwriteManager.allTaskFail();
- toMasterProgress = "start daemon threads";
+ toMasterProgress = "start daemon threads";
- // start all daemon threads that only running on MASTER FE
- startMasterOnlyDaemonThreads();
- // start other daemon threads that should running on all FE
- startNonMasterDaemonThreads();
+ // start all daemon threads that only running on MASTER FE
+ startMasterOnlyDaemonThreads();
+ // start other daemon threads that should running on all FE
+ startNonMasterDaemonThreads();
- MetricRepo.init();
+ MetricRepo.init();
- toMasterProgress = "finished";
- canRead.set(true);
- isReady.set(true);
- checkLowerCaseTableNames();
+ toMasterProgress = "finished";
+ canRead.set(true);
+ isReady.set(true);
+ checkLowerCaseTableNames();
- String msg = "master finished to replay journal, can write now.";
- Util.stdoutWithTime(msg);
- LOG.info(msg);
- // for master, there are some new thread pools need to register metric
- ThreadPoolManager.registerAllThreadPoolMetric();
- if (analysisManager != null) {
- analysisManager.getStatisticsCache().preHeat();
+ String msg = "master finished to replay journal, can write now.";
+ Util.stdoutWithTime(msg);
+ LOG.info(msg);
+ // for master, there are some new thread pools need to register metric
+ ThreadPoolManager.registerAllThreadPoolMetric();
+ if (analysisManager != null) {
+ analysisManager.getStatisticsCache().preHeat();
+ }
+ } catch (Throwable e) {
+ // When failed to transfer to master, we need to exit the process.
+ // Otherwise, the process will be in an unknown state.
+ LOG.error("failed to transfer to master. progress: {}", toMasterProgress, e);
+ System.exit(-1);
}
}
@@ -1722,36 +1732,43 @@ public class Env {
private void transferToNonMaster(FrontendNodeType newType) {
isReady.set(false);
- if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
- Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
- LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
- // not set canRead here, leave canRead as what is was.
- // if meta out of date, canRead will be set to false in replayer thread.
- metaReplayState.setTransferToUnknown();
- return;
- }
+ try {
+ if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
+ Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
+ LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
+ // not set canRead here, leave canRead as what is was.
+ // if meta out of date, canRead will be set to false in replayer thread.
+ metaReplayState.setTransferToUnknown();
+ return;
+ }
- // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
+ // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
- if (replayer == null) {
- createReplayer();
- replayer.start();
- }
+ if (replayer == null) {
+ createReplayer();
+ replayer.start();
+ }
- // 'isReady' will be set to true in 'setCanRead()' method
- if (!postProcessAfterMetadataReplayed(true)) {
- // the state has changed, exit early.
- return;
- }
+ // 'isReady' will be set to true in 'setCanRead()' method
+ if (!postProcessAfterMetadataReplayed(true)) {
+ // the state has changed, exit early.
+ return;
+ }
- checkLowerCaseTableNames();
+ checkLowerCaseTableNames();
- startNonMasterDaemonThreads();
+ startNonMasterDaemonThreads();
- MetricRepo.init();
+ MetricRepo.init();
- if (analysisManager != null) {
- analysisManager.getStatisticsCache().preHeat();
+ if (analysisManager != null) {
+ analysisManager.getStatisticsCache().preHeat();
+ }
+ } catch (Throwable e) {
+ // When failed to transfer to non-master, we need to exit the process.
+ // Otherwise, the process will be in an unknown state.
+ LOG.error("failed to transfer to non-master.", e);
+ System.exit(-1);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java b/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java
index 6c61c9cdd48..542c993a438 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FeServer.java
@@ -44,7 +44,7 @@ public class FeServer {
public void start() throws IOException {
FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
- Logger feServiceLogger = LogManager.getLogger(FrontendServiceImpl.class);
+ Logger feServiceLogger = LogManager.getLogger(FeServer.class);
FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
FrontendServiceImpl.class.getClassLoader(),
FrontendServiceImpl.class.getInterfaces(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]