This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 149ce9e [Bug][Memory Leak] Fix the issue of Catalog instance leakage
(#6895)
149ce9e is described below
commit 149ce9ecf49031d8df6876938c922bb8305ac3d2
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Oct 23 16:44:51 2021 +0800
[Bug][Memory Leak] Fix the issue of Catalog instance leakage (#6895)
The Checkpoint Catalog instance may be incorrectly stored in MetricRepo,
causing memory leaks.
---
.../org/apache/doris/common/ThreadPoolManager.java | 8 ++++---
.../org/apache/doris/master/ReportHandler.java | 4 ++--
.../apache/doris/metric/DorisMetricRegistry.java | 14 ++++++++++--
.../java/org/apache/doris/metric/MetricRepo.java | 26 +++++++++-------------
4 files changed, 30 insertions(+), 22 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index c699065..20ee89a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -17,12 +17,14 @@
package org.apache.doris.common;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -91,7 +93,7 @@ public class ThreadPoolManager {
};
gauge.addLabel(new MetricLabel("name", poolName))
.addLabel(new MetricLabel("type", poolMetricType));
- MetricRepo.addMetric(gauge);
+ MetricRepo.PALO_METRIC_REGISTER.addPaloMetrics(gauge);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 8ee321e..3775afb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -105,14 +105,14 @@ public class ReportHandler extends Daemon {
}
public ReportHandler() {
- GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>(
+ GaugeMetric<Long> gauge = new GaugeMetric<Long>(
"report_queue_size", MetricUnit.NOUNIT, "report queue size") {
@Override
public Long getValue() {
return (long) reportQueue.size();
}
};
- MetricRepo.addMetric(gaugeQueueSize);
+ MetricRepo.PALO_METRIC_REGISTER.addPaloMetrics(gauge);
}
public TMasterResult handleReport(TReportRequest request) throws
TException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
index a0940da..9b07810 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
@@ -17,6 +17,8 @@
package org.apache.doris.metric;
+import org.apache.doris.catalog.Catalog;
+
import com.google.common.collect.Lists;
import java.util.Collection;
@@ -34,7 +36,12 @@ public class DorisMetricRegistry {
}
public synchronized void addPaloMetrics(Metric paloMetric) {
- paloMetrics.add(paloMetric);
+ // No metric needs to be added to the Checkpoint thread.
+ // And if you add a metric in Checkpoint thread, it will cause the metric to be added repeatedly,
+ // and the Checkpoint Catalog may be saved incorrectly, resulting in FE memory leaks.
+ if (!Catalog.isCheckpointThread()) {
+ paloMetrics.add(paloMetric);
+ }
}
public synchronized List<Metric> getPaloMetrics() {
@@ -47,6 +54,9 @@ public class DorisMetricRegistry {
}
public synchronized void removeMetrics(String name) {
- paloMetrics = paloMetrics.stream().filter(m ->
!(m.getName().equals(name))).collect(Collectors.toList());
+ // Same reason as comment in addPaloMetrics()
+ if (!Catalog.isCheckpointThread()) {
+ paloMetrics = paloMetrics.stream().filter(m ->
!(m.getName().equals(name))).collect(Collectors.toList());
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index f72c5eb..90221e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -113,6 +113,7 @@ public final class MetricRepo {
private static ScheduledThreadPoolExecutor metricTimer =
ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true);
private static MetricCalculator metricCalculator = new MetricCalculator();
+ // init() should only be called after catalog is constructed.
public static synchronized void init() {
if (isInit) {
return;
@@ -184,8 +185,8 @@ public final class MetricRepo {
}
};
gauge.addLabel(new MetricLabel("job", "alter"))
- .addLabel(new MetricLabel("type", jobType.name()))
- .addLabel(new MetricLabel("state", "running"));
+ .addLabel(new MetricLabel("type", jobType.name()))
+ .addLabel(new MetricLabel("state", "running"));
PALO_METRIC_REGISTER.addPaloMetrics(gauge);
}
@@ -265,17 +266,17 @@ public final class MetricRepo {
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_SQL);
COUNTER_CACHE_HIT_SQL = new LongCounterMetric("cache_hit_sql",
MetricUnit.REQUESTS, "total hits query by sql model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_SQL);
- COUNTER_CACHE_MODE_PARTITION = new
LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS,
- "total query of partition mode");
+ COUNTER_CACHE_MODE_PARTITION = new
LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS,
+ "total query of partition mode");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_PARTITION);
- COUNTER_CACHE_HIT_PARTITION = new
LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS,
- "total hits query by partition model");
+ COUNTER_CACHE_HIT_PARTITION = new
LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS,
+ "total hits query by partition model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_PARTITION);
- COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all",
MetricUnit.REQUESTS,
- "scan partition of cache partition model");
+ COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all",
MetricUnit.REQUESTS,
+ "scan partition of cache partition model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_ALL);
- COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit",
MetricUnit.REQUESTS,
- "hit partition of cache partition model");
+ COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit",
MetricUnit.REQUESTS,
+ "hit partition of cache partition model");
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_HIT);
COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished",
MetricUnit.REQUESTS, "total load finished");
@@ -554,10 +555,5 @@ public final class MetricRepo {
public static synchronized List<Metric> getMetricsByName(String name) {
return PALO_METRIC_REGISTER.getPaloMetricsByName(name);
}
-
- public static void addMetric(Metric<?> metric) {
- init();
- PALO_METRIC_REGISTER.addPaloMetrics(metric);
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]