Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.1 0b32c1baf -> 24b8e891e


TAJO-1966: Decrease memory usage of TajoTestingCluster.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/24b8e891
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/24b8e891
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/24b8e891

Branch: refs/heads/branch-0.11.1
Commit: 24b8e891e48d7b58aa1a3439c9f646d599edef9a
Parents: 0b32c1b
Author: Jinho Kim <[email protected]>
Authored: Fri Nov 6 12:16:25 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Fri Nov 6 12:16:25 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 ++
 .../apache/tajo/catalog/store/DerbyStore.java   |  4 +--
 .../org/apache/tajo/CatalogTestingUtil.java     | 26 +++++++++++++++
 .../java/org/apache/tajo/QueryTestCaseBase.java | 19 ++++++++---
 .../org/apache/tajo/TajoTestingCluster.java     | 27 +++++++++++++---
 .../org/apache/tajo/storage/BufferPool.java     | 33 +++++++++++++++++++-
 .../org/apache/tajo/util/JvmPauseMonitor.java   |  1 +
 .../java/org/apache/tajo/worker/Fetcher.java    |  3 +-
 tajo-project/pom.xml                            |  2 +-
 .../tajo/pullserver/TajoPullServerService.java  |  3 +-
 .../org/apache/tajo/rpc/NettyServerBase.java    |  8 ++---
 .../java/org/apache/tajo/rpc/NettyUtils.java    | 21 +++++++++++--
 .../java/org/apache/tajo/rpc/RpcConstants.java  |  3 ++
 .../org/apache/tajo/rpc/NettyClientBase.java    |  3 +-
 .../apache/tajo/storage/TablespaceManager.java  | 12 +++++--
 15 files changed, 139 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index fa389b5..bf58419 100644
--- a/CHANGES
+++ b/CHANGES
@@ -7,6 +7,8 @@ Release 0.11.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1966: Decrease memory usage of TajoTestingCluster. (jinho)
+
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index 5e3ff49..1678472 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -35,8 +35,8 @@ import java.sql.Statement;
 public class DerbyStore extends AbstractDBStore {
 
   private static final Log LOG = LogFactory.getLog(DerbyStore.class);
-
-  private static final String 
CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
+  public static final String NAME = "derby";
+  public static final String 
CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
 
   protected String getCatalogDriverName(){
     return CATALOG_DRIVER;

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java 
b/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java
index 3f45ede..e601f25 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/CatalogTestingUtil.java
@@ -21,11 +21,16 @@ package org.apache.tajo;
 import org.apache.tajo.annotation.NotNull;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.CatalogConstants;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.store.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.exception.UnsupportedCatalogStore;
 
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+
 public class CatalogTestingUtil {
 
   public static TajoConf configureCatalog(TajoConf conf, String testDirPath) 
throws UnsupportedCatalogStore {
@@ -39,6 +44,27 @@ public class CatalogTestingUtil {
     return conf;
   }
 
+  /**
+   * cleanup external catalog store resources
+   */
+  public static void shutdownCatalogStore(TajoConf conf) throws Exception {
+    String catalogUri = conf.get(CatalogConstants.CATALOG_URI);
+    URI uri = new URI(catalogUri);
+    String[] schemeSpecificPart = uri.getSchemeSpecificPart().split(":");
+
+    if(DerbyStore.NAME.equals(schemeSpecificPart[0]) && 
"memory".equals(schemeSpecificPart[1])) {
+      Connection conn = null;
+      try {
+        // Removing an in-memory database.
+        String removingUri = catalogUri.split(";")[0] + ";drop=true";
+        Class.forName(DerbyStore.CATALOG_DRIVER).newInstance();
+        conn = DriverManager.getConnection(removingUri);
+      } finally {
+        CatalogUtil.closeQuietly(conn);
+      }
+    }
+  }
+
   static <T extends CatalogStore> boolean requireAuth(Class<T> clazz) {
     return clazz.equals(MySQLStore.class) ||
         clazz.equals(MariaDBStore.class) ||

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java 
b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 2fde31c..179ed36 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -38,7 +38,6 @@ import org.apache.tajo.cli.tsql.ParsedResult;
 import org.apache.tajo.cli.tsql.SimpleParser;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.parser.sql.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.InsufficientPrivilegeException;
 import org.apache.tajo.exception.TajoException;
@@ -46,12 +45,14 @@ import org.apache.tajo.exception.UndefinedTableException;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.master.GlobalEngine;
+import org.apache.tajo.parser.sql.SQLAnalyzer;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
+import org.apache.tajo.storage.BufferPool;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.FileUtil;
 import org.junit.AfterClass;
@@ -67,6 +68,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.annotation.*;
+import java.lang.management.BufferPoolMXBean;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.sql.ResultSet;
@@ -248,9 +250,18 @@ public class QueryTestCaseBase {
   @Before
   public void printTestName() {
     /* protect a travis stalled build */
-    System.out.println("Run: " + name.getMethodName() +
-        " Used memory: " + ((Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory())
-        / (1024 * 1024)) + " MBytes, Active Threads:" + Thread.activeCount());
+    BufferPoolMXBean direct = BufferPool.getDirectBufferPool();
+    BufferPoolMXBean mapped = BufferPool.getMappedBufferPool();
+    System.out.println(String.format("Used heap: %s/%s, direct:%s/%s, 
mapped:%s/%s, Active Threads: %d, Run: %s.%s",
+        FileUtil.humanReadableByteCount(Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory(), false),
+        FileUtil.humanReadableByteCount(Runtime.getRuntime().maxMemory(), 
false),
+        FileUtil.humanReadableByteCount(direct.getMemoryUsed(), false),
+        FileUtil.humanReadableByteCount(direct.getTotalCapacity(), false),
+        FileUtil.humanReadableByteCount(mapped.getMemoryUsed(), false),
+        FileUtil.humanReadableByteCount(mapped.getTotalCapacity(), false),
+        Thread.activeCount(),
+        getClass().getSimpleName(),
+        name.getMethodName()));
   }
 
   public QueryTestCaseBase() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git 
a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java 
b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 35e27b8..0871084 100644
--- a/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-cluster-tests/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -131,7 +131,7 @@ public class TajoTestingCluster {
     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4);
     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000);
     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 
3);
-    conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 
2);
+    conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 
1);
 
     // Client API RPC
     conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2);
@@ -161,6 +161,19 @@ public class TajoTestingCluster {
     // Query output file
     conf.setVar(ConfVars.QUERY_OUTPUT_DEFAULT_FILE_FORMAT, 
BuiltinStorages.DRAW);
 
+    /** decrease Hbase thread and memory cache for testing */
+    //server handler
+    conf.setInt("hbase.regionserver.handler.count", 5);
+    //client handler
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    conf.setInt("hbase.hconnection.meta.lookup.threads.core", 5);
+    conf.setInt("hbase.hconnection.meta.lookup.threads.max", 10);
+
+    //memory cache
+    conf.setFloat("hfile.block.cache.size", 0.0f);  //disable cache
+    conf.setBoolean("hbase.bucketcache.combinedcache.enabled", false);
+
     /* Since Travis CI limits the size of standard output log up to 4MB */
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();
@@ -307,6 +320,13 @@ public class TajoTestingCluster {
   }
 
   public void shutdownCatalogCluster() {
+
+    try {
+      CatalogTestingUtil.shutdownCatalogStore(conf);
+    } catch (Exception e) {
+      //ignore
+    }
+
     if (catalogServer != null) {
       this.catalogServer.stop();
     }
@@ -490,7 +510,7 @@ public class TajoTestingCluster {
   public void startMiniCluster(final int numSlaves, final String [] 
dataNodeHosts) throws Exception {
 
     int numDataNodes = numSlaves;
-    if(dataNodeHosts != null && dataNodeHosts.length != 0) {
+    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
       numDataNodes = dataNodeHosts.length;
     }
 
@@ -505,9 +525,6 @@ public class TajoTestingCluster {
 
     startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
-
-    conf.setInt("hbase.hconnection.threads.core", 5);
-    conf.setInt("hbase.hconnection.threads.max", 50);
     hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
 
     startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java 
b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
index 403d789..97ac7b4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java
@@ -25,9 +25,12 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
 
+import java.lang.management.BufferPoolMXBean;
+import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.List;
 
 /* this class is PooledBuffer holder */
 public class BufferPool {
@@ -133,10 +136,38 @@ public class BufferPool {
   }
 
   /**
-   * deallocate the specified direct
+   * deallocate the specified direct memory
    * @param byteBuffer
    */
   public static void free(ByteBuffer byteBuffer) {
     PlatformDependent.freeDirectBuffer(byteBuffer);
   }
+
+  /**
+   * get the specified direct memory bean
+   */
+  public static BufferPoolMXBean getDirectBufferPool() {
+    for (BufferPoolMXBean pool : getBufferPools()) {
+      if (pool.getName().equals("direct")) {
+        return pool;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * get the specified mapped memory bean
+   */
+  public static BufferPoolMXBean getMappedBufferPool() {
+    for (BufferPoolMXBean pool : getBufferPools()) {
+      if (pool.getName().equals("mapped")) {
+        return pool;
+      }
+    }
+    return null;
+  }
+
+  private static List<BufferPoolMXBean> getBufferPools() {
+    return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java 
b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
index 3ec6c40..8939bda 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JvmPauseMonitor.java
@@ -211,6 +211,7 @@ public class JvmPauseMonitor {
    * log messages about the GC pauses.
    */
   public static void main(String []args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(new 
TajoUncaughtExceptionHandler());
     new JvmPauseMonitor(new Configuration()).start();
     List<String> list = Lists.newArrayList();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 71d30cd..2e1639f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.*;
@@ -92,7 +91,7 @@ public class Fetcher {
               NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
                   
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
           .channel(NioSocketChannel.class)
-          .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
           .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
               
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000)
           .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 62e314d..9861717 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -38,7 +38,7 @@
     <tajo.version>0.11.1-SNAPSHOT</tajo.version>
     <hbase.version>1.1.1</hbase.version>
     <hive.version>1.1.0</hive.version>
-    <netty.version>4.0.29.Final</netty.version>
+    <netty.version>4.0.33.Final</netty.version>
     <jersey.version>2.6</jersey.version>
     <jetty.version>6.1.14</jetty.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git 
a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
 
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 6a9deb5..47a92c4 100644
--- 
a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ 
b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -20,7 +20,6 @@ package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import io.netty.channel.group.ChannelGroup;
@@ -189,7 +188,7 @@ public class TajoPullServerService extends AbstractService {
 
       selector = NettyUtils.createServerBootstrap("TajoPullServerService", 
workerNum)
                    .option(ChannelOption.TCP_NODELAY, true)
-                   .childOption(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
+                   .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
                    .childOption(ChannelOption.TCP_NODELAY, true);
 
       localFS = new LocalFileSystem();

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
 
b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 2c154bf..258afe8 100644
--- 
a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ 
b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -18,11 +18,7 @@
 
 package org.apache.tajo.rpc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -31,6 +27,8 @@ import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
@@ -87,7 +85,7 @@ public class NettyServerBase {
       .childHandler(initializer)
       .option(ChannelOption.SO_REUSEADDR, true)
       .option(ChannelOption.TCP_NODELAY, true)
-      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+      .childOption(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
       .childOption(ChannelOption.TCP_NODELAY, true)
       .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
       .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10);

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java 
b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
index 01fd48b..93c7fb2 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
@@ -20,8 +20,12 @@ package org.apache.tajo.rpc;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.ResourceLeakDetector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -34,9 +38,10 @@ public final class NettyUtils {
   private static final Log LOG = LogFactory.getLog(NettyUtils.class);
   
   private static final int DEFAULT_THREAD_NUM = 
Runtime.getRuntime().availableProcessors() * 2;
-
   private static final Object lockObjectForLoopGroup = new Object();
-  private static AtomicInteger serverCount = new AtomicInteger(0);
+  private static final AtomicInteger serverCount = new AtomicInteger(0);
+
+  public static final ByteBufAllocator ALLOCATOR;
 
   public enum GROUP {
     DEFAULT,
@@ -46,6 +51,18 @@ public final class NettyUtils {
   private static final Map<GROUP, EventLoopGroup> eventLoopGroupMap =
       new ConcurrentHashMap<GROUP, EventLoopGroup>();
 
+  static {
+    if (RpcConstants.IS_TEST_MODE) {
+      /* Disable pooling buffers for memory usage  */
+      ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
+
+      /* if you are finding memory leak, please enable this line */
+      ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
+    } else {
+      ALLOCATOR = PooledByteBufAllocator.DEFAULT;
+    }
+  }
+
   private NettyUtils(){
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java 
b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
index 601f3d2..95e5ae4 100644
--- 
a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
+++ 
b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
@@ -47,4 +47,7 @@ public class RpcConstants {
 
   public static final String CLIENT_HANG_DETECTION = 
"tajo.rpc.client.hang-detection";
   public final static boolean CLIENT_HANG_DETECTION_DEFAULT =  false;
+
+  public static final String TEST_KEY = "tajo.test.enabled";
+  public static final boolean IS_TEST_MODE = 
Boolean.parseBoolean(System.getProperty(TEST_KEY, "false"));
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 6008c4c..74ed3a0 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -23,7 +23,6 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.timeout.IdleState;
@@ -94,7 +93,7 @@ public abstract class NettyClientBase<T> implements 
ProtoDeclaration, Closeable
         .group(eventLoopGroup)
         .channel(NioSocketChannel.class)
         .handler(initializer)
-        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
         .option(ChannelOption.SO_REUSEADDR, true)
         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connTimeoutMillis)
         .option(ChannelOption.SO_RCVBUF, 1048576 * 10)

http://git-wip-us.apache.org/repos/asf/tajo/blob/24b8e891/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
index 85d5d51..668b6c6 100644
--- 
a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
+++ 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -214,7 +214,7 @@ public class TablespaceManager implements StorageService {
     registerTableSpace(spaceName, uri, configs, true, override);
   }
 
-  private static void registerTableSpace(String spaceName, URI uri, JSONObject 
spaceDesc,
+  private static Tablespace registerTableSpace(String spaceName, URI uri, 
JSONObject spaceDesc,
                                          boolean visible, boolean override) {
     Tablespace tableSpace = initializeTableSpace(spaceName, uri, spaceDesc);
     tableSpace.setVisible(visible);
@@ -236,6 +236,7 @@ public class TablespaceManager implements StorageService {
         registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
       }
     }
+    return tableSpace;
   }
 
   private static void putTablespace(Tablespace space, boolean override) {
@@ -317,7 +318,14 @@ public class TablespaceManager implements StorageService {
       existing = TABLE_SPACES.remove(space.getUri());
 
       // Add anotherone for test
-      registerTableSpace(space.name, space.uri, space.getConfig(), true, true);
+      Tablespace tablespace = registerTableSpace(space.name, space.uri, 
space.getConfig(), true, true);
+      try {
+        //override conf for test
+        if (space.conf != null)
+          tablespace.init(space.conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
     // if there is an existing one, return it.
     return Optional.fromNullable(existing);

Reply via email to