HBASE-15495 Connection leak in FanOutOneBlockAsyncDFSOutputHelper

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5fcadb86
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5fcadb86
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5fcadb86

Branch: refs/heads/hbase-12439
Commit: 5fcadb86ab9981b69fba92e590cffc028e2b78b3
Parents: 12f66e3
Author: zhangduo <[email protected]>
Authored: Tue Mar 22 10:33:03 2016 +0800
Committer: zhangduo <[email protected]>
Committed: Wed Mar 23 09:56:40 2016 +0800

----------------------------------------------------------------------
 .../FanOutOneBlockAsyncDFSOutputHelper.java     |  23 +++-
 .../util/TestFanOutOneBlockAsyncDFSOutput.java  | 137 +++++++++++++------
 2 files changed, 114 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5fcadb86/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
index d34bbb0..ea71701 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -99,6 +99,7 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
 
 /**
@@ -594,13 +595,15 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
     beginFileLease(client, src, stat.getFileId());
     boolean succ = false;
     LocatedBlock locatedBlock = null;
-    List<Channel> datanodeList = new ArrayList<>();
+    List<Future<Channel>> futureList = null;
     try {
       DataChecksum summer = createChecksum(client);
       locatedBlock = namenode.addBlock(src, client.getClientName(), null, 
null, stat.getFileId(),
         null);
-      for (Future<Channel> future : connectToDataNodes(conf, clientName, 
locatedBlock, 0L, 0L,
-        PIPELINE_SETUP_CREATE, summer, eventLoop)) {
+      List<Channel> datanodeList = new ArrayList<>();
+      futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, 
PIPELINE_SETUP_CREATE,
+        summer, eventLoop);
+      for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are 
fail-fast. The upper
         // layer should retry itself if needed.
         datanodeList.add(future.syncUninterruptibly().getNow());
@@ -610,8 +613,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper {
           stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, 
ALLOC);
     } finally {
       if (!succ) {
-        for (Channel c : datanodeList) {
-          c.close();
+        if (futureList != null) {
+          for (Future<Channel> f : futureList) {
+            f.addListener(new FutureListener<Channel>() {
+
+              @Override
+              public void operationComplete(Future<Channel> future) throws 
Exception {
+                if (future.isSuccess()) {
+                  future.getNow().close();
+                }
+              }
+            });
+          }
         }
         endFileLease(client, src, stat.getFileId());
         fsUtils.recoverFileLease(dfs, new Path(src), conf, new 
CancelOnClose(client));

http://git-wip-us.apache.org/repos/asf/hbase/blob/5fcadb86/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
index 0e9f42e..09cd61e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java
@@ -22,24 +22,26 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.apache.hadoop.util.Daemon;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -47,9 +49,15 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import io.netty.channel.EventLoop;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
 @Category({ MiscTests.class, MediumTests.class })
 public class TestFanOutOneBlockAsyncDFSOutput {
 
+  private static final Log LOG = 
LogFactory.getLog(TestFanOutOneBlockAsyncDFSOutput.class);
+
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
 
   private static DistributedFileSystem FS;
@@ -63,8 +71,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    
Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG);
-    Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG);
     TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
READ_TIMEOUT_MS);
     TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
@@ -79,12 +85,28 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
+  private void ensureAllDatanodeAlive() throws InterruptedException {
+    // FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we 
need to make sure that we
+    // can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, 
otherwise some tests
+    // will fail.
+    for (;;) {
+      try {
+        FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+          new Path("/ensureDatanodeAlive"), true, true, (short) 3, 
FS.getDefaultBlockSize(),
+          EVENT_LOOP_GROUP.next());
+        out.close();
+        break;
+      } catch (IOException e) {
+        Thread.sleep(100);
+      }
+    }
+  }
+
   private void writeAndVerify(EventLoop eventLoop, Path f, final 
FanOutOneBlockAsyncDFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new 
FanOutOneBlockAsyncDFSOutputFlushHandler();
     eventLoop.execute(new Runnable() {
 
       @Override
@@ -107,9 +129,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void test() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, 
(short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     writeAndVerify(eventLoop, f, out);
   }
 
@@ -117,13 +138,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testRecover() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, 
(short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
-        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new 
FanOutOneBlockAsyncDFSOutputFlushHandler();
     eventLoop.execute(new Runnable() {
 
       @Override
@@ -135,38 +154,41 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     handler.get();
     // restart one datanode which causes one connection broken
     TEST_UTIL.getDFSCluster().restartDataNode(0);
-    handler.reset();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        out.write(b, 0, b.length);
-        out.flush(null, handler, false);
-      }
-    });
     try {
-      handler.get();
-      fail("flush should fail");
-    } catch (ExecutionException e) {
-      // we restarted one datanode so the flush should fail
-      e.printStackTrace();
-    }
-    out.recoverAndClose(null);
-    assertEquals(b.length, FS.getFileStatus(f).getLen());
-    byte[] actual = new byte[b.length];
-    try (FSDataInputStream in = FS.open(f)) {
-      in.readFully(actual);
+      handler.reset();
+      eventLoop.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          out.write(b, 0, b.length);
+          out.flush(null, handler, false);
+        }
+      });
+      try {
+        handler.get();
+        fail("flush should fail");
+      } catch (ExecutionException e) {
+        // we restarted one datanode so the flush should fail
+        LOG.info("expected exception caught", e);
+      }
+      out.recoverAndClose(null);
+      assertEquals(b.length, FS.getFileStatus(f).getLen());
+      byte[] actual = new byte[b.length];
+      try (FSDataInputStream in = FS.open(f)) {
+        in.readFully(actual);
+      }
+      assertArrayEquals(b, actual);
+    } finally {
+      ensureAllDatanodeAlive();
     }
-    assertArrayEquals(b, actual);
   }
 
   @Test
   public void testHeartbeat() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out =
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, 
(short) 3,
-          FS.getDefaultBlockSize(), eventLoop);
+    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(eventLoop, f, out);
@@ -184,7 +206,40 @@ public class TestFanOutOneBlockAsyncDFSOutput {
         FS.getDefaultBlockSize(), eventLoop);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
+      LOG.info("expected exception caught", e);
       assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
     }
   }
+
+  @Test
+  public void testConnectToDatanodeFailed()
+      throws IOException, ClassNotFoundException, NoSuchMethodException, 
IllegalAccessException,
+      InvocationTargetException, InterruptedException, NoSuchFieldException {
+    Field xceiverServerDaemonField = 
DataNode.class.getDeclaredField("dataXceiverServer");
+    xceiverServerDaemonField.setAccessible(true);
+    Class<?> xceiverServerClass = Class
+        .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Method numPeersMethod = 
xceiverServerClass.getDeclaredMethod("getNumPeers");
+    numPeersMethod.setAccessible(true);
+    // make one datanode broken
+    TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
+    try {
+      Path f = new Path("/test");
+      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+      try {
+        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, 
(short) 3,
+          FS.getDefaultBlockSize(), eventLoop);
+        fail("should fail with connection error");
+      } catch (IOException e) {
+        LOG.info("expected exception caught", e);
+      }
+      for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
+        Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
+        assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
+      }
+    } finally {
+      TEST_UTIL.getDFSCluster().restartDataNode(0);
+      ensureAllDatanodeAlive();
+    }
+  }
 }

Reply via email to