This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d12112b930 [fix](fe) Fix mem leaks (#14570)
d12112b930 is described below
commit d12112b9304871fe820bf61c6e8a6145126e1593
Author: Kikyou1997 <[email protected]>
AuthorDate: Fri Nov 25 09:16:54 2022 +0800
[fix](fe) Fix mem leaks (#14570)
1. Fix memory leaks in StmtExecutor::executeInternalQuery
2. Limit the number of concurrently running load tasks for the statistics cache
---
.../java/org/apache/doris/qe/StmtExecutor.java | 86 +++++++++++-----------
.../doris/statistics/StatisticConstants.java | 5 +-
.../doris/statistics/StatisticsCacheLoader.java | 57 +++++++++-----
3 files changed, 88 insertions(+), 60 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c22cb0c2d8..cd8df03f24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1792,52 +1792,56 @@ public class StmtExecutor implements ProfileWriter {
}
public List<ResultRow> executeInternalQuery() {
- analyzer = new Analyzer(context.getEnv(), context);
- try {
- analyze(context.getSessionVariable().toThrift());
- } catch (UserException e) {
- LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
- return null;
- }
- planner.getFragments();
- RowBatch batch;
- coord = new Coordinator(context, analyzer, planner);
try {
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
- new QeProcessorImpl.QueryInfo(context,
originStmt.originStmt, coord));
- } catch (UserException e) {
- LOG.warn(e.getMessage(), e);
- }
+ analyzer = new Analyzer(context.getEnv(), context);
+ try {
+ analyze(context.getSessionVariable().toThrift());
+ } catch (UserException e) {
+ LOG.warn("Internal SQL execution failed, SQL: {}", originStmt,
e);
+ return null;
+ }
+ planner.getFragments();
+ RowBatch batch;
+ coord = new Coordinator(context, analyzer, planner);
+ try {
+ QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
+ new QeProcessorImpl.QueryInfo(context,
originStmt.originStmt, coord));
+ } catch (UserException e) {
+ LOG.warn(e.getMessage(), e);
+ }
- coord.setProfileWriter(this);
- Span queryScheduleSpan = context.getTracer()
- .spanBuilder("internal SQL
schedule").setParent(Context.current()).startSpan();
- try (Scope scope = queryScheduleSpan.makeCurrent()) {
- coord.exec();
- } catch (Exception e) {
- queryScheduleSpan.recordException(e);
- LOG.warn("Unexpected exception when SQL running", e);
- } finally {
- queryScheduleSpan.end();
- }
- Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal
SQL result")
- .setParent(Context.current()).startSpan();
- List<ResultRow> resultRows = new ArrayList<>();
- try (Scope scope = fetchResultSpan.makeCurrent()) {
- while (true) {
- batch = coord.getNext();
- if (batch == null || batch.isEos()) {
- return resultRows;
- } else {
-
resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
+ coord.setProfileWriter(this);
+ Span queryScheduleSpan = context.getTracer()
+ .spanBuilder("internal SQL
schedule").setParent(Context.current()).startSpan();
+ try (Scope scope = queryScheduleSpan.makeCurrent()) {
+ coord.exec();
+ } catch (Exception e) {
+ queryScheduleSpan.recordException(e);
+ LOG.warn("Unexpected exception when SQL running", e);
+ } finally {
+ queryScheduleSpan.end();
+ }
+ Span fetchResultSpan = context.getTracer().spanBuilder("fetch
internal SQL result")
+ .setParent(Context.current()).startSpan();
+ List<ResultRow> resultRows = new ArrayList<>();
+ try (Scope scope = fetchResultSpan.makeCurrent()) {
+ while (true) {
+ batch = coord.getNext();
+ if (batch == null || batch.isEos()) {
+ return resultRows;
+ } else {
+
resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception when SQL running", e);
+ fetchResultSpan.recordException(e);
+ return null;
+ } finally {
+ fetchResultSpan.end();
}
- } catch (Exception e) {
- LOG.warn("Unexpected exception when SQL running", e);
- fetchResultSpan.recordException(e);
- return null;
} finally {
- fetchResultSpan.end();
+ QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index d6d04683bf..dc89cc42fa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -53,8 +53,11 @@ public class StatisticConstants {
public static final long STATISTICS_RECORDS_CACHE_SIZE = 100000;
/**
- * If analysys job execution time exceeds this time, it would be cancelled.
+ * If analysis job execution time exceeds this time, it would be cancelled.
*/
public static final long STATISTICS_TASKS_TIMEOUT_IN_MS =
TimeUnit.MINUTES.toMillis(10);
+
+ public static final int LOAD_TASK_LIMITS = 10;
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
index be34a65622..d27f5893b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java
@@ -43,28 +43,49 @@ public class StatisticsCacheLoader implements
AsyncCacheLoader<StatisticsCacheKe
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+ "id = CONCAT('${tblId}', '-', '${colId}')";
+ private static int CUR_RUNNING_LOAD = 0;
+
+ private static final Object LOCK = new Object();
+
// TODO: Maybe we should trigger a analyze job when the required
ColumnStatistic doesn't exists.
@Override
public @NonNull CompletableFuture<ColumnStatistic> asyncLoad(@NonNull
StatisticsCacheKey key,
@NonNull Executor executor) {
- return CompletableFuture.supplyAsync(() -> {
- Map<String, String> params = new HashMap<>();
- params.put("tblId", String.valueOf(key.tableId));
- params.put("colId", String.valueOf(key.colName));
- List<ResultRow> resultBatches =
- StatisticsUtil.execStatisticQuery(new
StringSubstitutor(params)
- .replace(QUERY_COLUMN_STATISTICS));
- List<ColumnStatistic> columnStatistics = null;
- try {
- columnStatistics =
StatisticsUtil.deserializeToColumnStatistics(resultBatches);
- } catch (Exception e) {
- LOG.warn("Failed to deserialize column statistics", e);
- throw new CompletionException(e);
- }
- if (CollectionUtils.isEmpty(columnStatistics)) {
- return ColumnStatistic.DEFAULT;
+ synchronized (LOCK) {
+ if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) {
+ try {
+ LOCK.wait();
+ } catch (InterruptedException e) {
+ LOG.warn("Ignore interruption", e);
+ }
}
- return columnStatistics.get(0);
- });
+ CUR_RUNNING_LOAD++;
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Map<String, String> params = new HashMap<>();
+ params.put("tblId", String.valueOf(key.tableId));
+ params.put("colId", String.valueOf(key.colName));
+ List<ResultRow> resultBatches =
+ StatisticsUtil.execStatisticQuery(new
StringSubstitutor(params)
+ .replace(QUERY_COLUMN_STATISTICS));
+ List<ColumnStatistic> columnStatistics = null;
+ try {
+ columnStatistics =
StatisticsUtil.deserializeToColumnStatistics(resultBatches);
+ } catch (Exception e) {
+ LOG.warn("Failed to deserialize column statistics", e);
+ throw new CompletionException(e);
+ }
+ if (CollectionUtils.isEmpty(columnStatistics)) {
+ return ColumnStatistic.DEFAULT;
+ }
+ return columnStatistics.get(0);
+ } finally {
+ synchronized (LOCK) {
+ CUR_RUNNING_LOAD--;
+ LOCK.notify();
+ }
+ }
+ });
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]