Repository: tajo
Updated Branches:
  refs/heads/master d3727c735 -> 7c5ef87f6


TAJO-991: Running PullServer on a dedicated JVM process which separates from 
worker.

Closes #107


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7c5ef87f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7c5ef87f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7c5ef87f

Branch: refs/heads/master
Commit: 7c5ef87f6eedc13afb16311bbc3b27ea0d921eca
Parents: d3727c7
Author: HyoungJun Kim <[email protected]>
Authored: Wed Sep 3 20:31:22 2014 +0900
Committer: HyoungJun Kim <[email protected]>
Committed: Wed Sep 3 20:31:22 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../tajo/master/querymaster/QueryMaster.java    |  14 +-
 .../java/org/apache/tajo/worker/Fetcher.java    |   2 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  77 ++++++++--
 .../apache/tajo/worker/TaskRunnerManager.java   |  12 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  24 +--
 .../org/apache/tajo/TajoTestingCluster.java     |   1 +
 tajo-dist/src/main/bin/start-tajo.sh            |   3 +
 tajo-dist/src/main/bin/stop-tajo.sh             |   3 +
 tajo-dist/src/main/bin/tajo                     |   9 ++
 tajo-dist/src/main/conf/tajo-env.sh             |  13 +-
 .../pullserver/listener/FileCloseListener.java  |  15 +-
 .../pullserver/listener/FileCloseListener.java  |  15 +-
 .../pullserver/listener/FileCloseListener.java  |  15 +-
 .../tajo/pullserver/PullServerAuxService.java   |   2 +-
 .../apache/tajo/pullserver/TajoPullServer.java  |  73 +++++++++
 .../tajo/pullserver/TajoPullServerService.java  | 154 +++++++++++++++++--
 17 files changed, 372 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 269c307..54d71c0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-991: Running PullServer on a dedicated JVM process which separates from worker.
+    (Hyoungjun Kim)
+
     TAJO-906: Runtime code generation for evaluating expression trees.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 3a86802..4af929e 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -28,10 +28,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos;
+import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
@@ -50,6 +47,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.util.ArrayList;
@@ -183,7 +181,13 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
   }
 
   protected void 
cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> 
executionBlockIds) {
-    LOG.info("cleanup executionBlocks: " + executionBlockIds);
+    StringBuilder cleanupMessage = new StringBuilder();
+    String prefix = "";
+    for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) {
+      cleanupMessage.append(prefix).append(new 
ExecutionBlockId(eachEbId).toString());
+      prefix = ",";
+    }
+    LOG.info("cleanup executionBlocks: " + cleanupMessage);
     NettyClientBase rpc = null;
     List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
     TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = 
TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index aa22bb8..64475fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -153,7 +153,7 @@ public class Fetcher {
       }
 
       this.finishTime = System.currentTimeMillis();
-      LOG.info("Status: " + getState() + ", URI:" + uri);
+      LOG.info("Fetcher finished:" + (finishTime - startTime) + " ms, " + 
getState() + ", URI:" + uri);
       if (timer != null) {
         timer.stop();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index f76176d..8e6118d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -95,6 +95,8 @@ public class TajoWorker extends CompositeService {
 
   private TajoPullServerService pullService;
 
+  private int pullServerPort;
+
   private boolean yarnContainerMode;
 
   private boolean queryMasterMode;
@@ -205,7 +207,7 @@ public class TajoWorker extends CompositeService {
     addService(tajoWorkerManagerService);
 
     if(!yarnContainerMode) {
-      if(taskRunnerMode) {
+      if(taskRunnerMode && !TajoPullServerService.isStandaloneMode()) {
         pullService = new TajoPullServerService();
         addService(pullService);
       }
@@ -356,7 +358,7 @@ public class TajoWorker extends CompositeService {
         new ConcurrentHashMap<ExecutionBlockId, 
ExecutionBlockSharedResource>();
 
     public QueryMaster getQueryMaster() {
-      if(queryMasterManagerService == null) {
+      if (queryMasterManagerService == null) {
         return null;
       }
       return queryMasterManagerService.getQueryMaster();
@@ -386,24 +388,21 @@ public class TajoWorker extends CompositeService {
       return catalogClient;
     }
 
-    public TajoPullServerService getPullService() {
-      return pullService;
-    }
-
     public int getHttpPort() {
       return httpPort;
     }
 
     public String getWorkerName() {
-      if(queryMasterMode) {
+      if (queryMasterMode) {
         return getQueryMasterManagerService().getHostAndPort();
       } else {
         return getTajoWorkerManagerService().getHostAndPort();
       }
     }
+
     public void stopWorker(boolean force) {
       stop();
-      if(force) {
+      if (force) {
         System.exit(0);
       }
     }
@@ -428,14 +427,14 @@ public class TajoWorker extends CompositeService {
     }
 
     protected void cleanup(String strPath) {
-      if(deletionService == null) return;
+      if (deletionService == null) return;
 
       LocalDirAllocator lDirAllocator = new 
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
       try {
         Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(strPath, 
systemConf);
         FileSystem localFS = FileSystem.getLocal(systemConf);
-        for (Path path : iter){
+        for (Path path : iter) {
           deletionService.delete(localFS.makeQualified(path));
         }
       } catch (IOException e) {
@@ -444,21 +443,21 @@ public class TajoWorker extends CompositeService {
     }
 
     protected void cleanupTemporalDirectories() {
-      if(deletionService == null) return;
+      if (deletionService == null) return;
 
       LocalDirAllocator lDirAllocator = new 
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
       try {
         Iterable<Path> iter = lDirAllocator.getAllLocalPathsToRead(".", 
systemConf);
         FileSystem localFS = FileSystem.getLocal(systemConf);
-        for (Path path : iter){
+        for (Path path : iter) {
           PathData[] items = PathData.expandAsGlob(localFS.makeQualified(new 
Path(path, "*")).toString(), systemConf);
 
           ArrayList<Path> paths = new ArrayList<Path>();
-          for (PathData pd : items){
+          for (PathData pd : items) {
             paths.add(pd.path);
           }
-          if(paths.size() == 0) continue;
+          if (paths.size() == 0) continue;
 
           deletionService.delete(null, paths.toArray(new Path[paths.size()]));
         }
@@ -480,13 +479,13 @@ public class TajoWorker extends CompositeService {
     }
 
     public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary 
clusterResource) {
-      synchronized(numClusterNodes) {
+      synchronized (numClusterNodes) {
         TajoWorker.this.clusterResource = clusterResource;
       }
     }
 
     public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
-      synchronized(numClusterNodes) {
+      synchronized (numClusterNodes) {
         return TajoWorker.this.clusterResource;
       }
     }
@@ -526,6 +525,52 @@ public class TajoWorker extends CompositeService {
     public HashShuffleAppenderManager getHashShuffleAppenderManager() {
       return hashShuffleAppenderManager;
     }
+
+    public int getPullServerPort() {
+      if (pullService != null) {
+        long startTime = System.currentTimeMillis();
+        while (true) {
+          int pullServerPort = pullService.getPort();
+          if (pullServerPort > 0) {
+            return pullServerPort;
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+          }
+          if (System.currentTimeMillis() - startTime > 30 * 1000) {
+            LOG.fatal("TajoWorker stopped cause can't get PullServer port.");
+            System.exit(-1);
+          }
+        }
+      } else {
+        if (pullServerPort != 0) {
+          return pullServerPort;
+        } else {
+          loadPullServerPort();
+          return pullServerPort;
+        }
+      }
+    }
+  }
+
+  private void loadPullServerPort() {
+    // get pull server port
+    long startTime = System.currentTimeMillis();
+    while (true) {
+      pullServerPort = TajoPullServerService.readPullServerPort();
+      if (pullServerPort > 0) {
+        break;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+      if (System.currentTimeMillis() - startTime > 30 * 1000) {
+        LOG.fatal("TajoWorker stopped cause can't get PullServer port.");
+        System.exit(-1);
+      }
+    }
   }
 
   public void stopWorkerForce() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index ec413b2..8009ce3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -28,6 +28,7 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.util.Pair;
 
@@ -172,16 +173,19 @@ public class TaskRunnerManager extends CompositeService {
           failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
           failureIntermediateItems.add(failureBuilder.build());
         }
+
         intermediateBuilder.clear();
+
         intermediateBuilder.setEbId(ebId.getProto())
             
.setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName()
 + ":" +
-                workerContext.getPullService().getPort())
+                workerContext.getPullServerPort())
             .setTaskId(-1)
             .setAttemptId(-1)
             .setPartId(eachShuffle.getPartId())
             .setVolume(eachShuffle.getVolume())
             .addAllPages(pages)
             .addAllFailures(failureIntermediateItems);
+
         intermediateEntries.add(intermediateBuilder.build());
       }
 
@@ -191,7 +195,11 @@ public class TaskRunnerManager extends CompositeService {
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       reporterBuilder.setReportSuccess(false);
-      reporterBuilder.setReportErrorMessage(e.getMessage());
+      if (e.getMessage() == null) {
+        reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
+      } else {
+        reporterBuilder.setReportErrorMessage(e.getMessage());
+      }
     }
     lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 5ab5b5d..b337754 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -141,24 +142,6 @@ public class WorkerHeartbeatService extends 
AbstractService {
       LOG.info("Worker Resource Heartbeat Thread start.");
       int sendDiskInfoCount = 0;
       int pullServerPort = 0;
-      if(context.getPullService()!= null) {
-        long startTime = System.currentTimeMillis();
-        while(true) {
-          pullServerPort = context.getPullService().getPort();
-          if(pullServerPort > 0) {
-            break;
-          }
-          //waiting while pull server init
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e) {
-          }
-          if(System.currentTimeMillis() - startTime > 30 * 1000) {
-            LOG.fatal("Too long push server init.");
-            System.exit(0);
-          }
-        }
-      }
 
       String hostName = null;
       int peerRpcPort = 0;
@@ -176,9 +159,8 @@ public class WorkerHeartbeatService extends AbstractService 
{
       if(context.getTajoWorkerClientService() != null) {
         clientPort = 
context.getTajoWorkerClientService().getBindAddr().getPort();
       }
-      if (context.getPullService() != null) {
-        pullServerPort = context.getPullService().getPort();
-      }
+
+      pullServerPort = context.getPullServerPort();
 
       while(!stopped.get()) {
         if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java 
b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 346fa69..b07ba96 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -103,6 +103,7 @@ public class TajoTestingCluster {
   }
 
   void initPropertiesAndConfigs() {
+    System.setProperty("TAJO_PULLSERVER_STANDALONE", "false");
     if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
       String testResourceManager = 
System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
       
Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/bin/start-tajo.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/start-tajo.sh 
b/tajo-dist/src/main/bin/start-tajo.sh
index 63b7488..8fcf1a6 100755
--- a/tajo-dist/src/main/bin/start-tajo.sh
+++ b/tajo-dist/src/main/bin/start-tajo.sh
@@ -45,6 +45,9 @@ if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then
 fi
 
 if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then
+  if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then
+    "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start 
pullserver
+  fi
   "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start worker
   if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then
     "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; 
"$bin/tajo-daemon.sh" start querymaster

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/bin/stop-tajo.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/stop-tajo.sh 
b/tajo-dist/src/main/bin/stop-tajo.sh
index f50ae3a..770034b 100755
--- a/tajo-dist/src/main/bin/stop-tajo.sh
+++ b/tajo-dist/src/main/bin/stop-tajo.sh
@@ -46,6 +46,9 @@ fi
 
 if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then
   "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop worker
+  if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then
+    "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop 
pullserver
+  fi
   if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then
     "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; 
"$bin/tajo-daemon.sh" stop querymaster
   fi

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/bin/tajo
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo
index f579864..3fc2ae0 100755
--- a/tajo-dist/src/main/bin/tajo
+++ b/tajo-dist/src/main/bin/tajo
@@ -64,6 +64,7 @@ if [ $# = 0 ]; then
   echo "where COMMAND is one of:"
   echo "  master               run the Master Server"
   echo "  worker               run the Worker Server"
+  echo "  pullserver           run the Pull Server"
   echo "  catalog              run the Catalog server"
   echo "  catutil              catalog utility"
   echo "  cli                  run the tajo cli"
@@ -121,6 +122,11 @@ if [ "$TAJO_WORKER_HEAPSIZE" != "" ]; then
   JAVA_WORKER_HEAP_MAX="-Xmx""$TAJO_WORKER_HEAPSIZE""m"
   #echo $JAVA_WORKER_HEAP_MAX
 fi
+if [ "$TAJO_PULLSERVER_HEAPSIZE" != "" ]; then
+  #echo "run with heapsize $TAJO_PULLSERVER_HEAPSIZE"
+  JAVA_PULLSERVER_HEAP_MAX="-Xmx""$TAJO_PULLSERVER_HEAPSIZE""m"
+  #echo $JAVA_PULLSERVER_HEAP_MAX
+fi
 if [ "$TAJO_QUERYMASTER_HEAPSIZE" != "" ]; then
   #echo "run with heapsize $TAJO_QUERYMASTER_HEAPSIZE"
   JAVA_QUERYMASTER_HEAP_MAX="-Xmx""$TAJO_QUERYMASTER_HEAPSIZE""m"
@@ -330,6 +336,9 @@ elif [ "$COMMAND" = "master" ] ; then
 elif [ "$COMMAND" = "worker" ] ; then
   CLASS='org.apache.tajo.worker.TajoWorker'
   TAJO_OPTS="$TAJO_OPTS $JAVA_WORKER_HEAP_MAX $TAJO_WORKER_OPTS"
+elif [ "$COMMAND" = "pullserver" ] ; then
+  CLASS='org.apache.tajo.pullserver.TajoPullServer'
+  TAJO_OPTS="$TAJO_OPTS $JAVA_PULLSERVER_HEAP_MAX $TAJO_PULLSERVER_OPTS"
 elif [ "$COMMAND" = "querymaster" ] ; then
   CLASS='org.apache.tajo.worker.TajoWorker'
   TAJO_OPTS="$TAJO_OPTS $JAVA_QUERYMASTER_HEAP_MAX $TAJO_QUERYMASTER_OPTS"

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-dist/src/main/conf/tajo-env.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-env.sh 
b/tajo-dist/src/main/conf/tajo-env.sh
index 747ee1c..bd14af6 100755
--- a/tajo-dist/src/main/conf/tajo-env.sh
+++ b/tajo-dist/src/main/conf/tajo-env.sh
@@ -37,6 +37,9 @@
 # export TAJO_WORKER_HEAPSIZE=1000
 
 # The maximum amount of heap to use, in MB. Default is 1000.
+# export TAJO_PULLSERVER_HEAPSIZE=1000
+
+# The maximum amount of heap to use, in MB. Default is 1000.
 # export TAJO_QUERYMASTER_HEAPSIZE=1000
 
 # Extra Java runtime options.  Empty by default.
@@ -45,9 +48,12 @@
 # Extra TajoMaster's java runtime options for TajoMaster. Empty by default
 # export TAJO_MASTER_OPTS=
 
-# Extra TajoWorker's java runtime options for TajoMaster. Empty by default
+# Extra TajoWorker's java runtime options. Empty by default
 # export TAJO_WORKER_OPTS=
 
+# Extra TajoPullServer's java runtime options. Empty by default
+# export TAJO_PULLSERVER_OPTS=
+
 # Extra  QueryMaster mode TajoWorker's java runtime options for TajoMaster. Empty by default
 # export TAJO_QUERYMASTER_OPTS=
 
@@ -68,4 +74,7 @@ export TAJO_WORKER_STANDBY_MODE=true
 
 # It must be required to use HCatalogStore
 # export HIVE_HOME=
-# export HIVE_JDBC_DRIVER_DIR=
\ No newline at end of file
+# export HIVE_JDBC_DRIVER_DIR=
+
+# Tajo PullServer mode. the default mode is standalone mode
+export TAJO_PULLSERVER_STANDALONE=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
 
b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
index 0933167..2e36644 100644
--- 
a/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ 
b/tajo-yarn-pullserver/src/main/hadoop-2.2.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
@@ -21,14 +21,24 @@ package org.apache.tajo.pullserver.listener;
 import org.apache.hadoop.mapred.FadvisedFileRegion;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.apache.tajo.pullserver.TajoPullServerService;
 
 @Deprecated
 public class FileCloseListener implements ChannelFutureListener {
 
   private FadvisedFileRegion filePart;
+  private String requestUri;
+  private TajoPullServerService pullServerService;
+  private long startTime;
 
-  public FileCloseListener(FadvisedFileRegion filePart) {
+  public FileCloseListener(FadvisedFileRegion filePart,
+                           String requestUri,
+                           long startTime,
+                           TajoPullServerService pullServerService) {
     this.filePart = filePart;
+    this.requestUri = requestUri;
+    this.pullServerService = pullServerService;
+    this.startTime = startTime;
   }
 
   // TODO error handling; distinguish IO/connection failures,
@@ -36,5 +46,8 @@ public class FileCloseListener implements 
ChannelFutureListener {
   @Override
   public void operationComplete(ChannelFuture future) {
     filePart.releaseExternalResources();
+    if (pullServerService != null) {
+      pullServerService.completeFileChunk(filePart, requestUri, startTime);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
 
b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
index 5b2d1b3..be599c3 100644
--- 
a/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ 
b/tajo-yarn-pullserver/src/main/hadoop-2.3.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
@@ -19,15 +19,25 @@
 package org.apache.tajo.pullserver.listener;
 
 import org.apache.hadoop.mapred.FadvisedFileRegion;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 
 public class FileCloseListener implements ChannelFutureListener {
 
   private FadvisedFileRegion filePart;
+  private String requestUri;
+  private TajoPullServerService pullServerService;
+  private long startTime;
 
-  public FileCloseListener(FadvisedFileRegion filePart) {
+  public FileCloseListener(FadvisedFileRegion filePart,
+                           String requestUri,
+                           long startTime,
+                           TajoPullServerService pullServerService) {
     this.filePart = filePart;
+    this.requestUri = requestUri;
+    this.pullServerService = pullServerService;
+    this.startTime = startTime;
   }
 
   // TODO error handling; distinguish IO/connection failures,
@@ -38,5 +48,8 @@ public class FileCloseListener implements 
ChannelFutureListener {
       filePart.transferSuccessful();
     }
     filePart.releaseExternalResources();
+    if (pullServerService != null) {
+      pullServerService.completeFileChunk(filePart, requestUri, startTime);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
 
b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
index 5b2d1b3..7d4ca3a 100644
--- 
a/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
+++ 
b/tajo-yarn-pullserver/src/main/hadoop-2.4.0/org/apache/tajo/pullserver/listener/FileCloseListener.java
@@ -21,13 +21,23 @@ package org.apache.tajo.pullserver.listener;
 import org.apache.hadoop.mapred.FadvisedFileRegion;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.apache.tajo.pullserver.TajoPullServerService;
 
 public class FileCloseListener implements ChannelFutureListener {
 
   private FadvisedFileRegion filePart;
+  private String requestUri;
+  private TajoPullServerService pullServerService;
+  private long startTime;
 
-  public FileCloseListener(FadvisedFileRegion filePart) {
+  public FileCloseListener(FadvisedFileRegion filePart,
+                           String requestUri,
+                           long startTime,
+                           TajoPullServerService pullServerService) {
     this.filePart = filePart;
+    this.requestUri = requestUri;
+    this.pullServerService = pullServerService;
+    this.startTime = startTime;
   }
 
   // TODO error handling; distinguish IO/connection failures,
@@ -38,5 +48,8 @@ public class FileCloseListener implements 
ChannelFutureListener {
       filePart.transferSuccessful();
     }
     filePart.releaseExternalResources();
+    if (pullServerService != null) {
+      pullServerService.completeFileChunk(filePart, requestUri, startTime);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index dd3bee3..e6e7ce3 100644
--- 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -493,7 +493,7 @@ public class PullServerAuxService extends AuxiliaryService {
             file.startOffset, file.length(), manageOsCache, readaheadLength,
             readaheadPool, file.getFile().getAbsolutePath());
         writeFuture = ch.write(partition);
-        writeFuture.addListener(new FileCloseListener(partition));
+        writeFuture.addListener(new FileCloseListener(partition, null, 0, 
null));
       } else {
         // HTTPS cannot be done with zero copy.
         final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
new file mode 100644
index 0000000..7d7065e
--- /dev/null
+++ 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService.PullServer;
+import org.apache.tajo.util.StringUtils;
+
+public class TajoPullServer extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TajoPullServer.class);
+
+  private TajoPullServerService pullService;
+  private TajoConf systemConf;
+
+  public TajoPullServer() {
+    super(TajoPullServer.class.getName());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.systemConf = (TajoConf)conf;
+    pullService = new TajoPullServerService();
+    addService(pullService);
+
+    super.init(conf);
+  }
+
+  public void startPullServer(TajoConf systemConf) {
+    init(systemConf);
+    start();
+  }
+
+  public void start() {
+    super.start();
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    StringUtils.startupShutdownMessage(PullServer.class, args, LOG);
+
+    if (!TajoPullServerService.isStandaloneMode()) {
+      LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");
+      return;
+    }
+
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+    (new TajoPullServer()).startPullServer(tajoConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c5ef87f/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index e68e351..150ac85 100644
--- 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.mapred.FadvisedChunkedFile;
+import org.apache.hadoop.mapred.FadvisedFileRegion;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -63,10 +64,7 @@ import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -76,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
 import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
@@ -245,6 +244,72 @@ public class TajoPullServerService extends AbstractService 
{
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
                                     DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
+    if (isStandaloneMode()) {
+      File pullServerPortFile = getPullServerPortFile();
+      if (pullServerPortFile.exists()) {
+        pullServerPortFile.delete();
+      }
+      pullServerPortFile.getParentFile().mkdirs();
+      LOG.info("Write PullServerPort to " + pullServerPortFile);
+      try {
+        FileOutputStream out = new FileOutputStream(pullServerPortFile);
+        out.write(("" + port).getBytes());
+        out.close();
+      } catch (Exception e) {
+        LOG.fatal("PullServer exists cause can't write PullServer port to " + 
pullServerPortFile +
+            ", " + e.getMessage(), e);
+        System.exit(-1);
+      }
+    }
+    LOG.info("TajoPullServerService started: port=" + port);
+  }
+
+  /**
+   * Locates the file that holds the pull server port.
+   *
+   * @return "pullserver.port" under the directory named by the TAJO_PID_DIR
+   *         environment variable, or under "/tmp" if that is unset or empty
+   */
+  private static File getPullServerPortFile() {
+    String pidDir = System.getenv("TAJO_PID_DIR");
+    boolean pidDirMissing = (pidDir == null) || pidDir.isEmpty();
+    String baseDir = pidDirMissing ? "/tmp" : pidDir;
+
+    return new File(baseDir + "/pullserver.port");
+  }
+
+  /**
+   * Tells whether the pull server is configured to run in standalone mode.
+   *
+   * The TAJO_PULLSERVER_STANDALONE environment variable is consulted first;
+   * when it is unset or blank, the system property of the same name is used.
+   *
+   * @return true if neither source provides a non-blank value, or if the
+   *         value, ignoring case and surrounding whitespace, is "true"
+   */
+  public static boolean isStandaloneMode() {
+    String mode = System.getenv("TAJO_PULLSERVER_STANDALONE");
+    if (mode == null || mode.trim().isEmpty()) {
+      mode = System.getProperty("TAJO_PULLSERVER_STANDALONE");
+    }
+
+    if (mode == null || mode.trim().isEmpty()) {
+      return true;
+    }
+    // Trim before comparing so the comparison agrees with the blank checks
+    // above; otherwise a value such as "true " is silently treated as false.
+    return "true".equalsIgnoreCase(mode.trim());
+  }
+
+  /**
+   * Reads the pull server port from the port file.
+   *
+   * @return the port parsed from the file, or -1 if the file is missing, is
+   *         a directory, is empty, or cannot be read or parsed
+   */
+  public static int readPullServerPort() {
+    FileInputStream in = null;
+    try {
+      File pullServerPortFile = getPullServerPortFile();
+
+      if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) {
+        return -1;
+      }
+      in = new FileInputStream(pullServerPortFile);
+      byte[] buf = new byte[1024];
+      int readBytes = in.read(buf);
+      if (readBytes <= 0) {
+        // read() returns -1 at end of stream, i.e. the file is empty; bail
+        // out here instead of failing inside the String constructor.
+        LOG.error("PullServer port file is empty: " + pullServerPortFile);
+        return -1;
+      }
+      // Decoded with the platform default charset, matching how the port is
+      // written with getBytes(). trim() tolerates a trailing newline or
+      // other surrounding whitespace in the file.
+      return Integer.parseInt(new String(buf, 0, readBytes).trim());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      return -1;
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException ignored) {
+          // Best effort: the stream was only read from, so a failed close
+          // cannot lose data and must not mask the result.
+        }
+      }
+    }
+  }
 
   public int getPort() {
@@ -317,6 +382,61 @@ public class TajoPullServerService extends AbstractService 
{
     }
   }
 
+
+  // Send statistics of in-flight requests, keyed by request URI. An entry is
+  // put when a request is received and removed by ProcessingStatus once its
+  // remaining-file counter reaches zero.
+  Map<String, ProcessingStatus> processingStatusMap = new 
ConcurrentHashMap<String, ProcessingStatus>();
+
+  /**
+   * Reports a finished file chunk to the status tracked for its request.
+   * Does nothing when no status is registered under the given URI.
+   *
+   * @param filePart   file region forwarded to the status for reporting
+   * @param requestUri URI under which the request's status is registered
+   * @param startTime  time in ms (System.currentTimeMillis() based) at which
+   *                   sending of this chunk started
+   */
+  public void completeFileChunk(FadvisedFileRegion filePart,
+                                   String requestUri,
+                                   long startTime) {
+    final ProcessingStatus tracked = processingStatusMap.get(requestUri);
+    if (tracked == null) {
+      return;
+    }
+    tracked.decrementRemainFiles(filePart, startTime);
+  }
+
+  /**
+   * Timing statistics for the file chunks sent in response to one request.
+   * When the last chunk completes, the entry registered under the request
+   * URI is removed from processingStatusMap and a summary is logged.
+   */
+  class ProcessingStatus {
+    // A chunk whose send takes longer than this (ms) counts as a slow file.
+    private static final long SLOW_FILE_THRESHOLD_MS = 20 * 1000;
+
+    String requestUri;
+    int numFiles;
+    // Created eagerly and never reassigned: decrementRemainFiles() locks on
+    // this object, so it must be non-null and stable even if a completion
+    // arrives before setNumFiles() has been called.
+    final AtomicInteger remainFiles = new AtomicInteger(0);
+    long startTime;
+    long makeFileListTime;
+    long minTime = Long.MAX_VALUE;
+    long maxTime;
+    int numSlowFile;
+
+    /**
+     * @param requestUri URI of the tracked request; the current time is
+     *                   recorded as the start of processing
+     */
+    public ProcessingStatus(String requestUri) {
+      this.requestUri = requestUri;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Sets the number of files to send and resets the remaining counter.
+     */
+    public void setNumFiles(int numFiles) {
+      this.numFiles = numFiles;
+      this.remainFiles.set(numFiles);
+    }
+
+    /**
+     * Records completion of one file chunk and updates min/max send times.
+     * Once no files remain, removes this request's entry from
+     * processingStatusMap and logs the collected statistics.
+     *
+     * @param filePart      the chunk, used only for the slow-send log line
+     * @param fileStartTime time in ms at which sending of the chunk started
+     */
+    public void decrementRemainFiles(FadvisedFileRegion filePart,
+                                     long fileStartTime) {
+      synchronized(remainFiles) {
+        long fileSendTime = System.currentTimeMillis() - fileStartTime;
+        if (fileSendTime > SLOW_FILE_THRESHOLD_MS) {
+          LOG.info("PullServer send too long time: filePos="
+              + filePart.getPosition() + ", fileLen=" + filePart.getCount());
+          numSlowFile++;
+        }
+        if (fileSendTime > maxTime) {
+          maxTime = fileSendTime;
+        }
+        if (fileSendTime < minTime) {
+          minTime = fileSendTime;
+        }
+        int remain = remainFiles.decrementAndGet();
+        if (remain <= 0) {
+          processingStatusMap.remove(requestUri);
+          LOG.info("PullServer processing status: totalTime="
+              + (System.currentTimeMillis() - startTime) + " ms, "
+              + "makeFileListTime=" + makeFileListTime + " ms, minTime="
+              + minTime + " ms, maxTime=" + maxTime + " ms, "
+              + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
+        }
+      }
+    }
+  }
+
   class PullServer extends SimpleChannelUpstreamHandler {
 
     private final Configuration conf;
@@ -370,6 +490,10 @@ public class TajoPullServerService extends AbstractService 
{
         return;
       }
 
+      ProcessingStatus processingStatus = new 
ProcessingStatus(request.getUri().toString());
+      synchronized(processingStatusMap) {
+        processingStatusMap.put(request.getUri().toString(), processingStatus);
+      }
       // Parsing the URL into key-values
       final Map<String, List<String>> params =
           new QueryStringDecoder(request.getUri()).getParameters();
@@ -407,13 +531,15 @@ public class TajoPullServerService extends 
AbstractService {
 
       List<String> taskIds = splitMaps(taskIdList);
 
-      LOG.info("PullServer request param: shuffleType=" + shuffleType +
-          ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
-
-      // the working dir of tajo worker for each query
       String queryBaseDir = queryId.toString() + "/output";
 
-      LOG.info("PullServer baseDir: " + 
conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PullServer request param: shuffleType=" + shuffleType +
+            ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
+
+        // the working dir of tajo worker for each query
+        LOG.debug("PullServer baseDir: " + 
conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
+      }
 
       final List<FileChunk> chunks = Lists.newArrayList();
 
@@ -474,6 +600,8 @@ public class TajoPullServerService extends AbstractService {
         return;
       }
 
+      processingStatus.setNumFiles(chunks.size());
+      processingStatus.makeFileListTime = System.currentTimeMillis() - 
processingStatus.startTime;
       // Write the content.
       Channel ch = e.getChannel();
       if (chunks.size() == 0) {
@@ -497,7 +625,7 @@ public class TajoPullServerService extends AbstractService {
         ChannelFuture writeFuture = null;
 
         for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, ch, chunk);
+          writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
           if (writeFuture == null) {
             sendError(ctx, NOT_FOUND);
             return;
@@ -514,7 +642,9 @@ public class TajoPullServerService extends AbstractService {
 
     private ChannelFuture sendFile(ChannelHandlerContext ctx,
                                    Channel ch,
-                                   FileChunk file) throws IOException {
+                                   FileChunk file,
+                                   String requestUri) throws IOException {
+      long startTime = System.currentTimeMillis();
       RandomAccessFile spill = null;
       ChannelFuture writeFuture;
       try {
@@ -524,7 +654,7 @@ public class TajoPullServerService extends AbstractService {
               file.startOffset, file.length(), manageOsCache, readaheadLength,
               readaheadPool, file.getFile().getAbsolutePath());
           writeFuture = ch.write(filePart);
-          writeFuture.addListener(new FileCloseListener(filePart));
+          writeFuture.addListener(new FileCloseListener(filePart, requestUri, 
startTime, TajoPullServerService.this));
         } else {
           // HTTPS cannot be done with zero copy.
           final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,

Reply via email to