Repository: tajo
Updated Branches:
  refs/heads/master 64416dea6 -> 15450e868


http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 8009ce3..5eb66b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -18,38 +18,45 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.Pair;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.worker.event.TaskRunnerEvent;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.apache.tajo.worker.event.TaskRunnerStopEvent;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
-public class TaskRunnerManager extends CompositeService {
+public class TaskRunnerManager extends CompositeService implements 
EventHandler<TaskRunnerEvent> {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
 
-  private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, 
TaskRunner>();
-  private final Map<String, TaskRunnerHistory> taskRunnerHistoryMap = 
Maps.newConcurrentMap();
+  private final ConcurrentMap<ExecutionBlockId, ExecutionBlockContext> 
executionBlockContextMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, TaskRunner> taskRunnerMap = 
Maps.newConcurrentMap();
+  private final ConcurrentMap<String, TaskRunnerHistory> taskRunnerHistoryMap 
= Maps.newConcurrentMap();
   private TajoWorker.WorkerContext workerContext;
   private TajoConf tajoConf;
   private AtomicBoolean stop = new AtomicBoolean(false);
   private FinishedTaskCleanThread finishedTaskCleanThread;
+  private Dispatcher dispatcher;
 
-  public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+  public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher 
dispatcher) {
     super(TaskRunnerManager.class.getName());
 
     this.workerContext = workerContext;
+    this.dispatcher = dispatcher;
   }
 
   public TajoWorker.WorkerContext getWorkerContext() {
@@ -58,7 +65,9 @@ public class TaskRunnerManager extends CompositeService {
 
   @Override
   public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
     tajoConf = (TajoConf)conf;
+    dispatcher.register(TaskRunnerEvent.EventType.class, this);
     super.init(tajoConf);
   }
 
@@ -82,6 +91,9 @@ public class TaskRunnerManager extends CompositeService {
         }
       }
     }
+    for(ExecutionBlockContext context: executionBlockContextMap.values()) {
+      context.stop();
+    }
 
     if(finishedTaskCleanThread != null) {
       finishedTaskCleanThread.interrupted();
@@ -92,148 +104,35 @@ public class TaskRunnerManager extends CompositeService {
     }
   }
 
-  public void stopTask(String id) {
+  public void stopTaskRunner(String id) {
     LOG.info("Stop Task:" + id);
-    synchronized(taskRunnerMap) {
-      TaskRunner taskRunner = taskRunnerMap.remove(id);
-      if (taskRunner != null) {
-        synchronized(taskRunnerCompleteCounter) {
-          ExecutionBlockId ebId = 
taskRunner.getContext().getExecutionBlockId();
-          AtomicInteger ebSuccessedTaskNums = successedTaskNums.get(ebId);
-          if (ebSuccessedTaskNums == null) {
-            ebSuccessedTaskNums = new 
AtomicInteger(taskRunner.getContext().succeededTasksNum.get());
-            successedTaskNums.put(ebId, ebSuccessedTaskNums);
-          } else {
-            
ebSuccessedTaskNums.addAndGet(taskRunner.getContext().succeededTasksNum.get());
-          }
-
-          Pair<AtomicInteger, AtomicInteger> counter = 
taskRunnerCompleteCounter.get(ebId);
-
-          if (counter != null) {
-            if (counter.getSecond().decrementAndGet() <= 0) {
-              LOG.info(ebId + "'s all tasks are completed.");
-              try {
-                closeExecutionBlock(ebId, ebSuccessedTaskNums.get(), 
taskRunner);
-              } catch (Exception e) {
-                LOG.error(ebId + ", closing error:" + e.getMessage(), e);
-              }
-              successedTaskNums.remove(ebId);
-              taskRunnerCompleteCounter.remove(ebId);
-            }
-          }
-        }
-      }
-    }
+    TaskRunner taskRunner = taskRunnerMap.remove(id);
+    taskRunner.stop();
     if(workerContext.isYarnContainerMode()) {
       stop();
     }
   }
 
-  private void closeExecutionBlock(ExecutionBlockId ebId, int succeededTasks, 
TaskRunner lastTaskRunner) throws Exception {
-    TajoWorkerProtocol.ExecutionBlockReport.Builder reporterBuilder =
-        TajoWorkerProtocol.ExecutionBlockReport.newBuilder();
-    reporterBuilder.setEbId(ebId.getProto());
-    reporterBuilder.setReportSuccess(true);
-    reporterBuilder.setSucceededTasks(succeededTasks);
-    try {
-      List<TajoWorkerProtocol.IntermediateEntryProto> intermediateEntries =
-          new ArrayList<TajoWorkerProtocol.IntermediateEntryProto>();
-      List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles =
-          workerContext.getHashShuffleAppenderManager().close(ebId);
-      if (shuffles == null) {
-        reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-        lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
-        return;
-      }
-
-      TajoWorkerProtocol.IntermediateEntryProto.Builder intermediateBuilder =
-          TajoWorkerProtocol.IntermediateEntryProto.newBuilder();
-      TajoWorkerProtocol.IntermediateEntryProto.PageProto.Builder pageBuilder =
-          TajoWorkerProtocol.IntermediateEntryProto.PageProto.newBuilder();
-      TajoWorkerProtocol.FailureIntermediateProto.Builder failureBuilder =
-          TajoWorkerProtocol.FailureIntermediateProto.newBuilder();
-
-      for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: 
shuffles) {
-        List<TajoWorkerProtocol.IntermediateEntryProto.PageProto> pages =
-            new 
ArrayList<TajoWorkerProtocol.IntermediateEntryProto.PageProto>();
-        List<TajoWorkerProtocol.FailureIntermediateProto> 
failureIntermediateItems =
-            new ArrayList<TajoWorkerProtocol.FailureIntermediateProto>();
-
-        for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) {
-          pageBuilder.clear();
-          pageBuilder.setPos(eachPage.getFirst());
-          pageBuilder.setLength(eachPage.getSecond());
-          pages.add(pageBuilder.build());
-        }
-
-        for(Pair<Long, Pair<Integer, Integer>> eachFailure: 
eachShuffle.getFailureTskTupleIndexes()) {
-          failureBuilder.clear();
-          failureBuilder.setPagePos(eachFailure.getFirst());
-          failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst());
-          failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond());
-          failureIntermediateItems.add(failureBuilder.build());
-        }
-
-        intermediateBuilder.clear();
-
-        intermediateBuilder.setEbId(ebId.getProto())
-            
.setHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName()
 + ":" +
-                workerContext.getPullServerPort())
-            .setTaskId(-1)
-            .setAttemptId(-1)
-            .setPartId(eachShuffle.getPartId())
-            .setVolume(eachShuffle.getVolume())
-            .addAllPages(pages)
-            .addAllFailures(failureIntermediateItems);
-
-        intermediateEntries.add(intermediateBuilder.build());
-      }
-
-      // send intermediateEntries to QueryMaster
-      reporterBuilder.addAllIntermediateEntries(intermediateEntries);
-
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      reporterBuilder.setReportSuccess(false);
-      if (e.getMessage() == null) {
-        reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName());
-      } else {
-        reporterBuilder.setReportErrorMessage(e.getMessage());
-      }
-    }
-    lastTaskRunner.sendExecutionBlockReport(reporterBuilder.build());
-  }
-
   public Collection<TaskRunner> getTaskRunners() {
-    synchronized(taskRunnerMap) {
-      return Collections.unmodifiableCollection(taskRunnerMap.values());
-    }
+    return Collections.unmodifiableCollection(taskRunnerMap.values());
   }
 
   public Collection<TaskRunnerHistory> getExecutionBlockHistories() {
-    synchronized(taskRunnerHistoryMap) {
-      return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
-    }
+    return Collections.unmodifiableCollection(taskRunnerHistoryMap.values());
   }
 
   public TaskRunnerHistory getExcutionBlockHistoryByTaskRunnerId(String 
taskRunnerId) {
-    synchronized(taskRunnerHistoryMap) {
-      return taskRunnerHistoryMap.get(taskRunnerId);
-    }
+    return taskRunnerHistoryMap.get(taskRunnerId);
   }
 
   public TaskRunner getTaskRunner(String taskRunnerId) {
-    synchronized(taskRunnerMap) {
-      return taskRunnerMap.get(taskRunnerId);
-    }
+    return taskRunnerMap.get(taskRunnerId);
   }
 
-  public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
-    synchronized(taskRunnerMap) {
-      for (TaskRunner eachTaskRunner: taskRunnerMap.values()) {
-        Task task = eachTaskRunner.getContext().getTask(quAttemptId);
-        if (task != null) return task;
-      }
+  public Task getTaskByQueryUnitAttemptId(QueryUnitAttemptId 
queryUnitAttemptId) {
+    ExecutionBlockContext context = 
executionBlockContextMap.get(queryUnitAttemptId.getQueryUnitId().getExecutionBlockId());
+    if (context != null) {
+      return context.getTask(queryUnitAttemptId);
     }
     return null;
   }
@@ -250,53 +149,61 @@ public class TaskRunnerManager extends CompositeService {
   }
 
   public int getNumTasks() {
-    synchronized(taskRunnerMap) {
-      return taskRunnerMap.size();
-    }
+    return taskRunnerMap.size();
   }
 
-  //<# tasks, # running tasks>
-  Map<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>> 
taskRunnerCompleteCounter =
-      new HashMap<ExecutionBlockId, Pair<AtomicInteger, AtomicInteger>>();
-
-  Map<ExecutionBlockId, AtomicInteger> successedTaskNums = new 
HashMap<ExecutionBlockId, AtomicInteger>();
-
-  public void startTask(final String[] params) {
-    //TODO change to use event dispatcher
-    Thread t = new Thread() {
-      public void run() {
+  @Override
+  public void handle(TaskRunnerEvent event) {
+    LOG.info("======================== Processing " + 
event.getExecutionBlockId() + " of type " + event.getType());
+    if (event instanceof TaskRunnerStartEvent) {
+      TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
+      ExecutionBlockContext context = 
executionBlockContextMap.get(event.getExecutionBlockId());
+      String[] params = startEvent.getParams();
+      if(context == null){
         try {
-          TajoConf systemConf = new TajoConf(tajoConf);
-          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, 
systemConf, params);
-          LOG.info("Start TaskRunner:" + taskRunner.getId());
-          synchronized(taskRunnerMap) {
-            taskRunnerMap.put(taskRunner.getId(), taskRunner);
-          }
+          // QueryMaster's address
+          String host = params[4];
+          int port = Integer.parseInt(params[5]);
+
+          context = new ExecutionBlockContext(this, startEvent, new 
InetSocketAddress(host, port));
+        } catch (Throwable e) {
+          LOG.fatal(e.getMessage(), e);
+          throw new RuntimeException(e);
+        }
+        executionBlockContextMap.put(event.getExecutionBlockId(), context);
+      }
 
-          synchronized (taskRunnerHistoryMap){
-            taskRunnerHistoryMap.put(taskRunner.getId(), 
taskRunner.getContext().getExcutionBlockHistory());
-          }
+      TaskRunner taskRunner = new TaskRunner(context, params);
+      LOG.info("Start TaskRunner:" + taskRunner.getId());
+      taskRunnerMap.put(taskRunner.getId(), taskRunner);
+      taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
 
-          synchronized(taskRunnerCompleteCounter) {
-            ExecutionBlockId ebId = 
taskRunner.getContext().getExecutionBlockId();
-            Pair<AtomicInteger, AtomicInteger> counter = 
taskRunnerCompleteCounter.get(ebId);
-            if (counter == null) {
-              counter = new Pair(new AtomicInteger(0), new AtomicInteger(0));
-              taskRunnerCompleteCounter.put(ebId, counter);
-            }
-            counter.getFirst().incrementAndGet();
-            counter.getSecond().incrementAndGet();
-          }
-          taskRunner.init(systemConf);
-          taskRunner.start();
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          throw new RuntimeException(e.getMessage(), e);
+      taskRunner.init(context.getConf());
+      taskRunner.start();
+
+    } else if (event instanceof TaskRunnerStopEvent) {
+      ExecutionBlockContext executionBlockContext =  
executionBlockContextMap.remove(event.getExecutionBlockId());
+      if(executionBlockContext != null){
+        
TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId());
+        
executionBlockContext.reportExecutionBlock(event.getExecutionBlockId());
+        executionBlockContext.stop();
+        try {
+          
workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId());
+        } catch (IOException e) {
+          LOG.fatal(e.getMessage(), e);
+          throw new RuntimeException(e);
         }
       }
-    };
+      LOG.info("Stopped execution block:" + event.getExecutionBlockId());
+    }
+  }
+
+  public EventHandler getEventHandler(){
+    return dispatcher.getEventHandler();
+  }
 
-    t.start();
+  public TajoConf getTajoConf() {
+    return tajoConf;
   }
 
   class FinishedTaskCleanThread extends Thread {

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
new file mode 100644
index 0000000..aac8973
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+public class TaskRunnerEvent extends AbstractEvent<TaskRunnerEvent.EventType> {
+  public enum EventType {
+    START,
+    STOP
+  }
+
+  protected final ExecutionBlockId executionBlockId;
+
+  public TaskRunnerEvent(EventType eventType,
+                         ExecutionBlockId executionBlockId) {
+    super(eventType);
+    this.executionBlockId = executionBlockId;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
new file mode 100644
index 0000000..8c9fa51
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.query.QueryContext;
+
+public class TaskRunnerStartEvent extends TaskRunnerEvent {
+
+  private final QueryContext queryContext;
+  private final String[] params;
+  private final String plan;
+
+  public TaskRunnerStartEvent(String[] params,
+                              ExecutionBlockId executionBlockId,
+                              QueryContext context,
+                              String plan) {
+    super(EventType.START, executionBlockId);
+    this.params = params;
+    this.queryContext = context;
+    this.plan = plan;
+  }
+
+  public String[] getParams(){
+    return this.params;
+  }
+
+  public QueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  public String getPlan() {
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
new file mode 100644
index 0000000..c8ec20d
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStopEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.tajo.ExecutionBlockId;
+
+public class TaskRunnerStopEvent extends TaskRunnerEvent {
+
+  public TaskRunnerStopEvent(ExecutionBlockId executionBlockId) {
+    super(EventType.STOP, executionBlockId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto 
b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 0119a88..06d2a42 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -32,7 +32,7 @@ service QueryMasterProtocolService {
   //from Worker
   rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
   rpc statusUpdate (TaskStatusProto) returns (BoolProto);
-  rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+  rpc ping (ExecutionBlockIdProto) returns (BoolProto);
   rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
   rpc done (TaskCompletionReport) returns (BoolProto);
   rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto 
b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index e100c48..dff2733 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -201,7 +201,7 @@ message DataChannelProto {
 }
 
 message RunExecutionBlockRequestProto {
-    required string executionBlockId = 1;
+    required ExecutionBlockIdProto executionBlockId = 1;
     required string queryMasterHost = 2;
     required int32 queryMasterPort = 3;
     required string nodeId = 4;
@@ -220,7 +220,8 @@ service TajoWorkerProtocolService {
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
 
   // from QueryMaster(Worker)
-  rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+  rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+  rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto);
   rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
   rpc cleanup(QueryIdProto) returns (BoolProto);
   rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 95c06bb..b755e02 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -106,6 +106,8 @@ public class TestFetcher {
 
   @Test
   public void testAdjustFetchProcess() {
+    assertEquals(0.0f, Task.adjustFetchProcess(0, 0), 0);
+    assertEquals(0.0f, Task.adjustFetchProcess(10, 10), 0);
     assertEquals(0.05f, Task.adjustFetchProcess(10, 9), 0);
     assertEquals(0.1f, Task.adjustFetchProcess(10, 8), 0);
     assertEquals(0.25f, Task.adjustFetchProcess(10, 5), 0);

Reply via email to