This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d25f5ed [Metric] Add more metrics for FE checkpoint operation (#6522)
d25f5ed is described below
commit d25f5ed911b718cac239774d798771d3694283d6
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Sep 7 11:52:55 2021 +0800
[Metric] Add more metrics for FE checkpoint operation (#6522)
Counters for image write success and failure:
image_write{"type" = "success"}
image_write{"type" = "failed"}
Counters for success and failure of pushing the image to other FE nodes:
image_push{"type" = "success"}
image_push{"type" = "failed"}
Counters for success and failure of cleaning old images:
image_clean{"type" = "success"}
image_clean{"type" = "failed"}
Counters for success and failure of cleaning old edit logs:
edit_log_clean{"type" = "success"}
edit_log_clean{"type" = "failed"}
---
.../apache/doris/common/CheckpointException.java | 31 +++++
.../java/org/apache/doris/master/Checkpoint.java | 141 +++++++++++++--------
.../java/org/apache/doris/metric/MetricRepo.java | 47 ++++++-
.../java/org/apache/doris/qe/StmtExecutor.java | 1 +
4 files changed, 157 insertions(+), 63 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/CheckpointException.java
b/fe/fe-core/src/main/java/org/apache/doris/common/CheckpointException.java
new file mode 100644
index 0000000..38598ad
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CheckpointException.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+/**
+ * Exception for checkpoint thread
+ */
+public class CheckpointException extends Exception {
+ public CheckpointException(String msg) {
+ super(msg);
+ }
+
+ public CheckpointException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
index 65a29c1..6a90dab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
@@ -18,6 +18,7 @@
package org.apache.doris.master;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.CheckpointException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.MasterDaemon;
@@ -78,21 +79,27 @@ public class Checkpoint extends MasterDaemon {
imageVersion = storage.getImageSeq();
// get max finalized journal id
checkPointVersion = editLog.getFinalizedJournalId();
- LOG.info("checkpoint imageVersion {}, checkPointVersion {}",
imageVersion, checkPointVersion);
+ LOG.info("last checkpoint journal id: {}, current finalized
journal id: {}", imageVersion, checkPointVersion);
if (imageVersion >= checkPointVersion) {
return;
}
} catch (IOException e) {
LOG.error("Does not get storage info", e);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_WRITE_FAILED.increase(1L);
+ }
return;
}
-
+
if (!checkMemoryEnoughToDoCheckpoint()) {
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_WRITE_FAILED.increase(1L);
+ }
return;
}
-
- long replayedJournalId = -1;
+
// generate new image file
+ long replayedJournalId = -1;
LOG.info("begin to generate new image: image.{}", checkPointVersion);
catalog = Catalog.getCurrentCatalog();
catalog.setEditLog(editLog);
@@ -100,21 +107,22 @@ public class Checkpoint extends MasterDaemon {
catalog.loadImage(imageDir);
catalog.replayJournal(checkPointVersion);
if (catalog.getReplayedJournalId() != checkPointVersion) {
- LOG.error("checkpoint version should be {}, actual replayed
journal id is {}",
- checkPointVersion, catalog.getReplayedJournalId());
- return;
+ throw new CheckpointException(String.format("checkpoint
version should be %d, actual replayed journal id is %d",
+ checkPointVersion, catalog.getReplayedJournalId()));
}
catalog.fixBugAfterMetadataReplayed(false);
-
catalog.saveImage();
replayedJournalId = catalog.getReplayedJournalId();
if (MetricRepo.isInit) {
- MetricRepo.COUNTER_IMAGE_WRITE.increase(1L);
+ MetricRepo.COUNTER_IMAGE_WRITE_SUCCESS.increase(1L);
}
LOG.info("checkpoint finished save image.{}", replayedJournalId);
} catch (Exception e) {
e.printStackTrace();
LOG.error("Exception when generate new image file", e);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_WRITE_FAILED.increase(1L);
+ }
return;
} finally {
// destroy checkpoint catalog, reclaim memory
@@ -148,70 +156,91 @@ public class Checkpoint extends MasterDaemon {
LOG.error("Exception when pushing image file. url = {}",
url, e);
}
}
-
+
LOG.info("push image.{} to other nodes. totally {} nodes, push
succeed {} nodes",
- replayedJournalId, otherNodesCount, successPushed);
+ replayedJournalId, otherNodesCount, successPushed);
}
-
+ if (successPushed == otherNodesCount) {
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_PUSH_SUCCESS.increase(1L);
+ }
+ } else {
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_PUSH_FAILED.increase(1L);
+ }
+ }
+
// Delete old journals
+ // only do this when the new image succeed in pushing to other nodes
if (successPushed == otherNodesCount) {
- long minOtherNodesJournalId = Long.MAX_VALUE;
- long deleteVersion = checkPointVersion;
- if (successPushed > 0) {
- for (Frontend fe : allFrontends) {
- String host = fe.getHost();
- if
(host.equals(Catalog.getServingCatalog().getMasterIp())) {
- // skip master itself
- continue;
- }
- int port = Config.http_port;
- URL idURL;
- HttpURLConnection conn = null;
- try {
- /*
- * get current replayed journal id of each non-master
nodes.
- * when we delete bdb database, we cannot delete db
newer than
- * any non-master node's current replayed journal id.
otherwise,
- * this lagging node can never get the deleted journal.
- */
- idURL = new URL("http://" + host + ":" + port +
"/journal_id");
- conn = (HttpURLConnection) idURL.openConnection();
- conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND * 1000);
- conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);
- String idString = conn.getHeaderField("id");
- long id = Long.parseLong(idString);
- if (minOtherNodesJournalId > id) {
- minOtherNodesJournalId = id;
+ try {
+ long minOtherNodesJournalId = Long.MAX_VALUE;
+ long deleteVersion = checkPointVersion;
+ if (successPushed > 0) {
+ for (Frontend fe : allFrontends) {
+ String host = fe.getHost();
+ if
(host.equals(Catalog.getServingCatalog().getMasterIp())) {
+ // skip master itself
+ continue;
}
- } catch (IOException e) {
- LOG.error("Exception when getting current replayed
journal id. host={}, port={}",
- host, port, e);
- minOtherNodesJournalId = 0;
- break;
- } finally {
- if (conn != null) {
- conn.disconnect();
+ int port = Config.http_port;
+ URL idURL;
+ HttpURLConnection conn = null;
+ try {
+ /*
+ * get current replayed journal id of each
non-master nodes.
+ * when we delete bdb database, we cannot delete
db newer than
+ * any non-master node's current replayed journal
id. otherwise,
+ * this lagging node can never get the deleted
journal.
+ */
+ idURL = new URL("http://" + host + ":" + port +
"/journal_id");
+ conn = (HttpURLConnection) idURL.openConnection();
+ conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND *
1000);
+ conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);
+ String idString = conn.getHeaderField("id");
+ long id = Long.parseLong(idString);
+ if (minOtherNodesJournalId > id) {
+ minOtherNodesJournalId = id;
+ }
+ } catch (IOException e) {
+ throw new
CheckpointException(String.format("Exception when getting current replayed
journal id. host=%s, port=%d",
+ host, port), e);
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
}
}
+ deleteVersion = Math.min(minOtherNodesJournalId,
checkPointVersion);
+ }
+
+ editLog.deleteJournals(deleteVersion + 1);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_EDIT_LOG_CLEAN_SUCCESS.increase(1L);
+ }
+ LOG.info("journals <= {} are deleted. image version {}, other
nodes min version {}",
+ deleteVersion, checkPointVersion,
minOtherNodesJournalId);
+ } catch (Throwable e) {
+ LOG.error("failed to delete old edit log", e);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_EDIT_LOG_CLEAN_FAILED.increase(1L);
}
- deleteVersion = Math.min(minOtherNodesJournalId,
checkPointVersion);
- }
- editLog.deleteJournals(deleteVersion + 1);
- if (MetricRepo.isInit) {
- MetricRepo.COUNTER_IMAGE_PUSH.increase(1L);
}
- LOG.info("journals <= {} are deleted. image version {}, other
nodes min version {}",
- deleteVersion, checkPointVersion, minOtherNodesJournalId);
}
-
+
// Delete old image files
MetaCleaner cleaner = new MetaCleaner(Config.meta_dir + "/image");
try {
cleaner.clean();
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_CLEAN_SUCCESS.increase(1L);
+ }
} catch (IOException e) {
LOG.error("Master delete old image file fail.", e);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_IMAGE_CLEAN_FAILED.increase(1L);
+ }
}
-
}
/*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 968a024..f72c5eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -83,8 +83,15 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
- public static LongCounterMetric COUNTER_IMAGE_WRITE;
- public static LongCounterMetric COUNTER_IMAGE_PUSH;
+ public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS;
+ public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED;
+ public static LongCounterMetric COUNTER_IMAGE_PUSH_SUCCESS;
+ public static LongCounterMetric COUNTER_IMAGE_PUSH_FAILED;
+ public static LongCounterMetric COUNTER_IMAGE_CLEAN_SUCCESS;
+ public static LongCounterMetric COUNTER_IMAGE_CLEAN_FAILED;
+ public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
+ public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
+
public static LongCounterMetric COUNTER_TXN_REJECT;
public static LongCounterMetric COUNTER_TXN_BEGIN;
public static LongCounterMetric COUNTER_TXN_FAILED;
@@ -279,11 +286,37 @@ public final class MetricRepo {
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_READ);
COUNTER_EDIT_LOG_SIZE_BYTES = new
LongCounterMetric("edit_log_size_bytes", MetricUnit.BYTES, "size of edit log");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
- COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write",
MetricUnit.OPERATIONS, "counter of image generated");
- PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE);
- COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push",
MetricUnit.OPERATIONS,
- "counter of image succeeded in pushing to other frontends");
- PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
+
+ // image generate
+ COUNTER_IMAGE_WRITE_SUCCESS = new LongCounterMetric("image_write",
MetricUnit.OPERATIONS, "counter of image succeed in write");
+ COUNTER_IMAGE_WRITE_SUCCESS.addLabel(new MetricLabel("type",
"success"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE_SUCCESS);
+ COUNTER_IMAGE_WRITE_FAILED = new LongCounterMetric("image_write",
MetricUnit.OPERATIONS, "counter of image failed to write");
+ COUNTER_IMAGE_WRITE_FAILED.addLabel(new MetricLabel("type", "failed"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE_FAILED);
+
+ COUNTER_IMAGE_PUSH_SUCCESS = new LongCounterMetric("image_push",
MetricUnit.OPERATIONS, "counter of image succeeded in pushing to other
frontends");
+ COUNTER_IMAGE_PUSH_SUCCESS.addLabel(new MetricLabel("type",
"success"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH_SUCCESS);
+ COUNTER_IMAGE_PUSH_FAILED = new LongCounterMetric("image_push",
MetricUnit.OPERATIONS, "counter of image failed to other frontends");
+ COUNTER_IMAGE_PUSH_FAILED.addLabel(new MetricLabel("type", "failed"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH_FAILED);
+
+ // image clean
+ COUNTER_IMAGE_CLEAN_SUCCESS = new LongCounterMetric("image_clean",
MetricUnit.OPERATIONS, "counter of image succeeded in cleaning");
+ COUNTER_IMAGE_CLEAN_SUCCESS.addLabel(new MetricLabel("type",
"success"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_CLEAN_SUCCESS);
+ COUNTER_IMAGE_CLEAN_FAILED = new LongCounterMetric("image_clean",
MetricUnit.OPERATIONS, "counter of image failed to clean");
+ COUNTER_IMAGE_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_CLEAN_FAILED);
+
+ // edit log clean
+ COUNTER_EDIT_LOG_CLEAN_SUCCESS = new
LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, "counter of edit log
succeed in cleaning");
+ COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type",
"success"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
+ COUNTER_EDIT_LOG_CLEAN_FAILED = new
LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, "counter of edit log
failed to clean");
+ COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type",
"failed"));
+ PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);
COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject",
MetricUnit.REQUESTS, "counter of rejected transactions");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_REJECT);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ee2ca13..ec3a4c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1447,6 +1447,7 @@ public class StmtExecutor implements ProfileWriter {
context.setState(e.getQueryState());
} catch (UserException e) {
// Return message to info client what happened.
+ LOG.debug("DDL statement({}) process failed.",
originStmt.originStmt, e);
context.getState().setError(e.getMessage());
} catch (Exception e) {
// Maybe our bug
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]