This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-0.15 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 56f01d67217dba9b2c543f0c52db7d43431de753 Author: Mingyu Chen <[email protected]> AuthorDate: Sat Oct 23 16:44:51 2021 +0800 [Bug][Memory Leak] Fix the issue of Catalog instance leakage (#6895) The Checkpoint Catalog instance may be incorrectly stored in MetricRepo, causing memory leaks --- .../org/apache/doris/common/ThreadPoolManager.java | 8 ++++--- .../org/apache/doris/master/ReportHandler.java | 4 ++-- .../apache/doris/metric/DorisMetricRegistry.java | 14 ++++++++++-- .../java/org/apache/doris/metric/MetricRepo.java | 26 +++++++++------------- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index c699065..20ee89a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -17,12 +17,14 @@ package org.apache.doris.common; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; import org.apache.doris.metric.MetricRepo; + +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -91,7 +93,7 @@ public class ThreadPoolManager { }; gauge.addLabel(new MetricLabel("name", poolName)) .addLabel(new MetricLabel("type", poolMetricType)); - MetricRepo.addMetric(gauge); + MetricRepo.PALO_METRIC_REGISTER.addPaloMetrics(gauge); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 8ee321e..3775afb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -105,14 +105,14 @@ public class ReportHandler extends Daemon { } public ReportHandler() { - GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>( + GaugeMetric<Long> gauge = new GaugeMetric<Long>( "report_queue_size", MetricUnit.NOUNIT, "report queue size") { @Override public Long getValue() { return (long) reportQueue.size(); } }; - MetricRepo.addMetric(gaugeQueueSize); + MetricRepo.PALO_METRIC_REGISTER.addPaloMetrics(gauge); } public TMasterResult handleReport(TReportRequest request) throws TException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java index a0940da..9b07810 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java @@ -17,6 +17,8 @@ package org.apache.doris.metric; +import org.apache.doris.catalog.Catalog; + import com.google.common.collect.Lists; import java.util.Collection; @@ -34,7 +36,12 @@ public class DorisMetricRegistry { } public synchronized void addPaloMetrics(Metric paloMetric) { - paloMetrics.add(paloMetric); + // No metric needs to be added to the Checkpoint thread. + // And if you add a metric in Checkpoint thread, it will cause the metric to be added repeatedly, + // and the Checkpoint Catalog may be saved incorrectly, resulting in FE memory leaks. 
+ if (!Catalog.isCheckpointThread()) { + paloMetrics.add(paloMetric); + } } public synchronized List<Metric> getPaloMetrics() { @@ -47,6 +54,9 @@ public class DorisMetricRegistry { } public synchronized void removeMetrics(String name) { - paloMetrics = paloMetrics.stream().filter(m -> !(m.getName().equals(name))).collect(Collectors.toList()); + // Same reason as comment in addPaloMetrics() + if (!Catalog.isCheckpointThread()) { + paloMetrics = paloMetrics.stream().filter(m -> !(m.getName().equals(name))).collect(Collectors.toList()); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index f72c5eb..90221e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -113,6 +113,7 @@ public final class MetricRepo { private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true); private static MetricCalculator metricCalculator = new MetricCalculator(); + // init() should only be called after catalog is constructed. 
public static synchronized void init() { if (isInit) { return; @@ -184,8 +185,8 @@ public final class MetricRepo { } }; gauge.addLabel(new MetricLabel("job", "alter")) - .addLabel(new MetricLabel("type", jobType.name())) - .addLabel(new MetricLabel("state", "running")); + .addLabel(new MetricLabel("type", jobType.name())) + .addLabel(new MetricLabel("state", "running")); PALO_METRIC_REGISTER.addPaloMetrics(gauge); } @@ -265,17 +266,17 @@ public final class MetricRepo { PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_SQL); COUNTER_CACHE_HIT_SQL = new LongCounterMetric("cache_hit_sql", MetricUnit.REQUESTS, "total hits query by sql model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_SQL); - COUNTER_CACHE_MODE_PARTITION = new LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS, - "total query of partition mode"); + COUNTER_CACHE_MODE_PARTITION = new LongCounterMetric("query_mode_partition", MetricUnit.REQUESTS, + "total query of partition mode"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_MODE_PARTITION); - COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS, - "total hits query by partition model"); + COUNTER_CACHE_HIT_PARTITION = new LongCounterMetric("cache_hit_partition", MetricUnit.REQUESTS, + "total hits query by partition model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_HIT_PARTITION); - COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all", MetricUnit.REQUESTS, - "scan partition of cache partition model"); + COUNTER_CACHE_PARTITION_ALL = new LongCounterMetric("partition_all", MetricUnit.REQUESTS, + "scan partition of cache partition model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_ALL); - COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit", MetricUnit.REQUESTS, - "hit partition of cache partition model"); + COUNTER_CACHE_PARTITION_HIT = new LongCounterMetric("partition_hit", MetricUnit.REQUESTS, + "hit partition of cache 
partition model"); PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_CACHE_PARTITION_HIT); COUNTER_LOAD_FINISHED = new LongCounterMetric("load_finished", MetricUnit.REQUESTS, "total load finished"); @@ -554,10 +555,5 @@ public final class MetricRepo { public static synchronized List<Metric> getMetricsByName(String name) { return PALO_METRIC_REGISTER.getPaloMetricsByName(name); } - - public static void addMetric(Metric<?> metric) { - init(); - PALO_METRIC_REGISTER.addPaloMetrics(metric); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
