Repository: tajo Updated Branches: refs/heads/master c6ad554e5 -> 3b0b68375
TAJO-1966: Decrease memory usage of TajoTestingCluster. Closes #850 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3b0b6837 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3b0b6837 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3b0b6837 Branch: refs/heads/master Commit: 3b0b68375245c5123f9891e0f25bcbe15e9a2a20 Parents: c6ad554 Author: Jinho Kim <[email protected]> Authored: Fri Nov 6 12:11:48 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Nov 6 12:11:48 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../apache/tajo/catalog/store/DerbyStore.java | 4 +-- .../org/apache/tajo/CatalogTestingUtil.java | 26 +++++++++++++++ .../java/org/apache/tajo/QueryTestCaseBase.java | 19 ++++++++--- .../org/apache/tajo/TajoTestingCluster.java | 27 +++++++++++++--- .../org/apache/tajo/storage/BufferPool.java | 33 +++++++++++++++++++- .../org/apache/tajo/util/JvmPauseMonitor.java | 1 + .../java/org/apache/tajo/worker/Fetcher.java | 3 +- tajo-project/pom.xml | 2 +- .../tajo/pullserver/TajoPullServerService.java | 3 +- .../org/apache/tajo/rpc/NettyServerBase.java | 8 ++--- .../java/org/apache/tajo/rpc/NettyUtils.java | 21 +++++++++++-- .../java/org/apache/tajo/rpc/RpcConstants.java | 3 ++ .../org/apache/tajo/rpc/NettyClientBase.java | 3 +- .../apache/tajo/storage/TablespaceManager.java | 12 +++++-- 15 files changed, 139 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index ed96bb0..4be9ebf 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-1966: Decrease memory usage of TajoTestingCluster. (jinho) + TAJO-1941: PermGen elimination in JDK 8. (Dongkyu Hwangbo via hyunsik) TAJO-1920: Calling 'Collection.toArray()' with zero-length array argument http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java index 5e3ff49..1678472 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java @@ -35,8 +35,8 @@ import java.sql.Statement; public class DerbyStore extends AbstractDBStore { private static final Log LOG = LogFactory.getLog(DerbyStore.class); - - private static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver"; + public static final String NAME = "derby"; + public static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver"; protected String getCatalogDriverName(){ return CATALOG_DRIVER; http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java index 3f45ede..e601f25 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java @@ -21,11 +21,16 @@ package org.apache.tajo; import org.apache.tajo.annotation.NotNull; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.CatalogConstants; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.store.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.exception.UnsupportedCatalogStore; +import java.net.URI; +import java.sql.Connection; +import java.sql.DriverManager; + public class CatalogTestingUtil { public static TajoConf configureCatalog(TajoConf conf, String testDirPath) throws UnsupportedCatalogStore { @@ -39,6 +44,27 @@ public class CatalogTestingUtil { return conf; } + /** + * cleanup external catalog store resources + */ + public static void shutdownCatalogStore(TajoConf conf) throws Exception { + String catalogUri = conf.get(CatalogConstants.CATALOG_URI); + URI uri = new URI(catalogUri); + String[] schemeSpecificPart = uri.getSchemeSpecificPart().split(":"); + + if(DerbyStore.NAME.equals(schemeSpecificPart[0]) && "memory".equals(schemeSpecificPart[1])) { + Connection conn = null; + try { + // Removing an in-memory database. + String removingUri = catalogUri.split(";")[0] + ";drop=true"; + Class.forName(DerbyStore.CATALOG_DRIVER).newInstance(); + conn = DriverManager.getConnection(removingUri); + } finally { + CatalogUtil.closeQuietly(conn); + } + } + } + static <T extends CatalogStore> boolean requireAuth(Class<T> clazz) { return clazz.equals(MySQLStore.class) || clazz.equals(MariaDBStore.class) || http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java index ffa2dab..0639a76 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -38,7 +38,6 @@ import org.apache.tajo.cli.tsql.ParsedResult; import org.apache.tajo.cli.tsql.SimpleParser; import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.parser.sql.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.InsufficientPrivilegeException; import org.apache.tajo.exception.TajoException; @@ -46,12 +45,14 @@ import org.apache.tajo.exception.UndefinedTableException; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.master.GlobalEngine; +import org.apache.tajo.parser.sql.SQLAnalyzer; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; +import org.apache.tajo.storage.BufferPool; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.FileUtil; import org.junit.AfterClass; @@ -67,6 +68,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.annotation.*; +import java.lang.management.BufferPoolMXBean; import java.lang.reflect.Method; import java.net.URL; import java.sql.ResultSet; @@ -248,9 +250,18 @@ public class QueryTestCaseBase { @Before public void printTestName() { /* protect a travis stalled build */ - System.out.println("Run: " + name.getMethodName() + - " Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) - / (1024 * 1024)) + " MBytes, Active Threads:" + Thread.activeCount()); + BufferPoolMXBean direct = BufferPool.getDirectBufferPool(); + BufferPoolMXBean mapped = BufferPool.getMappedBufferPool(); + System.out.println(String.format("Used heap: %s/%s, direct:%s/%s, mapped:%s/%s, Active Threads: %d, Run: %s.%s", + FileUtil.humanReadableByteCount(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(), false), + FileUtil.humanReadableByteCount(Runtime.getRuntime().maxMemory(), false), + FileUtil.humanReadableByteCount(direct.getMemoryUsed(), false), + FileUtil.humanReadableByteCount(direct.getTotalCapacity(), false), + FileUtil.humanReadableByteCount(mapped.getMemoryUsed(), false), + FileUtil.humanReadableByteCount(mapped.getTotalCapacity(), false), + Thread.activeCount(), + getClass().getSimpleName(), + name.getMethodName())); } public QueryTestCaseBase() { http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index a4fff57..046a224 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -130,7 +130,7 @@ public class TajoTestingCluster { conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4); conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000); conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 3); - conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 2); + conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 1); // Client API RPC conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2); @@ -160,6 +160,19 @@ public class TajoTestingCluster { // Query output file conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, BuiltinStorages.DRAW); + /** decrease Hbase thread and memory cache for testing */ + //server handler + conf.setInt("hbase.regionserver.handler.count", 5); + //client handler + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + conf.setInt("hbase.hconnection.meta.lookup.threads.core", 5); + conf.setInt("hbase.hconnection.meta.lookup.threads.max", 10); + + //memory cache + conf.setFloat("hfile.block.cache.size", 0.0f); //disable cache + conf.setBoolean("hbase.bucketcache.combinedcache.enabled", false); + /* Since Travis CI limits the size of standard output log up to 4MB */ if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); @@ -306,6 +319,13 @@ public class TajoTestingCluster { } public void shutdownCatalogCluster() { + + try { + CatalogTestingUtil.shutdownCatalogStore(conf); + } catch (Exception e) { + //ignore + } + if (catalogServer != null) { this.catalogServer.stop(); } @@ -489,7 +509,7 @@ public class TajoTestingCluster { public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception { int numDataNodes = numSlaves; - if(dataNodeHosts != null && dataNodeHosts.length != 0) { + if (dataNodeHosts != null && dataNodeHosts.length != 0) { numDataNodes = dataNodeHosts.length; } @@ -504,9 +524,6 @@ public class TajoTestingCluster { startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts); this.dfsCluster.waitClusterUp(); - - conf.setInt("hbase.hconnection.threads.core", 5); - conf.setInt("hbase.hconnection.threads.max", 50); hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false); http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 403d789..97ac7b4 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -25,9 +25,12 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.List; /* this class is PooledBuffer holder */ public class BufferPool { @@ -133,10 +136,38 @@ public class BufferPool { } /** - * deallocate the specified direct + * deallocate the specified direct memory * @param byteBuffer */ public static void free(ByteBuffer byteBuffer) { PlatformDependent.freeDirectBuffer(byteBuffer); } + + /** + * get the specified direct memory bean + */ + public static BufferPoolMXBean getDirectBufferPool() { + for (BufferPoolMXBean pool : getBufferPools()) { + if (pool.getName().equals("direct")) { + return pool; + } + } + return null; + } + + /** + * get the specified mapped memory bean + */ + public static BufferPoolMXBean getMappedBufferPool() { + for (BufferPoolMXBean pool : getBufferPools()) { + if (pool.getName().equals("mapped")) { + return pool; + } + } + return null; + } + + private static List<BufferPoolMXBean> getBufferPools() { + return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java index 3ec6c40..8939bda 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java @@ -211,6 +211,7 @@ public class JvmPauseMonitor { * log messages about the GC pauses. */ public static void main(String []args) throws Exception { + Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler()); new JvmPauseMonitor(new Configuration()).start(); List<String> list = Lists.newArrayList(); int i = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 71d30cd..2e1639f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -20,7 +20,6 @@ package org.apache.tajo.worker; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.*; @@ -92,7 +91,7 @@ public class Fetcher { NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM))) .channel(NioSocketChannel.class) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000) .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index ab7370c..84aeffd 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -38,7 +38,7 @@ <tajo.version>0.12.0-SNAPSHOT</tajo.version> <hbase.version>1.1.1</hbase.version> <hive.version>1.1.0</hive.version> - <netty.version>4.0.29.Final</netty.version> + <netty.version>4.0.33.Final</netty.version> <jersey.version>2.6</jersey.version> <jetty.version>6.1.14</jetty.version> <tajo.root>${project.parent.relativePath}/..</tajo.root> http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 4a4ae53..b3ab85d 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -20,7 +20,6 @@ package org.apache.tajo.pullserver; import com.google.common.collect.Lists; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; @@ -189,7 +188,7 @@ public class TajoPullServerService extends AbstractService { selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum) .option(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .childOption(ChannelOption.TCP_NODELAY, true); localFS = new LocalFileSystem(); http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index 2c154bf..258afe8 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -18,11 +18,7 @@ package org.apache.tajo.rpc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -31,6 +27,8 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.net.DatagramSocket; @@ -87,7 +85,7 @@ public class NettyServerBase { .childHandler(initializer) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10); http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java index 90b337a..4734b79 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java @@ -20,8 +20,12 @@ package org.apache.tajo.rpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.ResourceLeakDetector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,9 +38,10 @@ public final class NettyUtils { private static final Log LOG = LogFactory.getLog(NettyUtils.class); private static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2; - private static final Object lockObjectForLoopGroup = new Object(); - private static AtomicInteger serverCount = new AtomicInteger(0); + private static final AtomicInteger serverCount = new AtomicInteger(0); + + public static final ByteBufAllocator ALLOCATOR; public enum GROUP { DEFAULT, @@ -46,6 +51,18 @@ public final class NettyUtils { private static final Map<GROUP, EventLoopGroup> eventLoopGroupMap = new ConcurrentHashMap<>(); + static { + if (RpcConstants.IS_TEST_MODE) { + /* Disable pooling buffers for memory usage */ + ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; + + /* if you are finding memory leak, please enable this line */ + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } else { + ALLOCATOR = PooledByteBufAllocator.DEFAULT; + } + } + private NettyUtils(){ } http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java index 601f3d2..95e5ae4 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java @@ -47,4 +47,7 @@ public class RpcConstants { public static final String CLIENT_HANG_DETECTION = "tajo.rpc.client.hang-detection"; public final static boolean CLIENT_HANG_DETECTION_DEFAULT = false; + + public static final String TEST_KEY = "tajo.test.enabled"; + public static final boolean IS_TEST_MODE = Boolean.parseBoolean(System.getProperty(TEST_KEY, "false")); } http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 6008c4c..74ed3a0 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -23,7 +23,6 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleState; @@ -94,7 +93,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable .group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(initializer) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connTimeoutMillis) .option(ChannelOption.SO_RCVBUF, 1048576 * 10) http://git-wip-us.apache.org/repos/asf/tajo/blob/3b0b6837/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 569f1a2..b1e3275 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -210,7 +210,7 @@ public class TablespaceManager implements StorageService { registerTableSpace(spaceName, uri, configs, true, override); } - private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, + private static Tablespace registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, boolean visible, boolean override) { Tablespace tableSpace = initializeTableSpace(spaceName, uri, spaceDesc); tableSpace.setVisible(visible); @@ -232,6 +232,7 @@ public class TablespaceManager implements StorageService { registerTableSpace(tmpName, rootUri, spaceDesc, false, override); } } + return tableSpace; } private static void putTablespace(Tablespace space, boolean override) { @@ -313,7 +314,14 @@ public class TablespaceManager implements StorageService { existing = TABLE_SPACES.remove(space.getUri()); // Add anotherone for test - registerTableSpace(space.name, space.uri, space.getConfig(), true, true); + Tablespace tablespace = registerTableSpace(space.name, space.uri, space.getConfig(), true, true); + try { + //override conf for test + if (space.conf != null) + tablespace.init(space.conf); + } catch (IOException e) { + throw new RuntimeException(e); + } } // if there is an existing one, return it. return Optional.ofNullable(existing);
