Repository: tajo Updated Branches: refs/heads/master d3727c735 -> 7c5ef87f6
TAJO-991: Running PullServer on a dedicated JVM process which separates from worker. Closes #107 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7c5ef87f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7c5ef87f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7c5ef87f Branch: refs/heads/master Commit: 7c5ef87f6eedc13afb16311bbc3b27ea0d921eca Parents: d3727c7 Author: HyoungJun Kim <[email protected]> Authored: Wed Sep 3 20:31:22 2014 +0900 Committer: HyoungJun Kim <[email protected]> Committed: Wed Sep 3 20:31:22 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../tajo/master/querymaster/QueryMaster.java | 14 +- .../java/org/apache/tajo/worker/Fetcher.java | 2 +- .../java/org/apache/tajo/worker/TajoWorker.java | 77 ++++++++-- .../apache/tajo/worker/TaskRunnerManager.java | 12 +- .../tajo/worker/WorkerHeartbeatService.java | 24 +-- .../org/apache/tajo/TajoTestingCluster.java | 1 + tajo-dist/src/main/bin/start-tajo.sh | 3 + tajo-dist/src/main/bin/stop-tajo.sh | 3 + tajo-dist/src/main/bin/tajo | 9 ++ tajo-dist/src/main/conf/tajo-env.sh | 13 +- .../pullserver/listener/FileCloseListener.java | 15 +- .../pullserver/listener/FileCloseListener.java | 15 +- .../pullserver/listener/FileCloseListener.java | 15 +- .../tajo/pullserver/PullServerAuxService.java | 2 +- .../apache/tajo/pullserver/TajoPullServer.java | 73 +++++++++ .../tajo/pullserver/TajoPullServerService.java | 154 +++++++++++++++++-- 17 files changed, 372 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 269c307..54d71c0 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,9 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-991: Running PullServer on a dedicated JVM process which separates from worker. + (Hyoungjun Kim) + TAJO-906: Runtime code generation for evaluating expression trees. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 3a86802..4af929e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -28,10 +28,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tajo.QueryId; -import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.TajoProtos; +import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; @@ -50,6 +47,7 @@ import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.TajoWorker; import java.util.ArrayList; @@ -183,7 +181,13 @@ public class QueryMaster extends CompositeService implements EventHandler { } protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) { - LOG.info("cleanup executionBlocks: " + executionBlockIds); + StringBuilder cleanupMessage = new StringBuilder(); + String prefix = ""; + for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) { + cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString()); + prefix = ","; + } + LOG.info("cleanup executionBlocks: " + cleanupMessage); NettyClientBase rpc = null; List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker(); TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index aa22bb8..64475fe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -153,7 +153,7 @@ public class Fetcher { } this.finishTime = System.currentTimeMillis(); - LOG.info("Status: " + getState() + ", URI:" + uri); + LOG.info("Fetcher finished:" + (finishTime - startTime) + " ms, " + getState() + ", URI:" + uri); if (timer != null) { timer.stop(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index f76176d..8e6118d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -95,6 +95,8 @@ public class TajoWorker extends CompositeService { private TajoPullServerService pullService; + private int pullServerPort; + private boolean yarnContainerMode; private boolean queryMasterMode; @@ -205,7 +207,7 @@ public class TajoWorker extends CompositeService { addService(tajoWorkerManagerService); if(!yarnContainerMode) { - if(taskRunnerMode) { + if(taskRunnerMode && !TajoPullServerService.isStandaloneMode()) { pullService = new TajoPullServerService(); addService(pullService); } @@ -356,7 +358,7 @@ public class TajoWorker extends CompositeService { new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>(); public QueryMaster getQueryMaster() { - if(queryMasterManagerService == null) { + if (queryMasterManagerService == null) { return null; } return queryMasterManagerService.getQueryMaster(); @@ -386,24 +388,21 @@ public class TajoWorker extends CompositeService { return catalogClient; } - public TajoPullServerService getPullService() { - return pullService; - } - public int getHttpPort() { return httpPort; } public String getWorkerName() { - if(queryMasterMode) { + if (queryMasterMode) { return getQueryMasterManagerService().getHostAndPort(); } else { return getTajoWorkerManagerService().getHostAndPort(); } } + public void stopWorker(boolean force) { stop(); - if(force) { + if (force) { System.exit(0); } } @@ -428,14 +427,14 @@ public class TajoWorker extends CompositeService { } protected void cleanup(String strPath) { - if(deletionService == null) return; + if (deletionService == null) return; LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); try { Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(strPath, systemConf); FileSystem localFS = FileSystem.getLocal(systemConf); - for (Path path : iter){ + for (Path path : iter) { deletionService.delete(localFS.makeQualified(path)); } } catch (IOException e) { @@ -444,21 +443,21 @@ public class TajoWorker extends CompositeService { } protected void cleanupTemporalDirectories() { - if(deletionService == null) return; + if (deletionService == null) return; LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); try { Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(".", systemConf); FileSystem localFS = FileSystem.getLocal(systemConf); - for (Path path : iter){ + for (Path path : iter) { PathData[] items = PathData.expandAsGlob(localFS.makeQualified(new Path(path, "*")).toString(), systemConf); ArrayList<Path> paths = new ArrayList<Path>(); - for (PathData pd : items){ + for (PathData pd : items) { paths.add(pd.path); } - if(paths.size() == 0) continue; + if (paths.size() == 0) continue; deletionService.delete(null, paths.toArray(new Path[paths.size()])); } @@ -480,13 +479,13 @@ public class TajoWorker extends CompositeService { } public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) { - synchronized(numClusterNodes) { + synchronized (numClusterNodes) { TajoWorker.this.clusterResource = clusterResource; } } public TajoMasterProtocol.ClusterResourceSummary getClusterResource() { - synchronized(numClusterNodes) { + synchronized (numClusterNodes) { return TajoWorker.this.clusterResource; } } @@ -526,6 +525,52 @@ public class TajoWorker extends CompositeService { public HashShuffleAppenderManager getHashShuffleAppenderManager() { return hashShuffleAppenderManager; } + + public int getPullServerPort() { + if (pullService != null) { + long startTime = System.currentTimeMillis(); + while (true) { + int pullServerPort = pullService.getPort(); + if (pullServerPort > 0) { + return pullServerPort; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + if (System.currentTimeMillis() - startTime > 30 * 1000) { + LOG.fatal("TajoWorker stopped cause can't get PullServer port."); + System.exit(-1); + } + } + } else { + if (pullServerPort != 0) { + return pullServerPort; + } else { + loadPullServerPort(); + return pullServerPort; + } + } + } + } + + private void loadPullServerPort() { + // get pull server port + long startTime = System.currentTimeMillis(); + while (true) { + pullServerPort = TajoPullServerService.readPullServerPort(); + if (pullServerPort > 0) { + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + if (System.currentTimeMillis() - startTime > 30 * 1000) { + LOG.fatal("TajoWorker stopped cause can't get PullServer port."); + System.exit(-1); + } + } } public void stopWorkerForce() { http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index ec413b2..8009ce3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -28,6 +28,7 @@ import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.Pair; @@ -172,16 +173,19 @@ public class TaskRunnerManager extends CompositeService { failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond()); failureIntermediateItems.add(failureBuilder.build()); } + intermediateBuilder.clear(); + intermediateBuilder.setEbId(ebId.getProto()) .setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName() + ":" + - workerContext.getPullService().getPort()) + workerContext.getPullServerPort()) .setTaskId(-1) .setAttemptId(-1) .setPartId(eachShuffle.getPartId()) .setVolume(eachShuffle.getVolume()) .addAllPages(pages) .addAllFailures(failureIntermediateItems); + intermediateEntries.add(intermediateBuilder.build()); } @@ -191,7 +195,11 @@ public class TaskRunnerManager extends CompositeService { } catch (Exception e) { LOG.error(e.getMessage(), e); reporterBuilder.setReportSuccess(false); - reporterBuilder.setReportErrorMessage(e.getMessage()); + if (e.getMessage() == null) { + reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName()); + } else { + reporterBuilder.setReportErrorMessage(e.getMessage()); + } } lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 5ab5b5d..b337754 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; @@ -141,24 +142,6 @@ public class WorkerHeartbeatService extends AbstractService { LOG.info("Worker Resource Heartbeat Thread start."); int sendDiskInfoCount = 0; int pullServerPort = 0; - if(context.getPullService()!= null) { - long startTime = System.currentTimeMillis(); - while(true) { - pullServerPort = context.getPullService().getPort(); - if(pullServerPort > 0) { - break; - } - //waiting while pull server init - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } - if(System.currentTimeMillis() - startTime > 30 * 1000) { - LOG.fatal("Too long push server init."); - System.exit(0); - } - } - } String hostName = null; int peerRpcPort = 0; @@ -176,9 +159,8 @@ public class WorkerHeartbeatService extends AbstractService { if(context.getTajoWorkerClientService() != null) { clientPort = context.getTajoWorkerClientService().getBindAddr().getPort(); } - if (context.getPullService() != null) { - pullServerPort = context.getPullService().getPort(); - } + + pullServerPort = context.getPullServerPort(); while(!stopped.get()) { if(sendDiskInfoCount == 0 && diskDeviceInfos != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 346fa69..b07ba96 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -103,6 +103,7 @@ public class TajoTestingCluster { } void initPropertiesAndConfigs() { + System.setProperty("TAJO_PULLSERVER_STANDALONE", "false"); if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname); Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName())); http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/bin/start-tajo.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/start-tajo.sh b/tajo-dist/src/main/bin/start-tajo.sh index 63b7488..8fcf1a6 100755 --- a/tajo-dist/src/main/bin/start-tajo.sh +++ b/tajo-dist/src/main/bin/start-tajo.sh @@ -45,6 +45,9 @@ if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then fi if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then + if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then + "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start pullserver + fi "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start worker if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start querymaster http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/bin/stop-tajo.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/stop-tajo.sh b/tajo-dist/src/main/bin/stop-tajo.sh index f50ae3a..770034b 100755 --- a/tajo-dist/src/main/bin/stop-tajo.sh +++ b/tajo-dist/src/main/bin/stop-tajo.sh @@ -46,6 +46,9 @@ fi if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop worker + if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then + "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop pullserver + fi if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop querymaster fi http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/bin/tajo ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo index f579864..3fc2ae0 100755 --- a/tajo-dist/src/main/bin/tajo +++ b/tajo-dist/src/main/bin/tajo @@ -64,6 +64,7 @@ if [ $# = 0 ]; then echo "where COMMAND is one of:" echo " master run the Master Server" echo " worker run the Worker Server" + echo " pullserver run the Pull Server" echo " catalog run the Catalog server" echo " catutil catalog utility" echo " cli run the tajo cli" @@ -121,6 +122,11 @@ if [ "$TAJO_WORKER_HEAPSIZE" != "" ]; then JAVA_WORKER_HEAP_MAX="-Xmx""$TAJO_WORKER_HEAPSIZE""m" #echo $JAVA_WORKER_HEAP_MAX fi +if [ "$TAJO_PULLSERVER_HEAPSIZE" != "" ]; then + #echo "run with heapsize $TAJO_PULLSERVER_HEAPSIZE" + JAVA_PULLSERVER_HEAP_MAX="-Xmx""$TAJO_PULLSERVER_HEAPSIZE""m" + #echo $JAVA_PULLSERVER_HEAP_MAX +fi if [ "$TAJO_QUERYMASTER_HEAPSIZE" != "" ]; then #echo "run with heapsize $TAJO_QUERYMASTER_HEAPSIZE" JAVA_QUERYMASTER_HEAP_MAX="-Xmx""$TAJO_QUERYMASTER_HEAPSIZE""m" @@ -330,6 +336,9 @@ elif [ "$COMMAND" = "master" ] ; then elif [ "$COMMAND" = "worker" ] ; then CLASS='org.apache.tajo.worker.TajoWorker' TAJO_OPTS="$TAJO_OPTS $JAVA_WORKER_HEAP_MAX $TAJO_WORKER_OPTS" +elif [ "$COMMAND" = "pullserver" ] ; then + CLASS='org.apache.tajo.pullserver.TajoPullServer' + TAJO_OPTS="$TAJO_OPTS $JAVA_PULLSERVER_HEAP_MAX $TAJO_PULLSERVER_OPTS" elif [ "$COMMAND" = "querymaster" ] ; then CLASS='org.apache.tajo.worker.TajoWorker' TAJO_OPTS="$TAJO_OPTS $JAVA_QUERYMASTER_HEAP_MAX $TAJO_QUERYMASTER_OPTS" http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/conf/tajo-env.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/conf/tajo-env.sh b/tajo-dist/src/main/conf/tajo-env.sh index 747ee1c..bd14af6 100755 --- a/tajo-dist/src/main/conf/tajo-env.sh +++ b/tajo-dist/src/main/conf/tajo-env.sh @@ -37,6 +37,9 @@ # export TAJO_WORKER_HEAPSIZE=1000 # The maximum amount of heap to use, in MB. Default is 1000. +# export TAJO_PULLSERVER_HEAPSIZE=1000 + +# The maximum amount of heap to use, in MB. Default is 1000. # export TAJO_QUERYMASTER_HEAPSIZE=1000 # Extra Java runtime options. Empty by default. @@ -45,9 +48,12 @@ # Extra TajoMaster's java runtime options for TajoMaster. Empty by default # export TAJO_MASTER_OPTS= -# Extra TajoWorker's java runtime options for TajoMaster. Empty by default +# Extra TajoWorker's java runtime options. Empty by default # export TAJO_WORKER_OPTS= +# Extra TajoPullServer's java runtime options. Empty by default +# export TAJO_PULLSERVER_OPTS= + # Extra QueryMaster mode TajoWorker's java runtime options for TajoMaster. Empty by default # export TAJO_QUERYMASTER_OPTS= @@ -68,4 +74,7 @@ export TAJO_WORKER_STANDBY_MODE=true # It must be required to use HCatalogStore # export HIVE_HOME= -# export HIVE_JDBC_DRIVER_DIR= \ No newline at end of file +# export HIVE_JDBC_DRIVER_DIR= + +# Tajo PullServer mode. the default mode is standalone mode +export TAJO_PULLSERVER_STANDALONE=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java index 0933167..2e36644 100644 --- a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java +++ b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java @@ -21,14 +21,24 @@ package org.apache.tajo.pullserver.listener; import org.apache.hadoop.mapred.FadvisedFileRegion; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.apache.tajo.pullserver.TajoPullServerService; @Deprecated public class FileCloseListener implements ChannelFutureListener { private FadvisedFileRegion filePart; + private String requestUri; + private TajoPullServerService pullServerService; + private long startTime; - public FileCloseListener(FadvisedFileRegion filePart) { + public FileCloseListener(FadvisedFileRegion filePart, + String requestUri, + long startTime, + TajoPullServerService pullServerService) { this.filePart = filePart; + this.requestUri = requestUri; + this.pullServerService = pullServerService; + this.startTime = startTime; } // TODO error handling; distinguish IO/connection failures, @@ -36,5 +46,8 @@ public class FileCloseListener implements ChannelFutureListener { @Override public void operationComplete(ChannelFuture future) { filePart.releaseExternalResources(); + if (pullServerService != null) { + pullServerService.completeFileChunk(filePart, requestUri, startTime); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java index 5b2d1b3..be599c3 100644 --- a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java +++ b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java @@ -19,15 +19,25 @@ package org.apache.tajo.pullserver.listener; import org.apache.hadoop.mapred.FadvisedFileRegion; +import org.apache.tajo.pullserver.TajoPullServerService; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; public class FileCloseListener implements ChannelFutureListener { private FadvisedFileRegion filePart; + private String requestUri; + private TajoPullServerService pullServerService; + private long startTime; - public FileCloseListener(FadvisedFileRegion filePart) { + public FileCloseListener(FadvisedFileRegion filePart, + String requestUri, + long startTime, + TajoPullServerService pullServerService) { this.filePart = filePart; + this.requestUri = requestUri; + this.pullServerService = pullServerService; + this.startTime = startTime; } // TODO error handling; distinguish IO/connection failures, @@ -38,5 +48,8 @@ public class FileCloseListener implements ChannelFutureListener { filePart.transferSuccessful(); } filePart.releaseExternalResources(); + if (pullServerService != null) { + pullServerService.completeFileChunk(filePart, requestUri, startTime); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java index 5b2d1b3..7d4ca3a 100644 --- a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java +++ b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java @@ -21,13 +21,23 @@ package org.apache.tajo.pullserver.listener; import org.apache.hadoop.mapred.FadvisedFileRegion; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.apache.tajo.pullserver.TajoPullServerService; public class FileCloseListener implements ChannelFutureListener { private FadvisedFileRegion filePart; + private String requestUri; + private TajoPullServerService pullServerService; + private long startTime; - public FileCloseListener(FadvisedFileRegion filePart) { + public FileCloseListener(FadvisedFileRegion filePart, + String requestUri, + long startTime, + TajoPullServerService pullServerService) { this.filePart = filePart; + this.requestUri = requestUri; + this.pullServerService = pullServerService; + this.startTime = startTime; } // TODO error handling; distinguish IO/connection failures, @@ -38,5 +48,8 @@ public class FileCloseListener implements ChannelFutureListener { filePart.transferSuccessful(); } filePart.releaseExternalResources(); + if (pullServerService != null) { + pullServerService.completeFileChunk(filePart, requestUri, startTime); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index dd3bee3..e6e7ce3 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -493,7 +493,7 @@ public class PullServerAuxService extends AuxiliaryService { file.startOffset, file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(partition); - writeFuture.addListener(new FileCloseListener(partition)); + writeFuture.addListener(new FileCloseListener(partition, null, 0, null)); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java new file mode 100644 index 0000000..7d7065e --- /dev/null +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.CompositeService; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.pullserver.PullServerAuxService.PullServer; +import org.apache.tajo.util.StringUtils; + +public class TajoPullServer extends CompositeService { + private static final Log LOG = LogFactory.getLog(TajoPullServer.class); + + private TajoPullServerService pullService; + private TajoConf systemConf; + + public TajoPullServer() { + super(TajoPullServer.class.getName()); + } + + @Override + public void init(Configuration conf) { + this.systemConf = (TajoConf)conf; + pullService = new TajoPullServerService(); + addService(pullService); + + super.init(conf); + } + + public void startPullServer(TajoConf systemConf) { + init(systemConf); + start(); + } + + public void start() { + super.start(); + + } + + public static void main(String[] args) throws Exception { + StringUtils.startupShutdownMessage(PullServer.class, args, LOG); + + if (!TajoPullServerService.isStandaloneMode()) { + LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'"); + return; + } + + TajoConf tajoConf = new TajoConf(); + tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME)); + + (new TajoPullServer()).startPullServer(tajoConf); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index e68e351..150ac85 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.mapred.FadvisedChunkedFile; +import org.apache.hadoop.mapred.FadvisedFileRegion; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; @@ -63,10 +64,7 @@ import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; +import java.io.*; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; @@ -76,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; @@ -245,6 +244,72 @@ public class TajoPullServerService extends AbstractService { sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); + + if (isStandaloneMode()) { + File pullServerPortFile = getPullServerPortFile(); + if (pullServerPortFile.exists()) { + pullServerPortFile.delete(); + } + pullServerPortFile.getParentFile().mkdirs(); + LOG.info("Write PullServerPort to " + pullServerPortFile); + try { + FileOutputStream out = new FileOutputStream(pullServerPortFile); + out.write(("" + port).getBytes()); + out.close(); + } catch (Exception e) { + LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile + + ", " + e.getMessage(), e); + System.exit(-1); + } + } + LOG.info("TajoPullServerService started: port=" + port); + } + + private static File getPullServerPortFile() { + String pullServerPortInfoFile = System.getenv("TAJO_PID_DIR"); + if (pullServerPortInfoFile == null || pullServerPortInfoFile.isEmpty()) { + pullServerPortInfoFile = "/tmp"; + } + + return new File(pullServerPortInfoFile + "/pullserver.port"); + } + + public static boolean isStandaloneMode() { + String mode = System.getenv("TAJO_PULLSERVER_STANDALONE"); + if (mode == null || mode.trim().isEmpty()) { + mode = System.getProperty("TAJO_PULLSERVER_STANDALONE"); + } + + if (mode == null || mode.trim().isEmpty()) { + return true; + } else { + return mode.equalsIgnoreCase("true"); + } + } + + public static int readPullServerPort() { + FileInputStream in = null; + try { + File pullServerPortFile = getPullServerPortFile(); + + if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) { + return -1; + } + in = new FileInputStream(pullServerPortFile); + byte[] buf = new byte[1024]; + int readBytes = in.read(buf); + return Integer.parseInt(new String(buf, 0, readBytes)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return -1; + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + } + } + } } public int getPort() { @@ -317,6 +382,61 @@ public class TajoPullServerService extends AbstractService { } } + + Map<String, ProcessingStatus> processingStatusMap = new ConcurrentHashMap<String, ProcessingStatus>(); + + public void completeFileChunk(FadvisedFileRegion filePart, + String requestUri, + long startTime) { + ProcessingStatus status = processingStatusMap.get(requestUri); + if (status != null) { + status.decrementRemainFiles(filePart, startTime); + } + } + + class ProcessingStatus { + String requestUri; + int numFiles; + AtomicInteger remainFiles; + long startTime; + long makeFileListTime; + long minTime = Long.MAX_VALUE; + long maxTime; + int numSlowFile; + + public ProcessingStatus(String requestUri) { + this.requestUri = requestUri; + this.startTime = System.currentTimeMillis(); + } + + public void setNumFiles(int numFiles) { + this.numFiles = numFiles; + this.remainFiles = new AtomicInteger(numFiles); + } + public void decrementRemainFiles(FadvisedFileRegion filePart, long fileStartTime) { + synchronized(remainFiles) { + long fileSendTime = System.currentTimeMillis() - fileStartTime; + if (fileSendTime > 20 * 1000) { + LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount()); + numSlowFile++; + } + if (fileSendTime > maxTime) { + maxTime = fileSendTime; + } + if (fileSendTime < minTime) { + minTime = fileSendTime; + } + int remain = remainFiles.decrementAndGet(); + if (remain <= 0) { + processingStatusMap.remove(requestUri); + LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " + + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " + + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); + } + } + } + } + class PullServer extends SimpleChannelUpstreamHandler { private final Configuration conf; @@ -370,6 +490,10 @@ public class TajoPullServerService extends AbstractService { return; } + ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); + synchronized(processingStatusMap) { + processingStatusMap.put(request.getUri().toString(), processingStatus); + } // Parsing the URL into key-values final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).getParameters(); @@ -407,13 +531,15 @@ public class TajoPullServerService extends AbstractService { List<String> taskIds = splitMaps(taskIdList); - LOG.info("PullServer request param: shuffleType=" + shuffleType + - ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList); - - // the working dir of tajo worker for each query String queryBaseDir = queryId.toString() + "/output"; - LOG.info("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir); + if (LOG.isDebugEnabled()) { + LOG.debug("PullServer request param: shuffleType=" + shuffleType + + ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList); + + // the working dir of tajo worker for each query + LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir); + } final List<FileChunk> chunks = Lists.newArrayList(); @@ -474,6 +600,8 @@ public class TajoPullServerService extends AbstractService { return; } + processingStatus.setNumFiles(chunks.size()); + processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime; // Write the content. Channel ch = e.getChannel(); if (chunks.size() == 0) { @@ -497,7 +625,7 @@ public class TajoPullServerService extends AbstractService { ChannelFuture writeFuture = null; for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, ch, chunk); + writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString()); if (writeFuture == null) { sendError(ctx, NOT_FOUND); return; @@ -514,7 +642,9 @@ public class TajoPullServerService extends AbstractService { private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, - FileChunk file) throws IOException { + FileChunk file, + String requestUri) throws IOException { + long startTime = System.currentTimeMillis(); RandomAccessFile spill = null; ChannelFuture writeFuture; try { @@ -524,7 +654,7 @@ public class TajoPullServerService extends AbstractService { file.startOffset, file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); writeFuture = ch.write(filePart); - writeFuture.addListener(new FileCloseListener(filePart)); + writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this)); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
