TAJO-1197: Unit test failed: unable to create new native thread. (jinho) Closes #255
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ef282ebe Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ef282ebe Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ef282ebe Branch: refs/heads/hbase_storage Commit: ef282ebec579c512bbb3860a49d7322e1fb45199 Parents: 67bdc71 Author: jhkim <[email protected]> Authored: Wed Nov 19 20:14:21 2014 +0900 Committer: jhkim <[email protected]> Committed: Wed Nov 19 20:14:21 2014 +0900 ---------------------------------------------------------------------- CHANGES | 4 ++ .../java/org/apache/tajo/cli/tsql/TajoCli.java | 5 +++ .../java/org/apache/tajo/master/TajoMaster.java | 29 ++++++++++---- .../tajo/scheduler/SimpleFifoScheduler.java | 2 +- .../apache/tajo/util/history/HistoryWriter.java | 2 + .../java/org/apache/tajo/worker/TajoWorker.java | 12 ++---- .../apache/tajo/worker/TaskRunnerManager.java | 7 ++-- .../org/apache/tajo/TajoTestingCluster.java | 19 +++++++++ .../org/apache/tajo/cli/tsql/TestTajoCli.java | 22 +++++++---- .../tajo/master/ha/TestHAServiceHDFSImpl.java | 41 +++++++++++--------- .../tajo/pullserver/TajoPullServerService.java | 2 +- 11 files changed, 97 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 1ace332..5e9b14f 100644 --- a/CHANGES +++ b/CHANGES @@ -60,8 +60,12 @@ Release 0.9.1 - unreleased TAJO-1172: Remove Trevni storage type and its related classes. (DaeMyung Kang via hyunsik) + BUG FIXES + TAJO-1197: Unit test failed: unable to create new native thread. + (jinho) + TAJO-1178: Some error messages for wrong JSON queries are not so much helpful. (Jaewoong Jung via jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 34e2170..e96017b 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -664,11 +664,16 @@ public class TajoCli { sout.println("Invalid command " + command + ". Try \\? for help."); } + @VisibleForTesting public void close() { //for testcase if (client != null) { client.close(); } + + if (reader != null) { + reader.shutdown(); + } } private void checkMasterStatus() throws IOException, ServiceException { http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 17658ac..6e585af 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -18,7 +18,6 @@ package org.apache.tajo.master; -import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -29,17 +28,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tajo.catalog.*; -import org.apache.tajo.engine.function.FunctionLoader; +import org.apache.tajo.catalog.CatalogServer; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.LocalCatalogWrapper; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.master.ha.HAService; import org.apache.tajo.master.ha.HAServiceHDFSImpl; import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet; @@ -51,9 +51,9 @@ import org.apache.tajo.master.session.SessionManager; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.*; -import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; +import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.QueryExecutorServlet; import org.apache.tajo.webapp.StaticHttpServer; @@ -150,6 +150,7 @@ public class TajoMaster extends CompositeService { @Override public void serviceInit(Configuration _conf) throws Exception { this.systemConf = (TajoConf) _conf; + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); context = new MasterContext(systemConf); clock = new SystemClock(); @@ -384,10 +385,9 @@ public class TajoMaster extends CompositeService { systemMetrics.stop(); } - RpcChannelFactory.shutdown(); - if(pauseMonitor != null) pauseMonitor.stop(); super.stop(); + LOG.info("Tajo Master main thread exiting"); } @@ -549,12 +549,25 @@ public class TajoMaster extends CompositeService { } } } + + private class ShutdownHook implements Runnable { + @Override + public void run() { + if(!isInState(STATE.STOPPED)) { + LOG.info("============================================"); + LOG.info("TajoMaster received SIGINT Signal"); + LOG.info("============================================"); + stop(); + RpcChannelFactory.shutdown(); + } + } + } + public static void main(String[] args) throws Exception { StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG); try { TajoMaster master = new TajoMaster(); - ShutdownHookManager.get().addShutdownHook(new CompositeServiceShutdownHook(master), SHUTDOWN_HOOK_PRIORITY); TajoConf conf = new TajoConf(new YarnConfiguration()); master.init(conf); master.start(); http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java index 87968a5..a74e606 100644 --- a/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/scheduler/SimpleFifoScheduler.java @@ -31,7 +31,7 @@ public class SimpleFifoScheduler implements Scheduler { private static final Log LOG = LogFactory.getLog(SimpleFifoScheduler.class.getName()); private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>(); private final Thread queryProcessor; - private static AtomicBoolean stopped = new AtomicBoolean(); + private AtomicBoolean stopped = new AtomicBoolean(); private QueryJobManager manager; private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator(); http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 63a143b..17c9366 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -317,6 +317,8 @@ public class HistoryWriter extends AbstractService { } private synchronized void writeQuerySummary(QueryInfo queryInfo) throws Exception { + if(stopped.get()) return; + // writing to HDFS and rolling hourly if (querySummaryWriter == null) { querySummaryWriter = new WriterHolder(); http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index ddd7e13..c24eef1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -19,6 +19,7 @@ package org.apache.tajo.worker; import com.codahale.metrics.Gauge; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,7 +42,6 @@ import org.apache.tajo.master.querymaster.QueryMasterManagerService; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.RpcChannelFactory; -import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.*; @@ -115,8 +115,6 @@ public class TajoWorker extends CompositeService { private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - private RpcConnectionPool connPool; - private String[] cmdArgs; private DeletionService deletionService; @@ -182,7 +180,6 @@ public class TajoWorker extends CompositeService { this.systemConf = (TajoConf)conf; RackResolver.init(systemConf); - this.connPool = RpcConnectionPool.getPool(systemConf); this.workerContext = new WorkerContext(); this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -371,11 +368,6 @@ public class TajoWorker extends CompositeService { catalogClient.close(); } - if(connPool != null) { - connPool.shutdown(); - RpcChannelFactory.shutdown(); - } - if(webServer != null && webServer.isAlive()) { try { webServer.stop(); @@ -585,6 +577,7 @@ public class TajoWorker extends CompositeService { return pullServerPort; } + @VisibleForTesting public void stopWorkerForce() { stop(); } @@ -605,6 +598,7 @@ public class TajoWorker extends CompositeService { LOG.info("TajoWorker received SIGINT Signal"); LOG.info("============================================"); stop(); + RpcChannelFactory.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index faadf58..a06e6e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -188,14 +188,15 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< } else if (event instanceof TaskRunnerStopEvent) { ExecutionBlockContext executionBlockContext = executionBlockContextMap.remove(event.getExecutionBlockId()); if(executionBlockContext != null){ - TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId()); - executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); - executionBlockContext.stop(); try { + TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId()); + executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId()); } catch (IOException e) { LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); + } finally { + executionBlockContext.stop(); } } LOG.info("Stopped execution block:" + event.getExecutionBlockId()); http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 3511290..603da0c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -125,6 +125,25 @@ public class TajoTestingCluster { conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024); conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f); + + // Client API RPC + conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2); + + //Client API service RPC Server + conf.setIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); + + // Internal RPC Client + conf.setIntVar(ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM, 2); + + // Internal RPC Server + conf.setIntVar(ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); + this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS) .indexOf(TajoWorkerResourceManager.class.getName()) >= 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index 797544f..b14bfa9 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -28,8 +28,6 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.TpchTestBase; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.cli.tsql.DefaultTajoCliOutputFormatter; -import org.apache.tajo.cli.tsql.TajoCli; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.StorageUtil; @@ -156,8 +154,12 @@ public class TestTajoCli { TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration(); TajoCli testCli = new TajoCli(tajoConf, args, System.in, System.out); - assertEquals("false", testCli.getContext().get(SessionVars.CLI_PAGING_ENABLED)); - assertEquals("256", testCli.getContext().getConf().get("tajo.executor.join.inner.in-memory-table-num")); + try { + assertEquals("false", testCli.getContext().get(SessionVars.CLI_PAGING_ENABLED)); + assertEquals("256", testCli.getContext().getConf().get("tajo.executor.join.inner.in-memory-table-num")); + } finally { + testCli.close(); + } } @Test @@ -302,11 +304,15 @@ public class TestTajoCli { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); - tajoCli.executeMetaCommand("\\getconf tajo.rootdir"); + TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); + try { + tajoCli.executeMetaCommand("\\getconf tajo.rootdir"); - String consoleResult = new String(out.toByteArray()); - assertEquals(consoleResult, tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.ROOT_DIR) + "\n"); + String consoleResult = new String(out.toByteArray()); + assertEquals(consoleResult, tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.ROOT_DIR) + "\n"); + } finally { + tajoCli.close(); + } } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java index 249afae..69ed556 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; @@ -57,24 +58,28 @@ public class TestHAServiceHDFSImpl { conf = cluster.getConfiguration(); client = new TajoClientImpl(conf); - - FileSystem fs = cluster.getDefaultFileSystem(); - startBackupMasters(); - - verifyMasterAddress(); - verifySystemDirectories(fs); - - Path backupMasterFile1 = new Path(backupPath, backupMaster1.getMasterName() - .replaceAll(":", "_")); - assertTrue(fs.exists(backupMasterFile1)); - - Path backupMasterFile2 = new Path(backupPath, backupMaster2.getMasterName() - .replaceAll(":", "_")); - assertTrue(fs.exists(backupMasterFile2)); - - assertTrue(cluster.getMaster().isActiveMaster()); - assertFalse(backupMaster1.isActiveMaster()); - assertFalse(backupMaster2.isActiveMaster()); + try { + FileSystem fs = cluster.getDefaultFileSystem(); + startBackupMasters(); + + verifyMasterAddress(); + verifySystemDirectories(fs); + + Path backupMasterFile1 = new Path(backupPath, backupMaster1.getMasterName() + .replaceAll(":", "_")); + assertTrue(fs.exists(backupMasterFile1)); + + Path backupMasterFile2 = new Path(backupPath, backupMaster2.getMasterName() + .replaceAll(":", "_")); + assertTrue(fs.exists(backupMasterFile2)); + + assertTrue(cluster.getMaster().isActiveMaster()); + assertFalse(backupMaster1.isActiveMaster()); + assertFalse(backupMaster2.isActiveMaster()); + } finally { + IOUtils.cleanup(LOG, client, backupMaster1, backupMaster2); + cluster.shutdownMiniCluster(); + } } private void startBackupMasters() throws Exception { http://git-wip-us.apache.org/repos/asf/tajo/blob/ef282ebe/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 2fb7c29..5a4e69f 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -209,7 +209,7 @@ public class TajoPullServerService extends AbstractService { readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, DEFAULT_SHUFFLE_READAHEAD_BYTES); - int workerNum = conf.getInt("tajo.shuffle.rpc.server.io-thread-num", + int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2); selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
