This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba868c6  [Optimize] Optimize some tablet scheduling logic (#5926)
ba868c6 is described below

commit ba868c610f8067039f75efb539d8fa77ea86f1b5
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun May 30 23:08:59 2021 +0800

    [Optimize] Optimize some tablet scheduling logic (#5926)
    
    1. The partitions set by the admin repair command are prioritized
       to ensure that the tablets of these partitions can be repaired as soon as possible.
    
    2. Add an FE metric "query_begin" to monitor the number of queries submitted to Doris.
---
 be/src/exec/tablet_sink.cpp                        |   5 +-
 .../java/org/apache/doris/clone/TabletChecker.java | 235 +++++++++++++--------
 .../java/org/apache/doris/metric/MetricRepo.java   |   3 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   1 +
 4 files changed, 159 insertions(+), 85 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 60b1706..991b7ed 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -183,8 +183,11 @@ Status NodeChannel::open_wait() {
                     }
                 } else {
                     std::stringstream ss;
+                                       // FIXME(cmy): There is a problem that 
when calling node_info, the node_info seems not initialized.
+                                       //             But I don't know why. so 
here I print node_info()->id instead of node_info()->host
+                                       //             to avoid BE crash. It 
needs further observation.
                     ss << name() << " add batch req success but status isn't 
ok, "
-                                 << print_load_info() << ", node=" << 
node_info()->host << ":"
+                                 << print_load_info() << ", backend id=" << 
node_info()->id << ":"
                                  << node_info()->brpc_port << ", errmsg=" << 
status.get_error_msg();
                     _cancel_with_msg(ss.str());
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index ede26be..4e375c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -176,7 +176,7 @@ public class TabletChecker extends MasterDaemon {
                     pendingNum, runningNum, Config.max_scheduling_tablets);
             return;
         }
-        
+
         checkTablets();
 
         removePriosIfNecessary();
@@ -185,16 +185,68 @@ public class TabletChecker extends MasterDaemon {
         LOG.info(stat.incrementalBrief());
     }
 
+    private static class CheckerCounter {
+        public long totalTabletNum = 0;
+        public long unhealthyTabletNum = 0;
+        public long addToSchedulerTabletNum = 0;
+        public long tabletInScheduler = 0;
+        public long tabletNotReady = 0;
+    }
+
+    private enum LoopControlStatus {
+        CONTINUE,
+        BREAK_OUT
+    }
+
     private void checkTablets() {
         long start = System.currentTimeMillis();
-        long totalTabletNum = 0;
-        long unhealthyTabletNum = 0;
-        long addToSchedulerTabletNum = 0;
-        long tabletInScheduler = 0;
-        long tabletNotReady = 0;
+        CheckerCounter counter = new CheckerCounter();
 
+        // 1. Traverse partitions in "prios" first,
+        // To prevent the partitions in the "prios" from being unscheduled
+        // because the queue in the tablet scheduler is full
+        com.google.common.collect.Table<Long, Long, Set<PrioPart>> copiedPrios;
+        synchronized (prios) {
+            copiedPrios = HashBasedTable.create(prios);
+        }
+
+        OUT:
+        for (long dbId : copiedPrios.rowKeySet()) {
+            Database db = catalog.getDb(dbId);
+            if (db == null) {
+                continue;
+            }
+            List<Long> aliveBeIdsInCluster = 
infoService.getClusterBackendIds(db.getClusterName(), true);
+            Map<Long, Set<PrioPart>> tblPartMap = copiedPrios.row(dbId);
+            for (long tblId : tblPartMap.keySet()) {
+                OlapTable tbl = (OlapTable) db.getTable(tblId);
+                if (tbl == null) {
+                    continue;
+                }
+                tbl.readLock();
+                try {
+                    if (!tbl.needSchedule()) {
+                        continue;
+                    }
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        LoopControlStatus st = handlePartitionTablet(db, tbl, 
partition, true,
+                                aliveBeIdsInCluster, start, counter);
+                        if (st == LoopControlStatus.BREAK_OUT) {
+                            break OUT;
+                        } else {
+                            continue;
+                        }
+                    }
+                } finally {
+                    tbl.readUnlock();
+                }
+            }
+        }
+
+        // 2. Traverse other partitions not in "prios"
         List<Long> dbIds = catalog.getDbIds();
-        OUT: for (Long dbId : dbIds) {
+        OUT:
+        for (Long dbId : dbIds) {
             Database db = catalog.getDb(dbId);
             if (db == null) {
                 continue;
@@ -214,79 +266,18 @@ public class TabletChecker extends MasterDaemon {
                         continue;
                     }
 
-                    OlapTable olapTbl = (OlapTable) table;
-                    for (Partition partition : olapTbl.getAllPartitions()) {
-                        if (partition.getState() != PartitionState.NORMAL) {
-                            // when alter job is in FINISHING state, partition 
state will be set to NORMAL,
-                            // and we can schedule the tablets in it.
+                    OlapTable tbl = (OlapTable) table;
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        // skip partitions in prios, because it has been 
checked before.
+                        if (isInPrios(db.getId(), tbl.getId(), 
partition.getId())) {
                             continue;
                         }
-                        boolean isInPrios = isInPrios(dbId, table.getId(), 
partition.getId());
-                        boolean prioPartIsHealthy = true;
-                        /*
-                         * Tablet in SHADOW index can not be repaired of 
balanced
-                         */
-                        for (MaterializedIndex idx : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                            for (Tablet tablet : idx.getTablets()) {
-                                totalTabletNum++;
-
-                                if 
(tabletScheduler.containsTablet(tablet.getId())) {
-                                    tabletInScheduler++;
-                                    continue;
-                                }
-
-                                Pair<TabletStatus, TabletSchedCtx.Priority> 
statusWithPrio = tablet.getHealthStatusWithPriority(
-                                        infoService,
-                                        db.getClusterName(),
-                                        partition.getVisibleVersion(),
-                                        partition.getVisibleVersionHash(),
-                                        
olapTbl.getPartitionInfo().getReplicationNum(partition.getId()),
-                                        aliveBeIdsInCluster);
-
-                                if (statusWithPrio.first == 
TabletStatus.HEALTHY) {
-                                    // Only set last status check time when 
status is healthy.
-                                    tablet.setLastStatusCheckTime(start);
-                                    continue;
-                                } else if (isInPrios) {
-                                    statusWithPrio.second = 
TabletSchedCtx.Priority.VERY_HIGH;
-                                    prioPartIsHealthy = false;
-                                }
-
-                                unhealthyTabletNum++;
-
-                                if 
(!tablet.readyToBeRepaired(statusWithPrio.second)) {
-                                    tabletNotReady++;
-                                    continue;
-                                }
-
-                                TabletSchedCtx tabletCtx = new TabletSchedCtx(
-                                        TabletSchedCtx.Type.REPAIR,
-                                        db.getClusterName(),
-                                        db.getId(), olapTbl.getId(),
-                                        partition.getId(), idx.getId(), 
tablet.getId(),
-                                        System.currentTimeMillis());
-                                // the tablet status will be set again when 
being scheduled
-                                
tabletCtx.setTabletStatus(statusWithPrio.first);
-                                
tabletCtx.setOrigPriority(statusWithPrio.second);
-
-                                AddResult res = 
tabletScheduler.addTablet(tabletCtx, false /* not force */);
-                                if (res == AddResult.LIMIT_EXCEED) {
-                                    LOG.info("number of scheduling tablets in 
tablet scheduler"
-                                            + " exceed to limit. stop tablet 
checker");
-                                    break OUT;
-                                } else if (res == AddResult.ADDED) {
-                                    addToSchedulerTabletNum++;
-                                }
-                            }
-                        } // indices
-
-                        if (prioPartIsHealthy && isInPrios) {
-                            // if all replicas in this partition are healthy, 
remove this partition from
-                            // priorities.
-                            LOG.debug("partition is healthy, remove from 
prios: {}-{}-{}",
-                                    db.getId(), olapTbl.getId(), 
partition.getId());
-                            removePrios(new RepairTabletInfo(db.getId(),
-                                    olapTbl.getId(), 
Lists.newArrayList(partition.getId())));
+                        LoopControlStatus st = handlePartitionTablet(db, tbl, 
partition, false,
+                                aliveBeIdsInCluster, start, counter);
+                        if (st == LoopControlStatus.BREAK_OUT) {
+                            break OUT;
+                        } else {
+                            continue;
                         }
                     } // partitions
                 } finally {
@@ -296,14 +287,90 @@ public class TabletChecker extends MasterDaemon {
         } // end for dbs
 
         long cost = System.currentTimeMillis() - start;
-
         stat.counterTabletCheckCostMs.addAndGet(cost);
-        stat.counterTabletChecked.addAndGet(totalTabletNum);
-        stat.counterUnhealthyTabletNum.addAndGet(unhealthyTabletNum);
-        stat.counterTabletAddToBeScheduled.addAndGet(addToSchedulerTabletNum);
+        stat.counterTabletChecked.addAndGet(counter.totalTabletNum);
+        stat.counterUnhealthyTabletNum.addAndGet(counter.unhealthyTabletNum);
+        
stat.counterTabletAddToBeScheduled.addAndGet(counter.addToSchedulerTabletNum);
 
         LOG.info("finished to check tablets. 
unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
-                unhealthyTabletNum, totalTabletNum, addToSchedulerTabletNum, 
tabletInScheduler, tabletNotReady, cost);
+                counter.unhealthyTabletNum, counter.totalTabletNum, 
counter.addToSchedulerTabletNum,
+                counter.tabletInScheduler, counter.tabletNotReady, cost);
+    }
+
+    private LoopControlStatus handlePartitionTablet(Database db, OlapTable 
tbl, Partition partition, boolean isInPrios,
+                                                    List<Long> 
aliveBeIdsInCluster, long startTime, CheckerCounter counter) {
+        if (partition.getState() != PartitionState.NORMAL) {
+            // when alter job is in FINISHING state, partition state will be 
set to NORMAL,
+            // and we can schedule the tablets in it.
+            return LoopControlStatus.CONTINUE;
+        }
+        boolean prioPartIsHealthy = true;
+        /*
+         * Tablet in SHADOW index can not be repaired of balanced
+         */
+        for (MaterializedIndex idx : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+            for (Tablet tablet : idx.getTablets()) {
+                counter.totalTabletNum++;
+
+                if (tabletScheduler.containsTablet(tablet.getId())) {
+                    counter.tabletInScheduler++;
+                    continue;
+                }
+
+                Pair<TabletStatus, TabletSchedCtx.Priority> statusWithPrio = 
tablet.getHealthStatusWithPriority(
+                        infoService,
+                        db.getClusterName(),
+                        partition.getVisibleVersion(),
+                        partition.getVisibleVersionHash(),
+                        
tbl.getPartitionInfo().getReplicationNum(partition.getId()),
+                        aliveBeIdsInCluster);
+
+                if (statusWithPrio.first == TabletStatus.HEALTHY) {
+                    // Only set last status check time when status is healthy.
+                    tablet.setLastStatusCheckTime(startTime);
+                    continue;
+                } else if (isInPrios) {
+                    statusWithPrio.second = TabletSchedCtx.Priority.VERY_HIGH;
+                    prioPartIsHealthy = false;
+                }
+
+                counter.unhealthyTabletNum++;
+
+                if (!tablet.readyToBeRepaired(statusWithPrio.second)) {
+                    counter.tabletNotReady++;
+                    continue;
+                }
+
+                TabletSchedCtx tabletCtx = new TabletSchedCtx(
+                        TabletSchedCtx.Type.REPAIR,
+                        db.getClusterName(),
+                        db.getId(), tbl.getId(),
+                        partition.getId(), idx.getId(), tablet.getId(),
+                        System.currentTimeMillis());
+                // the tablet status will be set again when being scheduled
+                tabletCtx.setTabletStatus(statusWithPrio.first);
+                tabletCtx.setOrigPriority(statusWithPrio.second);
+
+                AddResult res = tabletScheduler.addTablet(tabletCtx, false /* 
not force */);
+                if (res == AddResult.LIMIT_EXCEED) {
+                    LOG.info("number of scheduling tablets in tablet scheduler"
+                            + " exceed to limit. stop tablet checker");
+                    return LoopControlStatus.BREAK_OUT;
+                } else if (res == AddResult.ADDED) {
+                    counter.addToSchedulerTabletNum++;
+                }
+            }
+        } // indices
+
+        if (prioPartIsHealthy && isInPrios) {
+            // if all replicas in this partition are healthy, remove this 
partition from
+            // priorities.
+            LOG.debug("partition is healthy, remove from prios: {}-{}-{}",
+                    db.getId(), tbl.getId(), partition.getId());
+            removePrios(new RepairTabletInfo(db.getId(),
+                    tbl.getId(), Lists.newArrayList(partition.getId())));
+        }
+        return LoopControlStatus.CONTINUE;
     }
 
     private boolean isInPrios(long dbId, long tblId, long partId) {
@@ -373,7 +440,7 @@ public class TabletChecker extends MasterDaemon {
      */
     public void repairTable(AdminRepairTableStmt stmt) throws DdlException {
         RepairTabletInfo repairTabletInfo = 
getRepairTabletInfo(stmt.getDbName(), stmt.getTblName(), stmt.getPartitions());
-        addPrios(repairTabletInfo, stmt.getTimeoutS());
+        addPrios(repairTabletInfo, stmt.getTimeoutS() * 1000);
         LOG.info("repair database: {}, table: {}, partition: {}", 
repairTabletInfo.dbId, repairTabletInfo.tblId, repairTabletInfo.partIds);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 8b1d308..2e26c70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -63,6 +63,7 @@ public final class MetricRepo {
     public static final String TABLET_MAX_COMPACTION_SCORE = 
"tablet_max_compaction_score";
 
     public static LongCounterMetric COUNTER_REQUEST_ALL;
+    public static LongCounterMetric COUNTER_QUERY_BEGIN;
     public static LongCounterMetric COUNTER_QUERY_ALL;
     public static LongCounterMetric COUNTER_QUERY_ERR;
     public static LongCounterMetric COUNTER_QUERY_TABLE;
@@ -237,6 +238,8 @@ public final class MetricRepo {
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_REQUEST_ALL);
         COUNTER_QUERY_ALL = new LongCounterMetric("query_total", 
MetricUnit.REQUESTS, "total query");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ALL);
+        COUNTER_QUERY_BEGIN = new LongCounterMetric("query_begin", 
MetricUnit.REQUESTS, "query begin");
+        PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_BEGIN);
         COUNTER_QUERY_ERR = new LongCounterMetric("query_err", 
MetricUnit.REQUESTS, "total error query");
         PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_QUERY_ERR);
         COUNTER_LOAD_ADD = new LongCounterMetric("load_add", 
MetricUnit.REQUESTS, "total load submit");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index eaa5b29..22f2b28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -283,6 +283,7 @@ public class StmtExecutor implements ProfileWriter {
 
             if (parsedStmt instanceof QueryStmt) {
                 context.getState().setIsQuery(true);
+                MetricRepo.COUNTER_QUERY_BEGIN.increase(1L);
                 int retryTime = Config.max_query_retry_time;
                 for (int i = 0; i < retryTime; i ++) {
                     try {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to