Repository: tajo Updated Branches: refs/heads/master c1beaa71c -> b1959116a
TAJO-2014: TestRpcClientManager fails occasionally. Closes #905 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b1959116 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b1959116 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b1959116 Branch: refs/heads/master Commit: b1959116a7c129bc7de4156c366c81cebc14958a Parents: c1beaa7 Author: Jinho Kim <[email protected]> Authored: Wed Dec 9 18:17:38 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Wed Dec 9 18:17:38 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/rpc/NettyServerBase.java | 13 ++-- .../org/apache/tajo/rpc/RpcClientManager.java | 10 +-- .../apache/tajo/rpc/TestRpcClientManager.java | 79 +++++++++----------- .../apache/tajo/storage/TestFileTablespace.java | 24 +++--- .../tajo/storage/raw/TestDirectRawFile.java | 8 +- 6 files changed, 64 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a7a7966..6f20d8d 100644 --- a/CHANGES +++ b/CHANGES @@ -57,6 +57,8 @@ Release 0.12.0 - unreleased BUG FIXES + TAJO-2014: TestRpcClientManager fails occasionally. (jinho) + TAJO-2000: BSTIndex can cause OOM. (jinho) TAJO-1992: \set timezone in cli doesn't work because of casesensitive (DaeMyung) http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index 08d877f..5d1ad26 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -26,6 +26,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -140,22 +141,22 @@ public class NettyServerBase { for (RpcEventListener listener: listeners) { listener.onBeforeShutdown(this); } - + try { accepted.close(); - if(bootstrap != null) { + if (bootstrap != null) { if (bootstrap.childGroup() != null) { - bootstrap.childGroup().shutdownGracefully(); + Future future = bootstrap.childGroup().shutdownGracefully(); if (waitUntilThreadsStop) { - bootstrap.childGroup().terminationFuture().sync(); + future.sync(); } } if (bootstrap.group() != null) { - bootstrap.group().shutdownGracefully(); + Future future = bootstrap.group().shutdownGracefully(); if (waitUntilThreadsStop) { - bootstrap.childGroup().terminationFuture().sync(); + future.sync(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index 032cf35..67c5936 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -18,10 +18,10 @@ package org.apache.tajo.rpc; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.CommonsLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory; import org.apache.commons.logging.Log; @@ -106,7 +106,7 @@ public class RpcClientManager { public void channelRegistered(ChannelHandlerContext ctx) { /* Register client to managed map */ clients.put(target.getKey(), target); - target.getChannel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey())); + ctx.channel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey())); } @Override @@ -210,7 +210,7 @@ public class RpcClientManager { } } - static class ClientCloseFutureListener implements GenericFutureListener { + static class ClientCloseFutureListener implements ChannelFutureListener { private RpcConnectionKey key; public ClientCloseFutureListener(RpcConnectionKey key) { @@ -218,7 +218,7 @@ public class RpcClientManager { } @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(ChannelFuture future) throws Exception { clients.remove(key); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java index 865e5dd..2b50c1f 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java @@ -20,6 +20,8 @@ package org.apache.tajo.rpc; import org.apache.tajo.rpc.test.DummyProtocol; import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import java.net.InetSocketAddress; @@ -34,15 +36,27 @@ import static org.junit.Assert.*; public class TestRpcClientManager { + static NettyServerBase server; + + @BeforeClass + public static void setupClass() throws Exception { + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), 10); + server.start(); + + } + + @AfterClass + public static void afterClass(){ + server.shutdown(true); + } + @Test public void testRaceCondition() throws Exception { final int parallelCount = 50; - final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); ExecutorService executor = Executors.newFixedThreadPool(parallelCount); - NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, - service, new InetSocketAddress("127.0.0.1", 0), parallelCount); - server.start(); try { final InetSocketAddress address = server.getListenAddress(); final RpcClientManager manager = RpcClientManager.getInstance(); @@ -54,7 +68,7 @@ public class TestRpcClientManager { public void run() { NettyClientBase client = null; try { - client = manager.getClient(address, DummyProtocol.class, false, new Properties()); + client = manager.getClient(address, DummyProtocol.class, true, new Properties()); } catch (Throwable e) { fail(e.getMessage()); } @@ -68,10 +82,9 @@ public class TestRpcClientManager { future.get(); } - NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false, new Properties()); + NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, true, new Properties()); RpcClientManager.cleanup(clientBase); } finally { - server.shutdown(true); executor.shutdown(); RpcClientManager.close(); } @@ -79,10 +92,6 @@ public class TestRpcClientManager { @Test public void testClientCloseEvent() throws Exception { - final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); - NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, - service, new InetSocketAddress("127.0.0.1", 0), 3); - server.start(); RpcClientManager manager = RpcClientManager.getInstance(); try { @@ -97,18 +106,13 @@ public class TestRpcClientManager { client.close(); assertFalse(RpcClientManager.contains(key)); } finally { - server.shutdown(true); RpcClientManager.close(); } } @Test public void testClientCloseEventWithReconnect() throws Exception { - final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); - NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, - service, new InetSocketAddress("127.0.0.1", 0), 3); - server.start(); - int repeat = 10; + int repeat = 100; RpcClientManager manager = RpcClientManager.getInstance(); try { @@ -133,46 +137,37 @@ public class TestRpcClientManager { assertFalse(RpcClientManager.contains(key)); } } finally { - server.shutdown(true); RpcClientManager.close(); } } @Test public void testUnManagedClient() throws Exception { - final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); - NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, - service, new InetSocketAddress("127.0.0.1", 0), 3); - server.start(); RpcConnectionKey key = new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); RpcClientManager.close(); RpcClientManager manager = RpcClientManager.getInstance(); - try { - NettyClientBase client1 = manager.newClient(key, new Properties()); - assertTrue(client1.isConnected()); - assertFalse(RpcClientManager.contains(key)); + NettyClientBase client1 = manager.newClient(key, new Properties()); + assertTrue(client1.isConnected()); + assertFalse(RpcClientManager.contains(key)); - NettyClientBase client2 = manager.newClient(key, new Properties()); - assertTrue(client2.isConnected()); - assertFalse(RpcClientManager.contains(key)); + NettyClientBase client2 = manager.newClient(key, new Properties()); + assertTrue(client2.isConnected()); + assertFalse(RpcClientManager.contains(key)); - assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress()); - assertNotEquals(client1.getChannel(), client2.getChannel()); + assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress()); + assertNotEquals(client1.getChannel(), client2.getChannel()); - client1.close(); - assertFalse(client1.isConnected()); - assertTrue(client2.isConnected()); + client1.close(); + assertFalse(client1.isConnected()); + assertTrue(client2.isConnected()); - client1.connect(); - client2.close(); - assertFalse(client2.isConnected()); - assertTrue(client1.isConnected()); + client1.connect(); + client2.close(); + assertFalse(client2.isConnected()); + assertTrue(client1.isConnected()); - RpcClientManager.cleanup(client1, client2); - } finally { - server.shutdown(true); - } + RpcClientManager.cleanup(client1, client2); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index cd5bd32..d599a25 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -101,17 +101,17 @@ public class TestFileTablespace { assertEquals(4,i); } - @Test + @Test(timeout = 60000) public void testGetSplit() throws Exception { final Configuration conf = new HdfsConfiguration(); String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); - cluster.waitClusterUp(); TajoConf tajoConf = new TajoConf(conf); tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); @@ -158,16 +158,16 @@ public class TestFileTablespace { } } - @Test + @Test(timeout = 60000) public void testZeroLengthSplit() throws Exception { final Configuration conf = new HdfsConfiguration(); String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); - cluster.waitClusterUp(); TajoConf tajoConf = new TajoConf(conf); tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); @@ -207,17 +207,17 @@ public class TestFileTablespace { } } - @Test + @Test(timeout = 60000) public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { final Configuration conf = new HdfsConfiguration(); String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(2).build(); - cluster.waitClusterUp(); TajoConf tajoConf = new TajoConf(conf); tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo"); @@ -258,17 +258,17 @@ public class TestFileTablespace { } } - @Test + @Test(timeout = 60000) public void testGetFileTablespace() throws Exception { final Configuration hdfsConf = new HdfsConfiguration(); String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + hdfsConf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build(); - cluster.waitClusterUp(); + new MiniDFSCluster.Builder(hdfsConf).numDataNodes(1).build(); URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo"); Optional<Tablespace> existingTs = Optional.empty(); @@ -287,13 +287,7 @@ public class TestFileTablespace { space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace"); assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri()); - } finally { - - if (existingTs.isPresent()) { - TablespaceManager.addTableSpaceForTest(existingTs.get()); - } - cluster.shutdown(true); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b1959116/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java index bae81a9..179c9eb 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/raw/TestDirectRawFile.java @@ -100,6 +100,7 @@ public class TestDirectRawFile { String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf)); @@ -110,7 +111,6 @@ public class TestDirectRawFile { builder.waitSafeMode(true); cluster = builder.build(); - cluster.waitClusterUp(); dfs = cluster.getFileSystem(); localFs = FileSystem.getLocal(new TajoConf()); } @@ -168,7 +168,7 @@ public class TestDirectRawFile { return writeRowBlock(conf, meta, rowBlock, outputFile); } - @Test + @Test(timeout = 60000) public void testRWForAllTypesWithNextTuple() throws IOException { int rowNum = 10000; @@ -198,7 +198,7 @@ public class TestDirectRawFile { assertEquals(rowNum, j); } - @Test + @Test(timeout = 60000) public void testRepeatedScan() throws IOException { int rowNum = 2; @@ -226,7 +226,7 @@ public class TestDirectRawFile { reader.close(); } - @Test + @Test(timeout = 60000) public void testReset() throws IOException { int rowNum = 2;
