This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 00d4fd8 thread leakage checker and memory usage reporter #1226 (#1452)
00d4fd8 is described below
commit 00d4fd831180f72e15c620486dd401efdffac7a0
Author: kaisun2000 <[email protected]>
AuthorDate: Fri Oct 9 14:19:12 2020 -0700
thread leakage checker and memory usage reporter #1226 (#1452)
Add thread leakage checker and memory usage reporter. The two utilities are
invoked before and after test classes. They help to detect/monitor
resource/memory usage of the unit tests.
Co-authored-by: Kai Sun <[email protected]>
---
.../org/apache/helix/ThreadLeakageChecker.java | 220 +++++++++++++++++++++
.../java/org/apache/helix/common/ZkTestBase.java | 29 ++-
.../multizk/TestMultiZkHelixJavaApis.java | 18 ++
3 files changed, 266 insertions(+), 1 deletion(-)
diff --git
a/helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
b/helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
new file mode 100644
index 0000000..c2d3e41
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/ThreadLeakageChecker.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.helix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.helix.common.ZkTestBase;
+
+
+public class ThreadLeakageChecker {
+ /**
+  * Walks up from the current thread's group until a group with no parent is reached.
+  *
+  * @return the top-most ancestor of the current thread's thread group
+  */
+ private static ThreadGroup getRootThreadGroup() {
+   ThreadGroup root = Thread.currentThread().getThreadGroup();
+   for (ThreadGroup parent = root.getParent(); parent != null; parent = root.getParent()) {
+     root = parent;
+   }
+   return root;
+ }
+
+ /**
+  * Enumerates the threads reachable from the root thread group.
+  *
+  * The buffer starts at 32 slots and is doubled whenever enumerate() fills it completely,
+  * because a completely filled buffer may mean that some threads did not fit.
+  *
+  * @return a fixed-size list holding the enumerated threads
+  */
+ private static List<Thread> getAllThreads() {
+   ThreadGroup root = getRootThreadGroup();
+   int capacity = 32;
+   Thread[] buffer;
+   int found;
+   do {
+     buffer = new Thread[capacity];
+     found = root.enumerate(buffer);
+     capacity *= 2;
+   } while (found == buffer.length);
+   return Arrays.asList(Arrays.copyOf(buffer, found));
+ }
+
+ // Thread-name fragments used to assign a thread to a category.
+ private static final String[] ZKSERVER_THRD_PATTERN = {
+     "SessionTracker", "NIOServerCxn", "SyncThread:", "ProcessThread"
+ };
+ private static final String[] ZKSESSION_THRD_PATTERN = {
+     "ZkClient-EventThread", "ZkClient-AsyncCallback", "-EventThread", "-SendThread"
+ };
+ private static final String[] FORKJOIN_THRD_PATTERN = {"ForkJoinPool"};
+ private static final String[] TIMER_THRD_PATTERN = {"time"};
+ private static final String[] TASKSTATEMODEL_THRD_PATTERN = {"TaskStateModel"};
+
+ /*
+ * The two thresholds -- warning and limit -- are mostly empirical.
+ *
+ * ZkServer: the current version has only 4 threads. In case a later version uses
+ * more, we set the limit to 100. The reasoning is that these ZkServer threads are
+ * not deemed as leaking no matter how many there are.
+ *
+ * ZkSession is the ZkClient and native Zookeeper client we have. ZkTestBase
has 12 at starting up time.
+ * Thus, if there is more than that, it is the test code leaking ZkClient.
+ *
+ * ForkJoin threads are created by using parallel streams or similar Java features.
+ * This is out of our control. Similar to ZkServer, the limit is set to 100 while
+ * keeping a small _warningLimit.
+ *
+ * Timer threads should not exist. The limit is set to 2 rather than 0 mostly because,
+ * even after you cancel the timer thread, it may take a non-deterministic amount of
+ * time for it to go away. So give it some slack here.
+ *
+ * Also note, this ThreadLeakage checker depends on the fact that tests are
running sequentially.
+ * Otherwise, the report is not going to be accurate.
+ */
+ private static enum ThreadCategory {
+ ZkServer("zookeeper server threads", 4, 100, ZKSERVER_THRD_PATTERN),
+ ZkSession("zkclient/zooKeeper session threads", 12, 12,
ZKSESSION_THRD_PATTERN),
+ ForkJoin("fork join pool threads", 2, 100, FORKJOIN_THRD_PATTERN),
+ Timer("timer threads", 0, 2, TIMER_THRD_PATTERN),
+ TaskStateModel("TaskStateModel threads", 0, 0,
TASKSTATEMODEL_THRD_PATTERN),
+ Other("Other threads", 0, 2, new String[]{""});
+
+ private String _description;
+ private List<String> _pattern;
+ private int _warningLimit;
+ private int _limit;
+
+ public String getDescription() {
+ return _description;
+ }
+
+ public Predicate<String> getMatchPred() {
+ if (this.name() != ThreadCategory.Other.name()) {
+ Predicate<String> pred = target -> {
+ for (String p : _pattern) {
+ if (target.toLowerCase().contains(p.toLowerCase())) {
+ return true;
+ }
+ }
+ return false;
+ };
+ return pred;
+ }
+
+ List<Predicate<String>> predicateList = new ArrayList<>();
+ for (ThreadCategory threadCategory : ThreadCategory.values()) {
+ if (threadCategory == ThreadCategory.Other) {
+ continue;
+ }
+ predicateList.add(threadCategory.getMatchPred());
+ }
+ Predicate<String> pred = target -> {
+ for (Predicate<String> p : predicateList) {
+ if (p.test(target)) {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ return pred;
+ }
+
+ public int getWarningLimit() {
+ return _warningLimit;
+ }
+
+ public int getLimit() {
+ return _limit;
+ }
+
+ private ThreadCategory(String description, int warningLimit, int limit,
String[] patterns) {
+ _description = description;
+ _pattern = Arrays.asList(patterns);
+ _warningLimit = warningLimit;
+ _limit = limit;
+ }
+ }
+
+ /**
+  * Reports memory usage, then groups every live non-system thread by name, assigns the
+  * groups to {@link ThreadCategory} buckets and prints a per-category report to standard
+  * output.
+  *
+  * A category whose thread count exceeds its limit is reported as a failure; one that only
+  * exceeds its warning limit is reported as a warning. In both cases up to 100 thread names
+  * of that category are printed.
+  *
+  * @param classname name of the test class being checked; only used in the printed output
+  * @return {@code true} if no category exceeded its limit; {@code false} if any did, or if
+  *         the threads could not be categorized
+  */
+ public static boolean afterClassCheck(String classname) {
+   ZkTestBase.reportPhysicalMemory();
+   // step 1: get all active threads
+   List<Thread> threads = getAllThreads();
+   System.out.println(classname + " has active threads cnt:" + threads.size());
+
+   // step 2: categorize threads
+   Map<String, List<Thread>> threadByName;
+   try {
+     threadByName = threads.stream()
+         .filter(ThreadLeakageChecker::isNonSystemThread)
+         .collect(Collectors.groupingBy(Thread::getName));
+   } catch (Exception e) {
+     // Nothing can be checked without the grouping. Report the check as failed instead of
+     // running into a NullPointerException further down.
+     System.out.println("filtering thread failure with exception:" + e);
+     return false;
+   }
+
+   Map<ThreadCategory, Set<Thread>> threadByCat = new HashMap<>();
+   for (ThreadCategory category : ThreadCategory.values()) {
+     Predicate<String> matcher = category.getMatchPred();
+     for (Map.Entry<String, List<Thread>> entry : threadByName.entrySet()) {
+       // entry key is the thread name; a name may match more than one category
+       if (matcher.test(entry.getKey())) {
+         threadByCat.computeIfAbsent(category, k -> new HashSet<>()).addAll(entry.getValue());
+       }
+     }
+   }
+
+   // todo: We should make the following System.out as LOG.INfO once we achieve 0 thread leakage.
+   // todo: also the calling point of this method would fail the test
+   // step 3: enforce checking policy
+   boolean checkStatus = true;
+   for (ThreadCategory threadCategory : ThreadCategory.values()) {
+     Set<Thread> categoryThreads = threadByCat.get(threadCategory);
+     if (categoryThreads == null) {
+       // no thread matched this category, nothing to report
+       continue;
+     }
+     int categoryThreadCnt = categoryThreads.size();
+
+     boolean dumpThread = false;
+     if (categoryThreadCnt > threadCategory.getLimit()) {
+       checkStatus = false;
+       System.out.println(
+           "Failure " + threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
+       dumpThread = true;
+     } else if (categoryThreadCnt > threadCategory.getWarningLimit()) {
+       System.out.println(
+           "Warning " + threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
+       dumpThread = true;
+     } else {
+       System.out.println(threadCategory.getDescription() + " has " + categoryThreadCnt + " thread");
+     }
+     if (!dumpThread) {
+       continue;
+     }
+     // print first 100 thread names
+     int i = 0;
+     for (Thread t : categoryThreads) {
+       System.out.println(i + " thread:" + t.getName());
+       i++;
+       if (i == 100) {
+         System.out.println(" skipping the rest");
+         break;
+       }
+     }
+   }
+
+   return checkStatus;
+ }
+
+ // True when the thread still has a thread group and that group is not named "system".
+ private static boolean isNonSystemThread(Thread thread) {
+   // Read the group once: getThreadGroup() returns null after the thread has died, so
+   // calling it repeatedly could observe different values.
+   ThreadGroup group = thread.getThreadGroup();
+   return group != null && group.getName() != null && !"system".equals(group.getName());
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 67fee96..c9741a9 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -44,6 +44,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
+import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Pipeline;
@@ -127,8 +128,22 @@ public class ZkTestBase {
protected Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
protected Map<String, BaseDataAccessor> _baseDataAccessorMap = new
HashMap<>();
+ /**
+  * Prints the machine's total physical memory (when the JVM exposes it) and this JVM's
+  * total and free memory in MB to standard output.
+  */
+ static public void reportPhysicalMemory() {
+   java.lang.management.OperatingSystemMXBean os =
+       java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+   // The physical memory size is only available through the com.sun.management extension.
+   // Check before casting so a JVM without it does not fail with a ClassCastException.
+   if (os instanceof com.sun.management.OperatingSystemMXBean) {
+     long physicalMemorySize =
+         ((com.sun.management.OperatingSystemMXBean) os).getTotalPhysicalMemorySize();
+     System.out.println("************ SYSTEM Physical Memory:" + physicalMemorySize);
+   } else {
+     System.out.println("************ SYSTEM Physical Memory:unavailable");
+   }
+
+   long MB = 1024 * 1024;
+   Runtime runtime = Runtime.getRuntime();
+   long free = runtime.freeMemory() / MB;
+   long total = runtime.totalMemory() / MB;
+   System.out.println("************ total memory:" + total + " free memory:" + free);
+ }
+
+
@BeforeSuite
public void beforeSuite() throws Exception {
+ reportPhysicalMemory();
// TODO: use logging.properties file to config java.util.logging.Logger
levels
java.util.logging.Logger topJavaLogger =
java.util.logging.Logger.getLogger("");
topJavaLogger.setLevel(Level.WARNING);
@@ -710,7 +725,8 @@ public class ZkTestBase {
}
@AfterClass
- public void cleanupLiveInstanceOwners() {
+ public void cleanupLiveInstanceOwners() throws InterruptedException {
+ String testClassName = this.getShortClassName();
for (String cluster : _liveInstanceOwners.keySet()) {
Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(cluster);
for (HelixZkClient client : clientMap.values()) {
@@ -719,6 +735,17 @@ public class ZkTestBase {
clientMap.clear();
}
_liveInstanceOwners.clear();
+
+ boolean status = false;
+ try {
+ status = ThreadLeakageChecker.afterClassCheck(testClassName);
+ } catch (Exception e) {
+ LOG.error("ThreadLeakageChecker exception:", e);
+ }
+ // todo: We should fail test here once we achieved 0 leakage and remove
the following System print
+ if (!status) {
+ System.out.println("---------- Test Class " + testClassName + " thread
leakage detected! ---------------");
+ }
}
protected List<LiveInstance> setupLiveInstances(String clusterName, int[]
liveInstances) {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 8f62207..a8f2c2c 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -43,7 +43,9 @@ import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
+import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.api.config.RebalanceConfig;
+import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -83,6 +85,8 @@ import
org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -95,6 +99,7 @@ import org.testng.annotations.Test;
* This test verifies that all Helix Java APIs work as expected.
*/
public class TestMultiZkHelixJavaApis {
+ private static Logger LOG =
LoggerFactory.getLogger(TestMultiZkHelixJavaApis.class);
private static final int NUM_ZK = 3;
private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new
HashMap<>();
@@ -170,6 +175,8 @@ public class TestMultiZkHelixJavaApis {
@AfterClass
public void afterClass() throws Exception {
+ String testClassName = getClass().getSimpleName();
+
try {
// Kill all mock controllers and participants
MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
@@ -216,6 +223,17 @@ public class TestMultiZkHelixJavaApis {
System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
}
}
+
+ boolean status = false;
+ try {
+ status = ThreadLeakageChecker.afterClassCheck(testClassName);
+ } catch (Exception e) {
+ LOG.error("ThreadLeakageChecker exception:", e);
+ }
+ // todo: We should fail test here once we achieved 0 leakage and remove
the following System print
+ if (!status) {
+ System.out.println("---------- Test Class " + testClassName + " thread
leakage detected! ---------------");
+ }
}
/**