This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f80c9b504d5 [fix](stats) Fix thread leaks when doing checkpoint #27334 #27335
f80c9b504d5 is described below
commit f80c9b504d5af79d3aa0e8ffa61ab83cc722a0a1
Author: AKIRA <[email protected]>
AuthorDate: Wed Nov 22 01:22:05 2023 +0900
[fix](stats) Fix thread leaks when doing checkpoint #27334 #27335
---
.../java/org/apache/doris/statistics/AnalysisManager.java | 1 -
.../org/apache/doris/statistics/AnalysisTaskExecutor.java | 11 ++---------
.../java/org/apache/doris/statistics/StatisticsCollector.java | 1 -
.../test/java/org/apache/doris/statistics/AnalyzeTest.java | 7 -------
4 files changed, 2 insertions(+), 18 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 91f54208f6a..e5d997d3425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -240,7 +240,6 @@ public class AnalysisManager implements Writable {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new
AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
this.statisticsCache = new StatisticsCache();
- taskExecutor.start();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
index 58bae9fe66b..fb4530837e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java
@@ -32,7 +32,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public class AnalysisTaskExecutor extends Thread {
+public class AnalysisTaskExecutor {
private static final Logger LOG =
LogManager.getLogger(AnalysisTaskExecutor.class);
@@ -50,19 +50,12 @@ public class AnalysisTaskExecutor extends Thread {
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
new BlockedPolicy("Analysis Job Executor",
Integer.MAX_VALUE),
"Analysis Job Executor", true);
+ cancelExpiredTask();
} else {
executors = null;
}
}
- @Override
- public void run() {
- if (Env.isCheckpointThread()) {
- return;
- }
- cancelExpiredTask();
- }
-
private void cancelExpiredTask() {
String name = "Expired Analysis Task Killer";
Thread t = new Thread(this::doCancelExpiredJob, name);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 638db553987..569965ff1eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -39,7 +39,6 @@ public abstract class StatisticsCollector extends MasterDaemon {
public StatisticsCollector(String name, long intervalMs,
AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
this.analysisTaskExecutor = analysisTaskExecutor;
- analysisTaskExecutor.start();
}
@Override
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
index 74d52cf20f1..1487d192466 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java
@@ -36,7 +36,6 @@ import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Maps;
-import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
@@ -174,12 +173,6 @@ public class AnalyzeTest extends TestWithFeService {
.setState(AnalysisState.RUNNING)
.build();
new OlapAnalysisTask(analysisJobInfo).doExecute();
- new Expectations() {
- {
- stmtExecutor.execute();
- times = 1;
- }
- };
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]