Repository: tajo Updated Branches: refs/heads/master bfc70203b -> afa9b432f
TAJO-2021: Some refactoring and changing anonymous class to lambda expression. Closes #911 Signed-off-by: Jinho Kim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/afa9b432 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/afa9b432 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/afa9b432 Branch: refs/heads/master Commit: afa9b432fd357a2e958ece7917a7704aadb9b3ab Parents: bfc7020 Author: Jongyoung Park <[email protected]> Authored: Tue Dec 15 15:46:39 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Tue Dec 15 15:46:39 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/worker/TajoWorker.java | 78 ++++---------------- 2 files changed, 17 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/afa9b432/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c297940..74a86fa 100644 --- a/CHANGES +++ b/CHANGES @@ -121,6 +121,9 @@ Release 0.12.0 - unreleased TASKS + TAJO-2021: Some refactoring and changing anonymous class to lambda expression. + (Contributed by Jongyoung Park. Committed by jinho) + TAJO-2019: Replace manual array copy with System.arraycopy(). (Contributed by Dongkyu Hwangbo, committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/afa9b432/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index c7cac4f..8315c1c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -18,7 +18,6 @@ package org.apache.tajo.worker; -import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,10 +32,8 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.function.FunctionLoader; -import org.apache.tajo.function.FunctionSignature; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.metrics.Node; import org.apache.tajo.plan.function.python.PythonScriptEngine; @@ -81,55 +78,26 @@ public class TajoWorker extends CompositeService { private static final Log LOG = LogFactory.getLog(TajoWorker.class); private TajoConf systemConf; - private StaticHttpServer webServer; - - private TajoWorkerClientService tajoWorkerClientService; - private QueryMasterManagerService queryMasterManagerService; - - private TajoWorkerManagerService tajoWorkerManagerService; - - private TajoMasterInfo tajoMasterInfo; - private CatalogClient catalogClient; - private WorkerContext workerContext; - private TaskManager taskManager; - private TaskExecutor taskExecutor; - private TajoPullServerService pullService; - private ServiceTracker serviceTracker; - private NodeResourceManager nodeResourceManager; - - private NodeStatusUpdater nodeStatusUpdater; - private AtomicBoolean stopped = new AtomicBoolean(false); - private WorkerConnectionInfo connectionInfo; - private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); - private String[] cmdArgs; - private DeletionService deletionService; - private TajoSystemMetrics workerSystemMetrics; - private HashShuffleAppenderManager hashShuffleAppenderManager; - - private AsyncDispatcher dispatcher; - private LocalDirAllocator lDirAllocator; - private JvmPauseMonitor pauseMonitor; private HistoryWriter taskHistoryWriter; - private HistoryReader historyReader; public TajoWorker() throws Exception { @@ -143,9 +111,12 @@ public class TajoWorker extends CompositeService { start(); } - @Override public void serviceInit(Configuration conf) throws Exception { + AsyncDispatcher dispatcher; + TajoWorkerClientService tajoWorkerClientService; + TajoWorkerManagerService tajoWorkerManagerService; + ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY); this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); @@ -156,13 +127,11 @@ public class TajoWorker extends CompositeService { this.workerContext = new TajoWorkerContext(); this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort(); int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort(); int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort(); - - this.dispatcher = new AsyncDispatcher(); + dispatcher = new AsyncDispatcher(); addIfService(dispatcher); tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort); @@ -186,8 +155,7 @@ public class TajoWorker extends CompositeService { this.nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext); addService(nodeResourceManager); - this.nodeStatusUpdater = new NodeStatusUpdater(workerContext); - addService(nodeStatusUpdater); + addService(new NodeStatusUpdater(workerContext)); int httpPort = 0; if(!TajoPullServerService.isStandalone()) { @@ -240,29 +208,15 @@ public class TajoWorker extends CompositeService { private void initWorkerMetrics() { workerSystemMetrics = new TajoSystemMetrics(systemConf, Node.class, workerContext.getWorkerName()); + workerSystemMetrics.start(); - workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, new Gauge<Integer>() { - @Override - public Integer getValue() { - if(queryMasterManagerService != null) { - return queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size(); - } else { - return 0; - } - } - }); - - workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, new Gauge<Integer>() { - @Override - public Integer getValue() { - if(taskExecutor != null) { - return taskExecutor.getRunningTasks(); - } else { - return 0; - } - } - }); + workerSystemMetrics.register(Node.QueryMaster.RUNNING_QM, + () -> queryMasterManagerService != null ? + queryMasterManagerService.getQueryMaster().getQueryMasterTasks().size() : 0); + + workerSystemMetrics.register(Node.Tasks.RUNNING_TASKS, + () -> taskExecutor != null ? taskExecutor.getRunningTasks() : 0); } private int initWebServer() { @@ -310,7 +264,7 @@ public class TajoWorker extends CompositeService { public void serviceStart() throws Exception { startJvmPauseMonitor(); - tajoMasterInfo = new TajoMasterInfo(); + TajoMasterInfo tajoMasterInfo = new TajoMasterInfo(); if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress()); @@ -453,10 +407,6 @@ public class TajoWorker extends CompositeService { return catalogClient; } - public TajoPullServerService getPullService() { - return pullService; - } - public WorkerConnectionInfo getConnectionInfo() { return connectionInfo; }
