This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d25f5ed  [Metric] Add more metrics for FE checkpoint operation (#6522)
d25f5ed is described below

commit d25f5ed911b718cac239774d798771d3694283d6
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Sep 7 11:52:55 2021 +0800

    [Metric] Add more metrics for FE checkpoint operation (#6522)
    
    Counter of image write success or failure:
    image_write{"type" = "success"}
    image_write{"type" = "failed"}
    
    Counter of image push to other FE nodes success or failure:
    image_push{"type" = "success"}
    image_push{"type" = "failed"}
    
    Counter of old image clean success or failure:
    image_clean{"type" = "success"}
    image_clean{"type" = "failed"}
    
    Counter of old edit log clean success or failure:
    edit_log_clean{"type" = "success"}
    edit_log_clean{"type" = "failed"}
---
 .../apache/doris/common/CheckpointException.java   |  31 +++++
 .../java/org/apache/doris/master/Checkpoint.java   | 141 +++++++++++++--------
 .../java/org/apache/doris/metric/MetricRepo.java   |  47 ++++++-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   1 +
 4 files changed, 157 insertions(+), 63 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/CheckpointException.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/CheckpointException.java
new file mode 100644
index 0000000..38598ad
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/CheckpointException.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+/**
+ * Exception for checkpoint thread
+ */
+public class CheckpointException extends Exception {
+    public CheckpointException(String msg) {
+        super(msg);
+    }
+
+    public CheckpointException(String msg, Throwable e) {
+        super(msg, e);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
index 65a29c1..6a90dab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java
@@ -18,6 +18,7 @@
 package org.apache.doris.master;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.CheckpointException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.MasterDaemon;
@@ -78,21 +79,27 @@ public class Checkpoint extends MasterDaemon {
             imageVersion = storage.getImageSeq();
             // get max finalized journal id
             checkPointVersion = editLog.getFinalizedJournalId();
-            LOG.info("checkpoint imageVersion {}, checkPointVersion {}", 
imageVersion, checkPointVersion);
+            LOG.info("last checkpoint journal id: {}, current finalized 
journal id: {}", imageVersion, checkPointVersion);
             if (imageVersion >= checkPointVersion) {
                 return;
             }
         } catch (IOException e) {
             LOG.error("Does not get storage info", e);
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_WRITE_FAILED.increase(1L);
+            }
             return;
         }
-        
+
         if (!checkMemoryEnoughToDoCheckpoint()) {
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_WRITE_FAILED.increase(1L);
+            }
             return;
         }
-       
-        long replayedJournalId = -1;
+
         // generate new image file
+        long replayedJournalId = -1;
         LOG.info("begin to generate new image: image.{}", checkPointVersion);
         catalog = Catalog.getCurrentCatalog();
         catalog.setEditLog(editLog);
@@ -100,21 +107,22 @@ public class Checkpoint extends MasterDaemon {
             catalog.loadImage(imageDir);
             catalog.replayJournal(checkPointVersion);
             if (catalog.getReplayedJournalId() != checkPointVersion) {
-                LOG.error("checkpoint version should be {}, actual replayed 
journal id is {}",
-                          checkPointVersion, catalog.getReplayedJournalId());
-                return;
+                throw new CheckpointException(String.format("checkpoint 
version should be %d, actual replayed journal id is %d",
+                        checkPointVersion, catalog.getReplayedJournalId()));
             }
             catalog.fixBugAfterMetadataReplayed(false);
-
             catalog.saveImage();
             replayedJournalId = catalog.getReplayedJournalId();
             if (MetricRepo.isInit) {
-                MetricRepo.COUNTER_IMAGE_WRITE.increase(1L);
+                MetricRepo.COUNTER_IMAGE_WRITE_SUCCESS.increase(1L);
             }
             LOG.info("checkpoint finished save image.{}", replayedJournalId);
         } catch (Exception e) {
             e.printStackTrace();
             LOG.error("Exception when generate new image file", e);
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_WRITE_FAILED.increase(1L);
+            }
             return;
         } finally {
             // destroy checkpoint catalog, reclaim memory
@@ -148,70 +156,91 @@ public class Checkpoint extends MasterDaemon {
                     LOG.error("Exception when pushing image file. url = {}", 
url, e);
                 }
             }
-            
+
             LOG.info("push image.{} to other nodes. totally {} nodes, push 
succeed {} nodes",
-                     replayedJournalId, otherNodesCount, successPushed);
+                    replayedJournalId, otherNodesCount, successPushed);
         }
-        
+        if (successPushed == otherNodesCount) {
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_PUSH_SUCCESS.increase(1L);
+            }
+        } else {
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_PUSH_FAILED.increase(1L);
+            }
+        }
+
         // Delete old journals
+        // only do this when the new image succeed in pushing to other nodes
         if (successPushed == otherNodesCount) {
-            long minOtherNodesJournalId = Long.MAX_VALUE;
-            long deleteVersion = checkPointVersion;
-            if (successPushed > 0) {
-                for (Frontend fe : allFrontends) {
-                    String host = fe.getHost();
-                    if 
(host.equals(Catalog.getServingCatalog().getMasterIp())) {
-                        // skip master itself
-                        continue;
-                    }
-                    int port = Config.http_port;
-                    URL idURL;
-                    HttpURLConnection conn = null;
-                    try {
-                        /*
-                         * get current replayed journal id of each non-master 
nodes.
-                         * when we delete bdb database, we cannot delete db 
newer than
-                         * any non-master node's current replayed journal id. 
otherwise,
-                         * this lagging node can never get the deleted journal.
-                         */
-                        idURL = new URL("http://" + host + ":" + port + 
-"/journal_id");
-                        conn = (HttpURLConnection) idURL.openConnection();
-                        conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND * 1000);
-                        conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);
-                        String idString = conn.getHeaderField("id");
-                        long id = Long.parseLong(idString);
-                        if (minOtherNodesJournalId > id) {
-                            minOtherNodesJournalId = id;
+            try {
+                long minOtherNodesJournalId = Long.MAX_VALUE;
+                long deleteVersion = checkPointVersion;
+                if (successPushed > 0) {
+                    for (Frontend fe : allFrontends) {
+                        String host = fe.getHost();
+                        if 
(host.equals(Catalog.getServingCatalog().getMasterIp())) {
+                            // skip master itself
+                            continue;
                         }
-                    } catch (IOException e) {
-                        LOG.error("Exception when getting current replayed 
journal id. host={}, port={}",
-                                host, port, e);
-                        minOtherNodesJournalId = 0;
-                        break;
-                    } finally {
-                        if (conn != null) {
-                            conn.disconnect();
+                        int port = Config.http_port;
+                        URL idURL;
+                        HttpURLConnection conn = null;
+                        try {
+                            /*
+                             * get current replayed journal id of each 
non-master nodes.
+                             * when we delete bdb database, we cannot delete 
db newer than
+                             * any non-master node's current replayed journal 
id. otherwise,
+                             * this lagging node can never get the deleted 
journal.
+                             */
+                            idURL = new URL("http://" + host + ":" + port + 
+"/journal_id");
+                            conn = (HttpURLConnection) idURL.openConnection();
+                            conn.setConnectTimeout(CONNECT_TIMEOUT_SECOND * 
1000);
+                            conn.setReadTimeout(READ_TIMEOUT_SECOND * 1000);
+                            String idString = conn.getHeaderField("id");
+                            long id = Long.parseLong(idString);
+                            if (minOtherNodesJournalId > id) {
+                                minOtherNodesJournalId = id;
+                            }
+                        } catch (IOException e) {
+                            throw new 
CheckpointException(String.format("Exception when getting current replayed 
journal id. host=%s, port=%d",
+                                    host, port), e);
+                        } finally {
+                            if (conn != null) {
+                                conn.disconnect();
+                            }
                         }
                     }
+                    deleteVersion = Math.min(minOtherNodesJournalId, 
checkPointVersion);
+                }
+                
+                editLog.deleteJournals(deleteVersion + 1);
+                if (MetricRepo.isInit) {
+                    MetricRepo.COUNTER_EDIT_LOG_CLEAN_SUCCESS.increase(1L);
+                }
+                LOG.info("journals <= {} are deleted. image version {}, other 
nodes min version {}",
+                        deleteVersion, checkPointVersion, 
minOtherNodesJournalId);
+            } catch (Throwable e) {
+                LOG.error("failed to delete old edit log", e);
+                if (MetricRepo.isInit) {
+                    MetricRepo.COUNTER_EDIT_LOG_CLEAN_FAILED.increase(1L);
                 }
-                deleteVersion = Math.min(minOtherNodesJournalId, 
checkPointVersion);
-            }
-            editLog.deleteJournals(deleteVersion + 1);
-            if (MetricRepo.isInit) {
-                MetricRepo.COUNTER_IMAGE_PUSH.increase(1L);
             }
-            LOG.info("journals <= {} are deleted. image version {}, other 
nodes min version {}", 
-                     deleteVersion, checkPointVersion, minOtherNodesJournalId);
         }
-        
+
         // Delete old image files
         MetaCleaner cleaner = new MetaCleaner(Config.meta_dir + "/image");
         try {
             cleaner.clean();
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_CLEAN_SUCCESS.increase(1L);
+            }
         } catch (IOException e) {
             LOG.error("Master delete old image file fail.", e);
+            if (MetricRepo.isInit) {
+                MetricRepo.COUNTER_IMAGE_CLEAN_FAILED.increase(1L);
+            }
         }
-    
     }
     
     /*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 968a024..f72c5eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -83,8 +83,15 @@ public final class MetricRepo {
     public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
     public static LongCounterMetric COUNTER_EDIT_LOG_READ;
     public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
-    public static LongCounterMetric COUNTER_IMAGE_WRITE;
-    public static LongCounterMetric COUNTER_IMAGE_PUSH;
+    public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS;
+    public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED;
+    public static LongCounterMetric COUNTER_IMAGE_PUSH_SUCCESS;
+    public static LongCounterMetric COUNTER_IMAGE_PUSH_FAILED;
+    public static LongCounterMetric COUNTER_IMAGE_CLEAN_SUCCESS;
+    public static LongCounterMetric COUNTER_IMAGE_CLEAN_FAILED;
+    public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
+    public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
+
     public static LongCounterMetric COUNTER_TXN_REJECT;
     public static LongCounterMetric COUNTER_TXN_BEGIN;
     public static LongCounterMetric COUNTER_TXN_FAILED;
@@ -279,11 +286,37 @@ public final class MetricRepo {
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_READ);
         COUNTER_EDIT_LOG_SIZE_BYTES = new 
LongCounterMetric("edit_log_size_bytes", MetricUnit.BYTES, "size of edit log");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
-        COUNTER_IMAGE_WRITE = new LongCounterMetric("image_write", 
MetricUnit.OPERATIONS, "counter of image generated");
-        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE);
-        COUNTER_IMAGE_PUSH = new LongCounterMetric("image_push", 
MetricUnit.OPERATIONS,
-                "counter of image succeeded in pushing to other frontends");
-        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
+
+        // image generate
+        COUNTER_IMAGE_WRITE_SUCCESS = new LongCounterMetric("image_write", 
MetricUnit.OPERATIONS, "counter of image succeed in write");
+        COUNTER_IMAGE_WRITE_SUCCESS.addLabel(new MetricLabel("type", 
"success"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE_SUCCESS);
+        COUNTER_IMAGE_WRITE_FAILED = new LongCounterMetric("image_write", 
MetricUnit.OPERATIONS, "counter of image failed to write");
+        COUNTER_IMAGE_WRITE_FAILED.addLabel(new MetricLabel("type", "failed"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_WRITE_FAILED);
+
+        COUNTER_IMAGE_PUSH_SUCCESS = new LongCounterMetric("image_push", 
MetricUnit.OPERATIONS, "counter of image succeeded in pushing to other 
frontends");
+        COUNTER_IMAGE_PUSH_SUCCESS.addLabel(new MetricLabel("type", 
"success"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH_SUCCESS);
+        COUNTER_IMAGE_PUSH_FAILED = new LongCounterMetric("image_push", 
MetricUnit.OPERATIONS, "counter of image failed to other frontends");
+        COUNTER_IMAGE_PUSH_FAILED.addLabel(new MetricLabel("type", "failed"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH_FAILED);
+
+        // image clean
+        COUNTER_IMAGE_CLEAN_SUCCESS = new LongCounterMetric("image_clean", 
MetricUnit.OPERATIONS, "counter of image succeeded in cleaning");
+        COUNTER_IMAGE_CLEAN_SUCCESS.addLabel(new MetricLabel("type", 
"success"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_CLEAN_SUCCESS);
+        COUNTER_IMAGE_CLEAN_FAILED = new LongCounterMetric("image_clean", 
MetricUnit.OPERATIONS, "counter of image failed to clean");
+        COUNTER_IMAGE_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_CLEAN_FAILED);
+
+        // edit log clean
+        COUNTER_EDIT_LOG_CLEAN_SUCCESS = new 
LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, "counter of edit log 
succeed in cleaning");
+        COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", 
"success"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
+        COUNTER_EDIT_LOG_CLEAN_FAILED = new 
LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS, "counter of edit log 
failed to clean");
+        COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", 
"failed"));
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);
 
         COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", 
MetricUnit.REQUESTS, "counter of rejected transactions");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_REJECT);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ee2ca13..ec3a4c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1447,6 +1447,7 @@ public class StmtExecutor implements ProfileWriter {
             context.setState(e.getQueryState());
         } catch (UserException e) {
             // Return message to info client what happened.
+            LOG.debug("DDL statement({}) process failed.", 
originStmt.originStmt, e);
             context.getState().setError(e.getMessage());
         } catch (Exception e) {
             // Maybe our bug

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to