This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d12112b930 [fix](fe) Fix mem leaks (#14570)
d12112b930 is described below

commit d12112b9304871fe820bf61c6e8a6145126e1593
Author: Kikyou1997 <[email protected]>
AuthorDate: Fri Nov 25 09:16:54 2022 +0800

    [fix](fe) Fix mem leaks (#14570)
    
    1. Fix memory leaks in StmtExecutor::executeInternalQuery
    2. Limit the number of concurrently running load tasks for the statistics cache
---
 .../java/org/apache/doris/qe/StmtExecutor.java     | 86 +++++++++++-----------
 .../doris/statistics/StatisticConstants.java       |  5 +-
 .../doris/statistics/StatisticsCacheLoader.java    | 57 +++++++++-----
 3 files changed, 88 insertions(+), 60 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c22cb0c2d8..cd8df03f24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1792,52 +1792,56 @@ public class StmtExecutor implements ProfileWriter {
     }
 
     public List<ResultRow> executeInternalQuery() {
-        analyzer = new Analyzer(context.getEnv(), context);
-        try {
-            analyze(context.getSessionVariable().toThrift());
-        } catch (UserException e) {
-            LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
-            return null;
-        }
-        planner.getFragments();
-        RowBatch batch;
-        coord = new Coordinator(context, analyzer, planner);
         try {
-            QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
-                    new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
-        } catch (UserException e) {
-            LOG.warn(e.getMessage(), e);
-        }
+            analyzer = new Analyzer(context.getEnv(), context);
+            try {
+                analyze(context.getSessionVariable().toThrift());
+            } catch (UserException e) {
+                LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
+                return null;
+            }
+            planner.getFragments();
+            RowBatch batch;
+            coord = new Coordinator(context, analyzer, planner);
+            try {
+                QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+                        new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
+            } catch (UserException e) {
+                LOG.warn(e.getMessage(), e);
+            }
 
-        coord.setProfileWriter(this);
-        Span queryScheduleSpan = context.getTracer()
-                .spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan();
-        try (Scope scope = queryScheduleSpan.makeCurrent()) {
-            coord.exec();
-        } catch (Exception e) {
-            queryScheduleSpan.recordException(e);
-            LOG.warn("Unexpected exception when SQL running", e);
-        } finally {
-            queryScheduleSpan.end();
-        }
-        Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result")
-                .setParent(Context.current()).startSpan();
-        List<ResultRow> resultRows = new ArrayList<>();
-        try (Scope scope = fetchResultSpan.makeCurrent()) {
-            while (true) {
-                batch = coord.getNext();
-                if (batch == null || batch.isEos()) {
-                    return resultRows;
-                } else {
-                    resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
+            coord.setProfileWriter(this);
+            Span queryScheduleSpan = context.getTracer()
+                    .spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan();
+            try (Scope scope = queryScheduleSpan.makeCurrent()) {
+                coord.exec();
+            } catch (Exception e) {
+                queryScheduleSpan.recordException(e);
+                LOG.warn("Unexpected exception when SQL running", e);
+            } finally {
+                queryScheduleSpan.end();
+            }
+            Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result")
+                    .setParent(Context.current()).startSpan();
+            List<ResultRow> resultRows = new ArrayList<>();
+            try (Scope scope = fetchResultSpan.makeCurrent()) {
+                while (true) {
+                    batch = coord.getNext();
+                    if (batch == null || batch.isEos()) {
+                        return resultRows;
+                    } else {
+                        resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
+                    }
                 }
+            } catch (Exception e) {
+                LOG.warn("Unexpected exception when SQL running", e);
+                fetchResultSpan.recordException(e);
+                return null;
+            } finally {
+                fetchResultSpan.end();
             }
-        } catch (Exception e) {
-            LOG.warn("Unexpected exception when SQL running", e);
-            fetchResultSpan.recordException(e);
-            return null;
         } finally {
-            fetchResultSpan.end();
+            QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index d6d04683bf..dc89cc42fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -53,8 +53,11 @@ public class StatisticConstants {
     public static final long STATISTICS_RECORDS_CACHE_SIZE = 100000;
 
     /**
-     * If analysys job execution time exceeds this time, it would be cancelled.
+     * If analysis job execution time exceeds this time, it would be cancelled.
      */
     public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10);
 
+
+    public static final int LOAD_TASK_LIMITS = 10;
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
index be34a65622..d27f5893b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
@@ -43,28 +43,49 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
             + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
             + "id = CONCAT('${tblId}', '-', '${colId}')";
 
+    private static int CUR_RUNNING_LOAD = 0;
+
+    private static final Object LOCK = new Object();
+
     // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.
     @Override
     public @NonNull CompletableFuture<ColumnStatistic> asyncLoad(@NonNull StatisticsCacheKey key,
             @NonNull Executor executor) {
-        return CompletableFuture.supplyAsync(() -> {
-            Map<String, String> params = new HashMap<>();
-            params.put("tblId", String.valueOf(key.tableId));
-            params.put("colId", String.valueOf(key.colName));
-            List<ResultRow> resultBatches =
-                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
-                            .replace(QUERY_COLUMN_STATISTICS));
-            List<ColumnStatistic> columnStatistics = null;
-            try {
-                columnStatistics = StatisticsUtil.deserializeToColumnStatistics(resultBatches);
-            } catch (Exception e) {
-                LOG.warn("Failed to deserialize column statistics", e);
-                throw new CompletionException(e);
-            }
-            if (CollectionUtils.isEmpty(columnStatistics)) {
-                return ColumnStatistic.DEFAULT;
+        synchronized (LOCK) {
+            if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) {
+                try {
+                    LOCK.wait();
+                } catch (InterruptedException e) {
+                    LOG.warn("Ignore interruption", e);
+                }
             }
-            return columnStatistics.get(0);
-        });
+            CUR_RUNNING_LOAD++;
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    Map<String, String> params = new HashMap<>();
+                    params.put("tblId", String.valueOf(key.tableId));
+                    params.put("colId", String.valueOf(key.colName));
+                    List<ResultRow> resultBatches =
+                            StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                                    .replace(QUERY_COLUMN_STATISTICS));
+                    List<ColumnStatistic> columnStatistics = null;
+                    try {
+                        columnStatistics = StatisticsUtil.deserializeToColumnStatistics(resultBatches);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to deserialize column statistics", e);
+                        throw new CompletionException(e);
+                    }
+                    if (CollectionUtils.isEmpty(columnStatistics)) {
+                        return ColumnStatistic.DEFAULT;
+                    }
+                    return columnStatistics.get(0);
+                } finally {
+                    synchronized (LOCK) {
+                        CUR_RUNNING_LOAD--;
+                        LOCK.notify();
+                    }
+                }
+            });
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to