Repository: tajo
Updated Branches:
  refs/heads/master 4841c656f -> 62534a877


TAJO-1695: Shuffle fetcher executor should be consider random writing. (jinho)

Closes #639


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/62534a87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/62534a87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/62534a87

Branch: refs/heads/master
Commit: 62534a8775ede8949b4b738b99807d4f7a91bd03
Parents: 4841c65
Author: Jinho Kim <[email protected]>
Authored: Wed Jul 22 13:14:49 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Wed Jul 22 13:14:49 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../java/org/apache/tajo/conf/TajoConf.java     |  2 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  3 +-
 .../org/apache/tajo/worker/TaskContainer.java   |  8 ++++--
 .../org/apache/tajo/worker/TaskExecutor.java    | 29 ++++++++++----------
 .../java/org/apache/tajo/worker/TaskImpl.java   |  7 ++---
 .../apache/tajo/querymaster/TestKillQuery.java  |  2 +-
 .../apache/tajo/worker/MockTaskExecutor.java    |  3 +-
 .../apache/tajo/worker/TestTaskExecutor.java    |  3 +-
 9 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c0a5e38..cbef908 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1695: Shuffle fetcher executor should be consider random writing.
+    (jinho)
+
     TAJO-1397: Resource allocation should be fine grained. (jinho)
 
     TAJO-1352: Improve the join order algorithm to consider missed cases of 

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 0436116..b876737 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -208,7 +208,7 @@ public class TajoConf extends Configuration {
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, 
Validators.bool()),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
     
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
-        Runtime.getRuntime().availableProcessors() * 2, Validators.min("1")),
+        2, Validators.min("1")),
     SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  
8192),
     SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120),
     
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 
20),

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 66216ee..19d5da4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -22,12 +22,13 @@ import org.apache.tajo.ResourceProtos.TaskStatusProto;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 public interface Task {
 
   void init() throws IOException;
 
-  void fetch();
+  void fetch(ExecutorService fetcherExecutor);
 
   void run() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
index 761bf52..f717c07 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
@@ -21,6 +21,8 @@ package org.apache.tajo.worker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * The driver class for Tajo Task processing.
  */
@@ -29,10 +31,12 @@ public class TaskContainer implements Runnable {
 
   private final TaskExecutor executor;
   private final int sequenceId;
+  private final ExecutorService fetchExecutor;
 
-  public TaskContainer(int sequenceId, TaskExecutor executor) {
+  public TaskContainer(int sequenceId, TaskExecutor executor, ExecutorService 
fetchExecutor) {
     this.sequenceId = sequenceId;
     this.executor = executor;
+    this.fetchExecutor = fetchExecutor;
   }
 
   @Override
@@ -56,7 +60,7 @@ public class TaskContainer implements Runnable {
         task.init();
 
         if (task.hasFetchPhase()) {
-          task.fetch(); // The fetch is performed in an asynchronous way.
+          task.fetch(fetchExecutor); // The fetch is performed in an 
asynchronous way.
         }
 
         if (!taskAttemptContext.isStopped()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
index 5e1ccc1..eef4a2d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
@@ -25,19 +26,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ResourceProtos.TaskRequestProto;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.TaskRequestImpl;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
 import org.apache.tajo.worker.event.NodeResourceEvent;
 import org.apache.tajo.worker.event.TaskStartEvent;
-import org.apache.tajo.ResourceProtos.TaskRequestProto;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -55,7 +56,7 @@ public class TaskExecutor extends AbstractService implements 
EventHandler<TaskSt
   private final Map<TaskAttemptId, NodeResource> allocatedResourceMap;
   private final BlockingQueue<Task> taskQueue;
   private final AtomicInteger runningTasks;
-  private ExecutorService fetcherThreadPool;
+  private List<ExecutorService> fetcherThreadPoolList;
   private ExecutorService threadPool;
   private TajoConf tajoConf;
   private volatile boolean isStopped;
@@ -66,6 +67,7 @@ public class TaskExecutor extends AbstractService implements 
EventHandler<TaskSt
     this.allocatedResourceMap = Maps.newConcurrentMap();
     this.runningTasks = new AtomicInteger();
     this.taskQueue = new LinkedBlockingQueue<Task>();
+    this.fetcherThreadPoolList = Lists.newArrayList();
   }
 
   @Override
@@ -82,12 +84,12 @@ public class TaskExecutor extends AbstractService 
implements EventHandler<TaskSt
         new ThreadFactoryBuilder().setNameFormat("Task executor #%d").build());
 
     int maxFetcherThreads = 
tajoConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
-    this.fetcherThreadPool = Executors.newFixedThreadPool(nThreads,
-        new ThreadFactoryBuilder().setNameFormat("Fetcher executor 
#%d").build());
-
-
     for (int i = 0; i < nThreads; i++) {
-      threadPool.submit(new TaskContainer(i, this));
+      ExecutorService fetcherThreadPool = 
Executors.newFixedThreadPool(maxFetcherThreads,
+          new ThreadFactoryBuilder().setNameFormat("TaskContainer[" + i + "] 
fetcher executor #%d").build());
+
+      threadPool.submit(new TaskContainer(i, this, fetcherThreadPool));
+      fetcherThreadPoolList.add(fetcherThreadPool);
     }
 
     super.serviceStart();
@@ -99,7 +101,9 @@ public class TaskExecutor extends AbstractService implements 
EventHandler<TaskSt
     isStopped = true;
 
     threadPool.shutdown();
-    fetcherThreadPool.shutdown();
+    for (ExecutorService fetcherThreadPool : fetcherThreadPoolList) {
+      fetcherThreadPool.shutdown();
+    }
     super.serviceStop();
   }
 
@@ -139,11 +143,6 @@ public class TaskExecutor extends AbstractService 
implements EventHandler<TaskSt
     }
   }
 
-  protected ExecutorService getFetcherExecutor() {
-    return fetcherThreadPool;
-  }
-
-
   protected Task createTask(ExecutionBlockContext executionBlockContext,
                             TaskRequestProto taskRequest) throws IOException {
     Task task = null;
@@ -153,7 +152,7 @@ public class TaskExecutor extends AbstractService 
implements EventHandler<TaskSt
       LOG.error(errorMessage);
       executionBlockContext.fatalError(taskAttemptId, errorMessage);
     } else {
-      task = new TaskImpl(new TaskRequestImpl(taskRequest), 
executionBlockContext, getFetcherExecutor());
+      task = new TaskImpl(new TaskRequestImpl(taskRequest), 
executionBlockContext);
       executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), 
task);
     }
     return task;

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index d77c583..82ea479 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -79,7 +79,6 @@ public class TaskImpl implements Task {
   private final TaskRequest request;
   private final Map<String, TableDesc> descs;
   private final TableStats inputStats;
-  private final ExecutorService fetcherExecutor;
   private final Path taskDir;
 
   private final TaskAttemptContext context;
@@ -101,8 +100,7 @@ public class TaskImpl implements Task {
   private TupleComparator sortComp = null;
 
   public TaskImpl(final TaskRequest request,
-                  final ExecutionBlockContext executionBlockContext,
-                  final ExecutorService fetcherExecutor) throws IOException {
+                  final ExecutionBlockContext executionBlockContext) throws 
IOException {
 
     this.request = request;
     this.executionBlockContext = executionBlockContext;
@@ -110,7 +108,6 @@ public class TaskImpl implements Task {
     this.queryContext = request.getQueryContext(systemConf);
     this.inputStats = new TableStats();
     this.fetcherRunners = Lists.newArrayList();
-    this.fetcherExecutor = fetcherExecutor;
     this.descs = Maps.newHashMap();
 
     Path baseDirPath = executionBlockContext.createBaseDir();
@@ -254,7 +251,7 @@ public class TaskImpl implements Task {
   }
 
   @Override
-  public void fetch() {
+  public void fetch(ExecutorService fetcherExecutor) {
     for (Fetcher f : fetcherRunners) {
       fetcherExecutor.submit(new FetchRunner(context, f));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java 
b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 3e8e230..8738dba 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -280,7 +280,7 @@ public class TestKillQuery {
       }
     };
 
-    org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context, 
null);
+    org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context);
     task.kill();
     assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, 
task.getTaskContext().getState());
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
index 2340551..071d26a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.tajo.worker.event.TaskStartEvent;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 
 public class MockTaskExecutor extends TaskExecutor {
@@ -70,7 +71,7 @@ public class MockTaskExecutor extends TaskExecutor {
       }
 
       @Override
-      public void fetch() {
+      public void fetch(ExecutorService executorService) {
 
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/62534a87/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
index 9b6af68..c0f5bb2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestTaskExecutor.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -223,7 +224,7 @@ public class TestTaskExecutor {
         }
 
         @Override
-        public void fetch() {
+        public void fetch(ExecutorService fetchExecutor) {
           try {
             Thread.sleep(50);
           } catch (InterruptedException e) {

Reply via email to