Repository: tajo Updated Branches: refs/heads/branch-0.11.1 0b32c1baf -> 24b8e891e
TAJO-1966: Decrease memory usage of TajoTestingCluster. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/24b8e891 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/24b8e891 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/24b8e891 Branch: refs/heads/branch-0.11.1 Commit: 24b8e891e48d7b58aa1a3439c9f646d599edef9a Parents: 0b32c1b Author: Jinho Kim <[email protected]> Authored: Fri Nov 6 12:16:25 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Nov 6 12:16:25 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../apache/tajo/catalog/store/DerbyStore.java | 4 +-- .../org/apache/tajo/CatalogTestingUtil.java | 26 +++++++++++++++ .../java/org/apache/tajo/QueryTestCaseBase.java | 19 ++++++++--- .../org/apache/tajo/TajoTestingCluster.java | 27 +++++++++++++--- .../org/apache/tajo/storage/BufferPool.java | 33 +++++++++++++++++++- .../org/apache/tajo/util/JvmPauseMonitor.java | 1 + .../java/org/apache/tajo/worker/Fetcher.java | 3 +- tajo-project/pom.xml | 2 +- .../tajo/pullserver/TajoPullServerService.java | 3 +- .../org/apache/tajo/rpc/NettyServerBase.java | 8 ++--- .../java/org/apache/tajo/rpc/NettyUtils.java | 21 +++++++++++-- .../java/org/apache/tajo/rpc/RpcConstants.java | 3 ++ .../org/apache/tajo/rpc/NettyClientBase.java | 3 +- .../apache/tajo/storage/TablespaceManager.java | 12 +++++-- 15 files changed, 139 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index fa389b5..bf58419 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,8 @@ Release 0.11.1 - unreleased IMPROVEMENT + TAJO-1966: Decrease memory usage of TajoTestingCluster. (jinho) + BUG FIXES http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java index 5e3ff49..1678472 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java @@ -35,8 +35,8 @@ import java.sql.Statement; public class DerbyStore extends AbstractDBStore { private static final Log LOG = LogFactory.getLog(DerbyStore.class); - - private static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver"; + public static final String NAME = "derby"; + public static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver"; protected String getCatalogDriverName(){ return CATALOG_DRIVER; http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java index 3f45ede..e601f25 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java @@ -21,11 +21,16 @@ package org.apache.tajo; import org.apache.tajo.annotation.NotNull; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.CatalogConstants; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.store.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.exception.UnsupportedCatalogStore; +import java.net.URI; +import java.sql.Connection; +import java.sql.DriverManager; + public class CatalogTestingUtil { public static TajoConf configureCatalog(TajoConf conf, String testDirPath) throws UnsupportedCatalogStore { @@ -39,6 +44,27 @@ public class CatalogTestingUtil { return conf; } + /** + * cleanup external catalog store resources + */ + public static void shutdownCatalogStore(TajoConf conf) throws Exception { + String catalogUri = conf.get(CatalogConstants.CATALOG_URI); + URI uri = new URI(catalogUri); + String[] schemeSpecificPart = uri.getSchemeSpecificPart().split(":"); + + if(DerbyStore.NAME.equals(schemeSpecificPart[0]) && "memory".equals(schemeSpecificPart[1])) { + Connection conn = null; + try { + // Removing an in-memory database. + String removingUri = catalogUri.split(";")[0] + ";drop=true"; + Class.forName(DerbyStore.CATALOG_DRIVER).newInstance(); + conn = DriverManager.getConnection(removingUri); + } finally { + CatalogUtil.closeQuietly(conn); + } + } + } + static <T extends CatalogStore> boolean requireAuth(Class<T> clazz) { return clazz.equals(MySQLStore.class) || clazz.equals(MariaDBStore.class) || http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 2fde31c..179ed36 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -38,7 +38,6 @@ import org.apache.tajo.cli.tsql.ParsedResult; import org.apache.tajo.cli.tsql.SimpleParser; import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.parser.sql.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.InsufficientPrivilegeException; import org.apache.tajo.exception.TajoException; @@ -46,12 +45,14 @@ import org.apache.tajo.exception.UndefinedTableException; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.master.GlobalEngine; +import org.apache.tajo.parser.sql.SQLAnalyzer; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; +import org.apache.tajo.storage.BufferPool; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.FileUtil; import org.junit.AfterClass; @@ -67,6 +68,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.annotation.*; +import java.lang.management.BufferPoolMXBean; import java.lang.reflect.Method; import java.net.URL; import java.sql.ResultSet; @@ -248,9 +250,18 @@ public class QueryTestCaseBase { @Before public void printTestName() { /* protect a travis stalled build */ - System.out.println("Run: " + name.getMethodName() + - " Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) - / (1024 * 1024)) + " MBytes, Active Threads:" + Thread.activeCount()); + BufferPoolMXBean direct = BufferPool.getDirectBufferPool(); + BufferPoolMXBean mapped = BufferPool.getMappedBufferPool(); + System.out.println(String.format("Used heap: %s/%s, direct:%s/%s, mapped:%s/%s, Active Threads: %d, Run: %s.%s", + FileUtil.humanReadableByteCount(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(), false), + FileUtil.humanReadableByteCount(Runtime.getRuntime().maxMemory(), false), + FileUtil.humanReadableByteCount(direct.getMemoryUsed(), false), + FileUtil.humanReadableByteCount(direct.getTotalCapacity(), false), + FileUtil.humanReadableByteCount(mapped.getMemoryUsed(), false), + FileUtil.humanReadableByteCount(mapped.getTotalCapacity(), false), + Thread.activeCount(), + getClass().getSimpleName(), + name.getMethodName())); } public QueryTestCaseBase() { http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java index 35e27b8..0871084 100644 --- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -131,7 +131,7 @@ public class TajoTestingCluster { conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4); conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000); conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 3); - conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 2); + conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 1); // Client API RPC conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2); @@ -161,6 +161,19 @@ public class TajoTestingCluster { // Query output file conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, BuiltinStorages.DRAW); + /** decrease Hbase thread and memory cache for testing */ + //server handler + conf.setInt("hbase.regionserver.handler.count", 5); + //client handler + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + conf.setInt("hbase.hconnection.meta.lookup.threads.core", 5); + conf.setInt("hbase.hconnection.meta.lookup.threads.max", 10); + + //memory cache + conf.setFloat("hfile.block.cache.size", 0.0f); //disable cache + conf.setBoolean("hbase.bucketcache.combinedcache.enabled", false); + /* Since Travis CI limits the size of standard output log up to 4MB */ if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); @@ -307,6 +320,13 @@ public class TajoTestingCluster { } public void shutdownCatalogCluster() { + + try { + CatalogTestingUtil.shutdownCatalogStore(conf); + } catch (Exception e) { + //ignore + } + if (catalogServer != null) { this.catalogServer.stop(); } @@ -490,7 +510,7 @@ public class TajoTestingCluster { public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception { int numDataNodes = numSlaves; - if(dataNodeHosts != null && dataNodeHosts.length != 0) { + if (dataNodeHosts != null && dataNodeHosts.length != 0) { numDataNodes = dataNodeHosts.length; } @@ -505,9 +525,6 @@ public class TajoTestingCluster { startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts); this.dfsCluster.waitClusterUp(); - - conf.setInt("hbase.hconnection.threads.core", 5); - conf.setInt("hbase.hconnection.threads.max", 50); hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false); http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 403d789..97ac7b4 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -25,9 +25,12 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.List; /* this class is PooledBuffer holder */ public class BufferPool { @@ -133,10 +136,38 @@ public class BufferPool { } /** - * deallocate the specified direct + * deallocate the specified direct memory * @param byteBuffer */ public static void free(ByteBuffer byteBuffer) { PlatformDependent.freeDirectBuffer(byteBuffer); } + + /** + * get the specified direct memory bean + */ + public static BufferPoolMXBean getDirectBufferPool() { + for (BufferPoolMXBean pool : getBufferPools()) { + if (pool.getName().equals("direct")) { + return pool; + } + } + return null; + } + + /** + * get the specified mapped memory bean + */ + public static BufferPoolMXBean getMappedBufferPool() { + for (BufferPoolMXBean pool : getBufferPools()) { + if (pool.getName().equals("mapped")) { + return pool; + } + } + return null; + } + + private static List<BufferPoolMXBean> getBufferPools() { + return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java index 3ec6c40..8939bda 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java @@ -211,6 +211,7 @@ public class JvmPauseMonitor { * log messages about the GC pauses. */ public static void main(String []args) throws Exception { + Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler()); new JvmPauseMonitor(new Configuration()).start(); List<String> list = Lists.newArrayList(); int i = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 71d30cd..2e1639f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -20,7 +20,6 @@ package org.apache.tajo.worker; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.*; @@ -92,7 +91,7 @@ public class Fetcher { NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM))) .channel(NioSocketChannel.class) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000) .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index 62e314d..9861717 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -38,7 +38,7 @@ <tajo.version>0.11.1-SNAPSHOT</tajo.version> <hbase.version>1.1.1</hbase.version> <hive.version>1.1.0</hive.version> - <netty.version>4.0.29.Final</netty.version> + <netty.version>4.0.33.Final</netty.version> <jersey.version>2.6</jersey.version> <jetty.version>6.1.14</jetty.version> <tajo.root>${project.parent.relativePath}/..</tajo.root> http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 6a9deb5..47a92c4 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -20,7 +20,6 @@ package org.apache.tajo.pullserver; import com.google.common.collect.Lists; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; @@ -189,7 +188,7 @@ public class TajoPullServerService extends AbstractService { selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum) .option(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .childOption(ChannelOption.TCP_NODELAY, true); localFS = new LocalFileSystem(); http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index 2c154bf..258afe8 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -18,11 +18,7 @@ package org.apache.tajo.rpc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; @@ -31,6 +27,8 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.io.IOException; import java.net.DatagramSocket; @@ -87,7 +85,7 @@ public class NettyServerBase { .childHandler(initializer) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10); http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java index 01fd48b..93c7fb2 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java @@ -20,8 +20,12 @@ package org.apache.tajo.rpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.ResourceLeakDetector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,9 +38,10 @@ public final class NettyUtils { private static final Log LOG = LogFactory.getLog(NettyUtils.class); private static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2; - private static final Object lockObjectForLoopGroup = new Object(); - private static AtomicInteger serverCount = new AtomicInteger(0); + private static final AtomicInteger serverCount = new AtomicInteger(0); + + public static final ByteBufAllocator ALLOCATOR; public enum GROUP { DEFAULT, @@ -46,6 +51,18 @@ public final class NettyUtils { private static final Map<GROUP, EventLoopGroup> eventLoopGroupMap = new ConcurrentHashMap<GROUP, EventLoopGroup>(); + static { + if (RpcConstants.IS_TEST_MODE) { + /* Disable pooling buffers for memory usage */ + ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; + + /* if you are finding memory leak, please enable this line */ + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } else { + ALLOCATOR = PooledByteBufAllocator.DEFAULT; + } + } + private NettyUtils(){ } http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java index 601f3d2..95e5ae4 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java @@ -47,4 +47,7 @@ public class RpcConstants { public static final String CLIENT_HANG_DETECTION = "tajo.rpc.client.hang-detection"; public final static boolean CLIENT_HANG_DETECTION_DEFAULT = false; + + public static final String TEST_KEY = "tajo.test.enabled"; + public static final boolean IS_TEST_MODE = Boolean.parseBoolean(System.getProperty(TEST_KEY, "false")); } http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 6008c4c..74ed3a0 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -23,7 +23,6 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleState; @@ -94,7 +93,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable .group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(initializer) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connTimeoutMillis) .option(ChannelOption.SO_RCVBUF, 1048576 * 10) http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java index 85d5d51..668b6c6 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java @@ -214,7 +214,7 @@ public class TablespaceManager implements StorageService { registerTableSpace(spaceName, uri, configs, true, override); } - private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, + private static Tablespace registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc, boolean visible, boolean override) { Tablespace tableSpace = initializeTableSpace(spaceName, uri, spaceDesc); tableSpace.setVisible(visible); @@ -236,6 +236,7 @@ public class TablespaceManager implements StorageService { registerTableSpace(tmpName, rootUri, spaceDesc, false, override); } } + return tableSpace; } private static void putTablespace(Tablespace space, boolean override) { @@ -317,7 +318,14 @@ public class TablespaceManager implements StorageService { existing = TABLE_SPACES.remove(space.getUri()); // Add anotherone for test - registerTableSpace(space.name, space.uri, space.getConfig(), true, true); + Tablespace tablespace = registerTableSpace(space.name, space.uri, space.getConfig(), true, true); + try { + //override conf for test + if (space.conf != null) + tablespace.init(space.conf); + } catch (IOException e) { + throw new RuntimeException(e); + } } // if there is an existing one, return it. return Optional.fromNullable(existing);
