This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch opt_dict_perf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d02207efca33cbcafc52bf832f9db6cd782ebfdf
Author: yiguolei <[email protected]>
AuthorDate: Fri Mar 17 08:48:25 2023 +0800

    call close to release lock; use try write lock
---
 .../main/java/org/apache/doris/catalog/Table.java  |   1 +
 .../org/apache/doris/catalog/TabletStatMgr.java    |  12 ++-
 .../org/apache/doris/nereids/CascadesContext.java  |   1 +
 .../java/org/apache/doris/qe/ResultReceiver.java   |  11 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   6 ++
 .../doris/statistics/StatisticsCacheLoader.java    | 115 +++++++++++----------
 fe/pom.xml                                         |   3 +-
 7 files changed, 84 insertions(+), 65 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 60547c659a..3026764e2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -184,6 +184,7 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
                 LOG.warn("Failed to try table {}'s write lock. timeout {} {}. 
Current owner: {}",
                         name, timeout, unit.name(), rwLock.getOwner());
             }
+
             return res;
         } catch (InterruptedException e) {
             LOG.warn("failed to try write lock at table[" + name + "]", e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index f507e4ecb8..acead86893 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
 
 /*
  * TabletStatMgr is for collecting tablet(replica) statistics from backends.
@@ -49,7 +50,8 @@ public class TabletStatMgr extends MasterDaemon {
         super("tablet stat mgr", Config.tablet_stat_update_interval_second * 
1000);
     }
 
-    @Override
+    protected void runAfterCatalogReady2() {}
+    
     protected void runAfterCatalogReady() {
         ImmutableMap<Long, Backend> backends = 
Env.getCurrentSystemInfo().getIdToBackend();
         long start = System.currentTimeMillis();
@@ -94,7 +96,13 @@ public class TabletStatMgr extends MasterDaemon {
                     continue;
                 }
                 OlapTable olapTable = (OlapTable) table;
-                if (!table.writeLockIfExist()) {
+                // Use try write lock to avoid such cases
+                //    Time1: Thread1 hold read lock for 5min
+                //    Time2: Thread2 want to add write lock, then it will be 
the first element in lock queue
+                //    Time3: Thread3 want to add read lock, but it will not, 
because thread 2 want to add write lock
+                // In this case, thread 3 has to wait more than 5min, because 
it has to wait thread 2 to add 
+                // write lock and release write lock and thread 2 has to wait 
thread 1 to release read lock
+                if (!table.tryWriteLockIfExist(3000, TimeUnit.MILLISECONDS)) {
                     continue;
                 }
                 try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index 2ed1123bb1..7caffbfd68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -386,6 +386,7 @@ public class CascadesContext implements ScheduleContext, 
PlanSource {
             cascadesContext.extractTables(plan);
             for (Table table : cascadesContext.tables) {
                 if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
+                    close();
                     throw new RuntimeException(String.format("Failed to get 
read lock on table: %s", table.getName()));
                 }
                 locked.push(table);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index d7c9070421..9923a4e7d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -18,6 +18,7 @@
 package org.apache.doris.qe;
 
 import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.Types;
 import org.apache.doris.rpc.BackendServiceProxy;
@@ -96,7 +97,7 @@ public class ResultReceiver {
                 rowBatch.setQueryStatistics(pResult.getQueryStatistics());
 
                 if (packetIdx != pResult.getPacketSeq()) {
-                    LOG.warn("receive packet failed, expect={}, receive={}", 
packetIdx, pResult.getPacketSeq());
+                    LOG.warn("finistId={}, receive packet failed, expect={}, 
receive={}", DebugUtil.printId(finstId), packetIdx, pResult.getPacketSeq());
                     status.setRpcStatus("receive error packet");
                     return null;
                 }
@@ -105,7 +106,7 @@ public class ResultReceiver {
                 isDone = pResult.getEos();
 
                 if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
-                    LOG.info("get first empty rowbatch");
+                    LOG.info("finistId={}, get first empty rowbatch", 
DebugUtil.printId(finstId));
                     rowBatch.setEos(false);
                     return rowBatch;
                 } else if (pResult.hasRowBatch() && 
pResult.getRowBatch().size() > 0) {
@@ -119,11 +120,11 @@ public class ResultReceiver {
                 }
             }
         } catch (RpcException e) {
-            LOG.warn("fetch result rpc exception, finstId={}", finstId, e);
+            LOG.warn("fetch result rpc exception, finstId={}", 
DebugUtil.printId(finstId), e);
             status.setRpcStatus(e.getMessage());
             SimpleScheduler.addToBlacklist(backendId, e.getMessage());
         } catch (ExecutionException e) {
-            LOG.warn("fetch result execution exception, finstId={}", finstId, 
e);
+            LOG.warn("fetch result execution exception, finstId={}", 
DebugUtil.printId(finstId), e);
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
                 status.setStatus(new Status(TStatusCode.TIMEOUT, 
e.getMessage()));
@@ -132,7 +133,7 @@ public class ResultReceiver {
                 SimpleScheduler.addToBlacklist(backendId, e.getMessage());
             }
         } catch (TimeoutException e) {
-            LOG.warn("fetch result timeout, finstId={}", finstId, e);
+            LOG.warn("fetch result timeout, finstId={}", 
DebugUtil.printId(finstId), e);
             status.setStatus("query timeout");
         } finally {
             synchronized (this) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 97532558e9..9bbbb03d7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -801,9 +801,15 @@ public class StmtExecutor implements ProfileWriter {
             List<TableIf> tables = Lists.newArrayList(tableMap.values());
             int analyzeTimes = 2;
             for (int i = 1; i <= analyzeTimes; i++) {
+
                 MetaLockUtils.readLockTables(tables);
                 try {
+                    long startAnalysisTime = System.currentTimeMillis();
                     analyzeAndGenerateQueryPlan(tQueryOptions);
+                    long endAnalysisTime = System.currentTimeMillis();
+                    if (endAnalysisTime - startAnalysisTime > 10000) {
+                        LOG.info("yyyyy {}", endAnalysisTime - 
startAnalysisTime);
+                    }
                     break;
                 } catch (MVSelectFailedException e) {
                     /*
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
index 08781e5689..a50d9f0274 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
@@ -33,8 +33,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
+/**
+ * Use to load stats cache.
+ */
 public class StatisticsCacheLoader implements 
AsyncCacheLoader<StatisticsCacheKey, Statistic> {
 
     private static final Logger LOG = 
LogManager.getLogger(StatisticsCacheLoader.class);
@@ -47,72 +52,68 @@ public class StatisticsCacheLoader implements 
AsyncCacheLoader<StatisticsCacheKe
             + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE "
             + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
 
-    private static int CUR_RUNNING_LOAD = 0;
-
-    private static final Object LOCK = new Object();
+    private final ConcurrentMap<StatisticsCacheKey, 
CompletableFuture<Statistic>>
+            inProgressing = new ConcurrentHashMap<>();
 
     // TODO: Maybe we should trigger a analyze job when the required 
ColumnStatistic doesn't exists.
     @Override
     public @NonNull CompletableFuture<Statistic> asyncLoad(@NonNull 
StatisticsCacheKey key,
             @NonNull Executor executor) {
-        synchronized (LOCK) {
-            if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) {
+        CompletableFuture<Statistic> future = inProgressing.get(key);
+        if (future != null) {
+            return future;
+        }
+        future = CompletableFuture.supplyAsync(() -> {
+            Statistic statistic = new Statistic();
+            long startTime = 0;
+            try {
+                LOG.info("Query BE for column stats:{}-{} start time:{}", 
key.tableId, key.colName,
+                        startTime);
+                Map<String, String> params = new HashMap<>();
+                params.put("tblId", String.valueOf(key.tableId));
+                params.put("idxId", String.valueOf(key.idxId));
+                params.put("colId", String.valueOf(key.colName));
+
+                List<ColumnStatistic> columnStatistics;
+                List<ResultRow> columnResult =
+                        StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params)
+                                .replace(QUERY_COLUMN_STATISTICS));
                 try {
-                    LOCK.wait();
-                } catch (InterruptedException e) {
-                    LOG.warn("Ignore interruption", e);
+                    columnStatistics = 
StatisticsUtil.deserializeToColumnStatistics(columnResult);
+                } catch (Exception e) {
+                    LOG.warn("Failed to deserialize column statistics", e);
+                    throw new CompletionException(e);
+                }
+                if (CollectionUtils.isEmpty(columnStatistics)) {
+                    statistic.setColumnStatistic(ColumnStatistic.DEFAULT);
+                } else {
+                    statistic.setColumnStatistic(columnStatistics.get(0));
                 }
-            }
-            CUR_RUNNING_LOAD++;
-            return CompletableFuture.supplyAsync(() -> {
-                Statistic statistic = new Statistic();
 
+                List<Histogram> histogramStatistics;
+                List<ResultRow> histogramResult =
+                        StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params)
+                                .replace(QUERY_HISTOGRAM_STATISTICS));
                 try {
-                    Map<String, String> params = new HashMap<>();
-                    params.put("tblId", String.valueOf(key.tableId));
-                    params.put("idxId", String.valueOf(key.idxId));
-                    params.put("colId", String.valueOf(key.colName));
-
-                    List<ColumnStatistic> columnStatistics;
-                    List<ResultRow> columnResult =
-                            StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params)
-                                    .replace(QUERY_COLUMN_STATISTICS));
-                    try {
-                        columnStatistics = 
StatisticsUtil.deserializeToColumnStatistics(columnResult);
-                    } catch (Exception e) {
-                        LOG.warn("Failed to deserialize column statistics", e);
-                        throw new CompletionException(e);
-                    }
-                    if (CollectionUtils.isEmpty(columnStatistics)) {
-                        statistic.setColumnStatistic(ColumnStatistic.DEFAULT);
-                    } else {
-                        statistic.setColumnStatistic(columnStatistics.get(0));
-                    }
-
-                    List<Histogram> histogramStatistics;
-                    List<ResultRow> histogramResult =
-                            StatisticsUtil.execStatisticQuery(new 
StringSubstitutor(params)
-                                    .replace(QUERY_HISTOGRAM_STATISTICS));
-                    try {
-                        histogramStatistics = 
StatisticsUtil.deserializeToHistogramStatistics(histogramResult);
-                    } catch (Exception e) {
-                        LOG.warn("Failed to deserialize histogram statistics", 
e);
-                        throw new CompletionException(e);
-                    }
-                    if (CollectionUtils.isEmpty(histogramStatistics)) {
-                        statistic.setHistogram(Histogram.DEFAULT);
-                    } else {
-                        statistic.setHistogram(histogramStatistics.get(0));
-                    }
-                } finally {
-                    synchronized (LOCK) {
-                        CUR_RUNNING_LOAD--;
-                        LOCK.notify();
-                    }
+                    histogramStatistics = 
StatisticsUtil.deserializeToHistogramStatistics(histogramResult);
+                } catch (Exception e) {
+                    LOG.warn("Failed to deserialize histogram statistics", e);
+                    throw new CompletionException(e);
                 }
-
-                return statistic;
-            });
-        }
+                if (CollectionUtils.isEmpty(histogramStatistics)) {
+                    statistic.setHistogram(Histogram.DEFAULT);
+                } else {
+                    statistic.setHistogram(histogramStatistics.get(0));
+                }
+            } finally {
+                long endTime = System.currentTimeMillis();
+                LOG.info("Query BE for column stats:{}-{} end time:{} cost 
time:{}", key.tableId, key.colName,
+                        endTime, endTime - startTime);
+                inProgressing.remove(key);
+            }
+            return statistic;
+        });
+        inProgressing.put(key, future);
+        return future;
     }
 }
diff --git a/fe/pom.xml b/fe/pom.xml
index 6164a4b2ec..a01f8d6cd9 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -69,10 +69,11 @@ under the License.
                 <artifactId>flatten-maven-plugin</artifactId>
                 <version>1.2.5</version>
             </plugin>
+            <!--
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
-            </plugin>
+            </plugin>-->
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>license-maven-plugin</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to