This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a14eea8  HBASE-22963 Netty ByteBuf leak in rpc client implementation (#577)
a14eea8 is described below

commit a14eea82b4d1393c75bbf676ceb9c3fd5153f8e9
Author: Duo Zhang <[email protected]>
AuthorDate: Sun Sep 8 21:54:09 2019 +0800

    HBASE-22963 Netty ByteBuf leak in rpc client implementation (#577)
    
    Signed-off-by: Michael Stack <[email protected]>
---
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  1 +
 .../hbase/client/TestConnectionImplementation.java | 52 ++++++++++++++++++++--
 pom.xml                                            |  2 +
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index c8904f4..7fdc4aa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -450,6 +450,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
               + connection.remoteId);
           connections.removeValue(remoteId, connection);
           connection.shutdown();
+          connection.cleanupConnection();
         }
       }
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
index 4d9f39b..33350b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.Threads;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -81,6 +82,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
+import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
 
 /**
  * This class is for testing HBaseConnectionManager features
@@ -112,6 +115,7 @@ public class TestConnectionImplementation {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    ResourceLeakDetector.setLevel(Level.PARANOID);
     TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
     // Up the handlers; this test needs more than usual.
     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
@@ -126,6 +130,11 @@ public class TestConnectionImplementation {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @After
+  public void tearDown() throws IOException {
+    TEST_UTIL.getAdmin().balancerSwitch(true, true);
+  }
+
   @Test
   public void testClusterConnection() throws IOException {
     ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
@@ -284,7 +293,7 @@ public class TestConnectionImplementation {
     TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
     TEST_UTIL.createTable(tableName, FAM_NAM).close();
 
-    boolean previousBalance = TEST_UTIL.getAdmin().setBalancerRunning(false, true);
+    TEST_UTIL.getAdmin().balancerSwitch(false, true);
 
     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
     // We want to work on a separate connection.
@@ -349,9 +358,9 @@ public class TestConnectionImplementation {
     RpcClient rpcClient = conn.getRpcClient();
 
     LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
-    for (int i = 0; i < 5000; i++) {
+    for (int i = 0; i < 500; i++) {
       rpcClient.cancelConnections(sn);
-      Thread.sleep(5);
+      Thread.sleep(50);
     }
 
     step.compareAndSet(1, 2);
@@ -366,7 +375,6 @@ public class TestConnectionImplementation {
     table.close();
     connection.close();
     Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
-    TEST_UTIL.getAdmin().setBalancerRunning(previousBalance, true);
   }
 
   /**
@@ -1080,4 +1088,40 @@ public class TestConnectionImplementation {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test
+  public void testMetaLookupThreadPoolCreated() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
+    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
+      TEST_UTIL.getAdmin().disableTable(tableName);
+      TEST_UTIL.getAdmin().deleteTable(tableName);
+    }
+    try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
+      byte[] row = Bytes.toBytes("test");
+      ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
+      // check that metalookup pool would get created
+      c.relocateRegion(tableName, row);
+      ExecutorService ex = c.getCurrentMetaLookupPool();
+      assertNotNull(ex);
+    }
+  }
+
+  // There is no assertion, but you need to confirm that there is no resource leak output from netty
+  @Test
+  public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    TEST_UTIL.createTable(tableName, FAM_NAM).close();
+    TEST_UTIL.getAdmin().balancerSwitch(false, true);
+    try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+      Table table = connection.getTable(tableName)) {
+      table.get(new Get(Bytes.toBytes("1")));
+      ServerName sn = TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
+      RpcClient rpcClient = ((ConnectionImplementation) connection).getRpcClient();
+      rpcClient.cancelConnections(sn);
+      Thread.sleep(1000);
+      System.gc();
+      Thread.sleep(1000);
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index 08ff72f..92a79a9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1480,10 +1480,12 @@
     <hbase-surefire.argLine>-enableassertions -Dhbase.build.id=${build.id} -Xmx${surefire.Xmx}
       -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
       -Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true
+      -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=paranoid
     </hbase-surefire.argLine>
     <hbase-surefire.cygwin-argLine>-enableassertions -Xmx${surefire.cygwinXmx}
       -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
       "-Djava.library.path=${hadoop.library.path};${java.library.path}"
+      -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=paranoid
     </hbase-surefire.cygwin-argLine>
     <!-- Surefire argLine defaults to Linux, cygwin argLine is used in the os.windows profile -->
     <argLine>${hbase-surefire.argLine}</argLine>

Reply via email to