Updated Branches:
  refs/heads/master da1fd3264 -> 44b0d2232

TAJO-50: Cleanup SubQuery. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/44b0d223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/44b0d223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/44b0d223

Branch: refs/heads/master
Commit: 44b0d2232fdca80338165ad0629eebbf90dcb7e2
Parents: da1fd32
Author: Hyunsik Choi <[email protected]>
Authored: Sat May 4 02:16:00 2013 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Sat May 4 02:16:00 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../src/main/java/tajo/master/GlobalPlanner.java   |    2 +-
 .../src/main/java/tajo/master/Priority.java        |   54 --
 .../src/main/java/tajo/master/Query.java           |   40 +-
 .../src/main/java/tajo/master/QueryMaster.java     |   12 +-
 .../src/main/java/tajo/master/Repartitioner.java   |   22 +-
 .../src/main/java/tajo/master/SubQuery.java        |  710 +++++++--------
 .../java/tajo/master/rm/RMContainerAllocator.java  |   14 +-
 .../src/main/java/tajo/worker/TaskRunner.java      |    1 +
 9 files changed, 400 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db43b6d..413be47 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-50: Cleanup SubQuery. (hyunsik)
+
     TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. 
(hyunsik)
     
     TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
index 2709c98..b9fb587 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
@@ -922,7 +922,7 @@ public class GlobalPlanner {
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryUnit unit = new QueryUnit(
         QueryIdFactory.newQueryUnitId(subQuery.getId()), 
execBlock.isLeafBlock(),
-        subQuery.eventHandler);
+        subQuery.getEventHandler());
     unit.setLogicalPlan(execBlock.getPlan());
     return unit;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java
deleted file mode 100644
index 7e95d17..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-public class Priority implements Comparable<Priority> {
-  private int priority;
-
-  public Priority(int prio) {
-    set(prio);
-  }
-
-  public void set(int prio) {
-    this.priority = prio;
-  }
-
-  public int get() {
-    return this.priority;
-  }
-
-  @Override
-  public int compareTo(Priority o) {
-    return this.get() - o.get();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof Priority) {
-      Priority p = (Priority) o;
-      return p.get() == this.get();
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return "" + priority;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
index 292b9fd..d740cbd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
@@ -36,7 +36,6 @@ import tajo.catalog.TableDesc;
 import tajo.catalog.TableDescImpl;
 import tajo.catalog.TableMeta;
 import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.catalog.statistics.TableStat;
 import tajo.engine.json.GsonCreator;
 import tajo.engine.planner.global.MasterPlan;
 import tajo.engine.planner.logical.ExprType;
@@ -47,7 +46,10 @@ import tajo.storage.StorageManager;
 import tajo.util.IndexUtil;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -152,9 +154,9 @@ public class Query implements EventHandler<QueryEvent> {
       float [] subProgresses = new float[subqueries.size()];
       boolean finished = true;
       for (SubQuery subquery: subqueries.values()) {
-        if (subquery.getStateMachine().getCurrentState() != SubQueryState.NEW) 
{
+        if (subquery.getState() != SubQueryState.NEW) {
           subProgresses[idx] = subquery.getProgress();
-          if (finished == true && subquery.getState() != 
SubQueryState.SUCCEEDED) {
+          if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
             finished = false;
           }
         } else {
@@ -163,7 +165,7 @@ public class Query implements EventHandler<QueryEvent> {
         idx++;
       }
 
-      if (finished == true) {
+      if (finished) {
         return 1.0f;
       }
 
@@ -267,15 +269,6 @@ public class Query implements EventHandler<QueryEvent> {
     @Override
     public QueryState transition(Query query, QueryEvent queryEvent) {
       query.setStartTime();
-
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      while(cursor.hasNext()) {
-        ExecutionBlock block = cursor.nextBlock();
-        System.out.println(block.getId());
-        System.out.println(block.getPlan());
-        System.out.println("--------------------------------");
-      }
-      query.getExecutionBlockCursor().reset();
       return QueryState.QUERY_INIT;
     }
   }
@@ -313,20 +306,18 @@ public class Query implements EventHandler<QueryEvent> {
           query.addSubQuery(nextSubQuery);
           nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
               SubQueryEventType.SQ_INIT));
-          LOG.info("Scheduling SubQuery's Priority: " + 
nextSubQuery.getPriority().get());
+          LOG.info("Scheduling SubQuery's Priority: " + 
nextSubQuery.getPriority());
           LOG.info("Scheduling SubQuery's Plan: \n" + 
nextSubQuery.getBlock().getPlan());
-          QueryState state = query.checkQueryForCompleted();
-          return state;
+          return query.checkQueryForCompleted();
 
-        } else {
+        } else { // Finish a query
           if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-            SubQuerySucceeEvent succeeEvent = (SubQuerySucceeEvent) castEvent;
             SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
             TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
-                succeeEvent.getTableMeta(), query.context.getOutputPath());
+                subQuery.getTableMeta(), query.context.getOutputPath());
             query.setResultDesc(desc);
             try {
-              query.writeStat(query.context.getOutputPath(), subQuery, 
succeeEvent.getTableMeta().getStat());
+              query.writeStat(query.context.getOutputPath(), subQuery);
             } catch (IOException e) {
               e.printStackTrace();
             }
@@ -417,7 +408,7 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  private void writeStat(Path outputPath, SubQuery subQuery, TableStat stat)
+  private void writeStat(Path outputPath, SubQuery subQuery)
       throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
     if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
@@ -438,10 +429,7 @@ public class Query implements EventHandler<QueryEvent> {
       sm.writeTableMeta(indexPath, meta);
 
     } else {
-      TableMeta meta = TCatUtil.newTableMeta(execBlock.getOutputSchema(),
-          StoreType.CSV);
-      meta.setStat(stat);
-      sm.writeTableMeta(outputPath, meta);
+      sm.writeTableMeta(outputPath, subQuery.getTableMeta());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 86581f0..8891861 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -45,6 +45,7 @@ import tajo.master.TajoMaster.MasterContext;
 import tajo.master.TaskRunnerLauncherImpl.Container;
 import tajo.master.event.*;
 import tajo.master.rm.RMContainerAllocator;
+import tajo.storage.StorageManager;
 import tajo.storage.StorageUtil;
 import tajo.util.TajoIdUtils;
 
@@ -81,6 +82,7 @@ public class QueryMaster extends CompositeService implements 
EventHandler {
   private CatalogService catalog;
 
   private boolean isCreateTableStmt;
+  private StorageManager storageManager;
   private FileSystem defaultFS;
   private Path outputPath;
 
@@ -115,6 +117,7 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       rpc = masterContext.getYarnRPC();
 
       catalog = masterContext.getCatalog();
+      storageManager = masterContext.getStorageManager();
 
       taskRunnerListener = new TaskRunnerListener(queryContext);
       addIfService(taskRunnerListener);
@@ -124,8 +127,7 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
 
       query = new Query(queryContext, queryId, clock, appSubmitTime,
-          "", dispatcher.getEventHandler(), masterPlan,
-          masterContext.getStorageManager());
+          "", dispatcher.getEventHandler(), masterPlan, storageManager);
       initStagingDir();
 
       // QueryEventDispatcher is already registered in TajoMaster
@@ -212,7 +214,7 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       implements EventHandler<TaskSchedulerEvent> {
     public void handle(TaskSchedulerEvent event) {
       SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
-      subQuery.taskScheduler.handle(event);
+      subQuery.getTaskScheduler().handle(event);
     }
   }
 
@@ -337,6 +339,10 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     public long getFinishTime() {
       return query.getFinishTime();
     }
+
+    public StorageManager getStorageManager() {
+      return storageManager;
+    }
   }
 
   private class QueryFinishEventHandler implements 
EventHandler<QueryFinishEvent> {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
index 7cf4422..51aff94 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
@@ -58,7 +58,7 @@ public class Repartitioner {
   public static QueryUnit [] createJoinTasks(SubQuery subQuery)
       throws IOException {
     ExecutionBlock execBlock = subQuery.getBlock();
-    CatalogService catalog = subQuery.queryContext.getCatalog();
+    CatalogService catalog = subQuery.getContext().getCatalog();
 
     ScanNode[] scans = execBlock.getScanNodes();
     Path tablePath;
@@ -85,7 +85,7 @@ public class Repartitioner {
       }
 
       // Getting a table stat for each scan
-      stats[i] = subQuery.getChildQuery(scans[i]).getStats();
+      stats[i] = subQuery.getChildQuery(scans[i]).getTableStat();
     }
 
     // Assigning either fragments or fetch urls to query units
@@ -93,7 +93,7 @@ public class Repartitioner {
     if (scans[0].isBroadcast() || scans[1].isBroadcast()) {
       tasks = new QueryUnit[1];
       tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 
0),
-          false, subQuery.eventHandler);
+          false, subQuery.getEventHandler());
       tasks[0].setLogicalPlan(execBlock.getPlan());
       tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
       tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
@@ -135,7 +135,7 @@ public class Repartitioner {
       // Getting the desire number of join tasks according to the volumn
       // of a larger table
       int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
-      int desireJoinTaskVolumn = subQuery.queryContext.getConf().
+      int desireJoinTaskVolumn = subQuery.getContext().getConf().
           getIntVar(ConfVars.JOIN_TASK_VOLUME);
 
       // calculate the number of tasks according to the data size
@@ -181,7 +181,7 @@ public class Repartitioner {
     for (int i = 0; i < taskNum; i++) {
       tasks[i] = new QueryUnit(
           QueryIdFactory.newQueryUnitId(subQuery.getId(), i), 
execBlock.isLeafBlock(),
-          subQuery.eventHandler);
+          subQuery.getEventHandler());
       tasks[i].setLogicalPlan(execBlock.getPlan());
       for (Fragment fragment : fragments) {
         tasks[i].setFragment2(fragment);
@@ -253,14 +253,14 @@ public class Repartitioner {
                                                          int maxNum)
       throws InternalException {
     ExecutionBlock execBlock = subQuery.getBlock();
-    TableStat stat = childSubQuery.getStats();
+    TableStat stat = childSubQuery.getTableStat();
     if (stat.getNumRows() == 0) {
       return new QueryUnit[0];
     }
 
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = subQuery.sm.getTablePath(scan.getTableId());
+    tablePath = 
subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
 
     StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
     SortNode sort = (SortNode) store.getSubNode();
@@ -296,7 +296,7 @@ public class Repartitioner {
 
     List<String> basicFetchURIs = new ArrayList<String>();
 
-    SubQuery child = childSubQuery.queryContext.getSubQuery(
+    SubQuery child = childSubQuery.getContext().getSubQuery(
         subQuery.getBlock().getChildBlock(scan).getId());
     for (QueryUnit qu : child.getQueryUnits()) {
       for (IntermediateEntry p : qu.getIntermediateData()) {
@@ -372,14 +372,14 @@ public class Repartitioner {
                                                  SubQuery childSubQuery,
                                                  int maxNum) {
     ExecutionBlock execBlock = subQuery.getBlock();
-    TableStat stat = childSubQuery.getStats();
+    TableStat stat = childSubQuery.getTableStat();
     if (stat.getNumRows() == 0) {
       return new QueryUnit[0];
     }
 
     ScanNode scan = execBlock.getScanNodes()[0];
     Path tablePath;
-    tablePath = subQuery.sm.getTablePath(scan.getTableId());
+    tablePath = 
subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
 
     List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
     for (QueryUnit tasks : childSubQuery.getQueryUnits()) {
@@ -513,7 +513,7 @@ public class Repartitioner {
     QueryUnit [] tasks = new QueryUnit[num];
     for (int i = 0; i < num; i++) {
       tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 
i),
-          false, subQuery.eventHandler);
+          false, subQuery.getEventHandler());
       tasks[i].setFragment2(frag);
       tasks[i].setLogicalPlan(plan);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java 
b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index ff73334..398e1ae 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -26,27 +26,32 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
 import tajo.QueryIdFactory;
 import tajo.QueryUnitId;
 import tajo.SubQueryId;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.StoreType;
+import tajo.catalog.CatalogService;
+import tajo.catalog.TCatUtil;
+import tajo.catalog.TableDesc;
+import tajo.catalog.TableMeta;
 import tajo.catalog.statistics.ColumnStat;
 import tajo.catalog.statistics.StatisticsUtil;
 import tajo.catalog.statistics.TableStat;
 import tajo.conf.TajoConf;
-import tajo.engine.json.GsonCreator;
 import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.logical.*;
+import tajo.engine.planner.logical.ExprType;
+import tajo.engine.planner.logical.GroupbyNode;
+import tajo.engine.planner.logical.ScanNode;
+import tajo.engine.planner.logical.StoreTableNode;
 import tajo.master.QueryMaster.QueryContext;
 import tajo.master.event.*;
 import tajo.storage.Fragment;
 import tajo.storage.StorageManager;
-import tajo.util.IndexUtil;
 
 import java.io.IOException;
 import java.util.*;
@@ -60,19 +65,19 @@ import static tajo.conf.TajoConf.ConfVars;
 
 
 /**
- * SubQuery is an instance of an ExecutionBlock.
+ * SubQuery plays a role in controlling an ExecutionBlock and is a finite 
state machine.
  */
 public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static final Log LOG = LogFactory.getLog(SubQuery.class);
 
   private ExecutionBlock block;
-  private Priority priority;
-  private TableStat stats;
-  EventHandler eventHandler;
-  final StorageManager sm;
-  TaskSchedulerImpl taskScheduler;
-  QueryContext queryContext;
+  private int priority;
+  private TableMeta meta;
+  private EventHandler eventHandler;
+  private final StorageManager sm;
+  private TaskSchedulerImpl taskScheduler;
+  private QueryContext context;
 
   private long startTime;
   private long finishTime;
@@ -134,7 +139,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   private int completedTaskCount = 0;
 
   public SubQuery(QueryContext context, ExecutionBlock block, StorageManager 
sm) {
-    this.queryContext = context;
+    this.context = context;
     this.block = block;
     this.sm = sm;
     this.eventHandler = context.getEventHandler();
@@ -145,10 +150,32 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     stateMachine = stateMachineFactory.make(this);
   }
 
+  public QueryContext getContext() {
+    return context;
+  }
+
+  public EventHandler getEventHandler() {
+    return eventHandler;
+  }
+
+  public TaskScheduler getTaskScheduler() {
+    return taskScheduler;
+  }
+
+  public void setStartTime() {
+    startTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
   public long getStartTime() {
     return this.startTime;
   }
 
+  public void setFinishTime() {
+    finishTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
   public long getFinishTime() {
     return this.finishTime;
   }
@@ -156,7 +183,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   public float getProgress() {
     readLock.lock();
     try {
-      if (getStateMachine().getCurrentState() == SubQueryState.NEW) {
+      if (getState() == SubQueryState.NEW) {
         return 0;
       } else {
         if (completedTaskCount == 0) {
@@ -193,21 +220,20 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   }
 
   public void setPriority(int priority) {
-    if (this.priority == null) {
-      this.priority = new Priority(priority);
-    }
+    this.priority = priority;
   }
 
-  public StorageManager getStorageManager() {
-    return sm;
+
+  public int getPriority() {
+    return this.priority;
   }
 
-  public void setStats(TableStat stat) {
-    this.stats = stat;
+  public StorageManager getStorageManager() {
+    return sm;
   }
   
   public SubQuery getChildQuery(ScanNode scanForChild) {
-    return queryContext.getSubQuery(block.getChildBlock(scanForChild).getId());
+    return context.getSubQuery(block.getChildBlock(scanForChild).getId());
   }
   
   public SubQueryId getId() {
@@ -215,7 +241,6 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   }
   
   public QueryUnit[] getQueryUnits() {
-    // TODO - to be changed to unified getter
     return tasks.values().toArray(new QueryUnit[tasks.size()]);
   }
   
@@ -223,12 +248,17 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     return tasks.get(qid);
   }
 
-  public Priority getPriority() {
-    return this.priority;
+  public void setTableMeta(TableMeta meta) {
+    this.meta = meta;
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public TableMeta getTableMeta() {
+    return meta;
   }
 
-  public TableStat getStats() {
-    return this.stats;
+  public TableStat getTableStat() {
+    return this.meta.getStat();
   }
 
   public String toString() {
@@ -264,15 +294,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     }
   }
 
-  private void finishUnionUnit() throws IOException {
-    // write meta and continue
-    TableStat stat = generateUnionStat(this);
-    setStats(stat);
-    writeStat(this, stat);
-    //unit.setState(QueryStatus.QUERY_FINISHED);
-  }
-
-  private static TableStat generateUnionStat(SubQuery unit) {
+  private static TableStat computeStatFromUnionBlock(SubQuery unit) {
     TableStat stat = new TableStat();
     TableStat childStat;
     long avgRows = 0, numBytes = 0, numRows = 0;
@@ -282,8 +304,8 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
     while (it.hasNext()) {
       ExecutionBlock block = it.next();
-      SubQuery childSubQuery = unit.queryContext.getSubQuery(block.getId());
-      childStat = childSubQuery.getStats();
+      SubQuery childSubQuery = unit.context.getSubQuery(block.getId());
+      childStat = childSubQuery.getTableStat();
       avgRows += childStat.getAvgRows();
       columnStats.addAll(childStat.getColumnStats());
       numBlocks += childStat.getNumBlocks();
@@ -301,129 +323,136 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     return stat;
   }
 
-  public void cleanUp() {
+  public TableMeta buildTableMeta() throws IOException {
+    finishTime = context.getClock().getTime();
+
+    TableStat stat;
     if (block.hasUnion()) {
-      try {
-        // write meta and continue
-        TableStat stat = generateUnionStat(this);
-        setStats(stat);
-        writeStat(this, stat);
-        //unit.setState(QueryStatus.QUERY_FINISHED);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
+      stat = computeStatFromUnionBlock(this);
+    } else {
+      stat = computeStatFromTasks();
+    }
+    TableMeta meta = writeStat(this, stat);
+    meta.setStat(stat);
+    setTableMeta(meta);
+    return meta;
+  }
+
+  private TableStat computeStatFromTasks() {
+    List<TableStat> stats = Lists.newArrayList();
+    for (QueryUnit unit : getQueryUnits()) {
+      stats.add(unit.getStats());
+    }
+    TableStat tableStat = StatisticsUtil.aggregateTableStat(stats);
+    return tableStat;
+  }
+
+  private TableMeta writeStat(SubQuery subQuery, TableStat stat)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    StoreTableNode storeTableNode = execBlock.getStoreTableNode();
+    TableMeta meta = toTableMeta(storeTableNode);
+    meta.setStat(stat);
+    sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
+    return meta;
+  }
+
+  private static TableMeta toTableMeta(StoreTableNode store) {
+    if (store.hasOptions()) {
+      return TCatUtil.newTableMeta(store.getOutSchema(),
+          store.getStorageType(), store.getOptions());
     } else {
-      LOG.info("SubQuery: " + getId() + " sets TableStat");
-      TableStat stat = generateStat();
+      return TCatUtil.newTableMeta(store.getOutSchema(),
+          store.getStorageType());
+    }
+  }
+
+  private void stopScheduler() {
+    // If there are launched TaskRunners, send the 'shouldDie' message to all of
+    // them via received task requests.
+    if (taskScheduler != null) {
+      taskScheduler.stop();
+    }
+  }
+
+  private void releaseContainers() {
+    // If there are still live TaskRunners, try to kill the containers.
+    for (Entry<ContainerId, Container> entry : containers.entrySet()) {
+      eventHandler.handle(new TaskRunnerStopEvent(getId(), entry.getValue()));
+    }
+  }
+
+  private void finish() {
+    TableMeta meta = null;
+    try {
+      meta = buildTableMeta();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    setTableMeta(meta);
+    setFinishTime();
+    eventHandler.handle(new SubQuerySucceeEvent(getId(), meta));
+  }
+
+  @Override
+  public void handle(SubQueryEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getSubQueryId() + " of type " + 
event.getType());
+    }
+
+    try {
+      writeLock.lock();
+      SubQueryState oldState = getState();
       try {
-        writeStat(this, stat);
-      } catch (IOException e) {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new SubQueryEvent(getId(),
+            SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+
+      // notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to 
"
+              + getState());
+        }
       }
     }
 
-    finishTime = queryContext.getClock().getTime();
+    finally {
+      writeLock.unlock();
+    }
   }
 
-
   private static class InitAndRequestContainer implements 
MultipleArcTransition<SubQuery,
       SubQueryEvent, SubQueryState> {
 
     @Override
     public SubQueryState transition(SubQuery subQuery, SubQueryEvent 
subQueryEvent) {
-      subQuery.startTime = subQuery.queryContext.getClock().getTime();
-      subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.queryContext);
-      subQuery.taskScheduler.init(subQuery.queryContext.getConf());
-      subQuery.taskScheduler.start();
-
+      subQuery.setStartTime();
       ExecutionBlock execBlock = subQuery.getBlock();
+      SubQueryState state;
 
       try {
-        // if subquery is dummy, which means it requires only a logical step
-        // instead of actual query. An 'union all' is an example of
-        // a dummy subquery.
+        // Union operator does not require actual query processing. It is 
performed logically.
         if (execBlock.hasUnion()) {
-          subQuery.finishUnionUnit();
-          subQuery.cleanUp();
-          TableMeta meta = new TableMetaImpl(execBlock.getOutputSchema(),
-              StoreType.CSV, new Options(), subQuery.getStats());
-          subQuery.eventHandler.handle(new 
SubQuerySucceeEvent(subQuery.getId(),
-              meta));
-          return SubQueryState.SUCCEEDED;
+          subQuery.finish();
+          state = SubQueryState.SUCCEEDED;
         } else {
-          QueryUnit [] tasks;
-          // TODO - should be improved
-          if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) 
{
-
-            // if parent is join, this subquery is for partitioning data.
-            if (execBlock.hasParentBlock()) {
-              int numTasks = calculatePartitionNum(subQuery);
-              Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
-            }
-
-            tasks = createLeafTasks(subQuery);
-          } else if (execBlock.getScanNodes().length > 1) {
-            // if parent is join, this subquery is for partitioning data.
-            if (execBlock.hasParentBlock()) {
-              int numTasks = calculatePartitionNum(subQuery);
-              Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
-            }
-
-            if (subQuery.getId().getId() == 15) {
-              System.out.println("error point!");
-            }
-
-            tasks = Repartitioner.createJoinTasks(subQuery);
+          setRepartitionIfNecessary(subQuery);
+          createTasks(subQuery);
 
-          } else {
-            // if parent is join, this subquery is for partitioning data.
-            if (execBlock.hasParentBlock()) {
-              int partitionNum = calculatePartitionNum(subQuery);
-              Repartitioner.setPartitionNumberForTwoPhase(subQuery, 
partitionNum);
-            }
-            int numTasks = getNonLeafTaskNum(subQuery);
-
-            SubQueryId childId = 
subQuery.getBlock().getChildBlocks().iterator().next().getId();
-            SubQuery child = subQuery.queryContext.getSubQuery(childId);
-            tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
-          }
-          for (QueryUnit task : tasks) {
-            subQuery.addTask(task);
-          }
-          LOG.info("Create " + tasks.length + " Tasks");
-
-          // if there is no tasks
-          if (subQuery.tasks.size() == 0) {
-            subQuery.cleanUp();
-            TableMeta meta = toTableMeta(execBlock.getStoreTableNode());
-            meta.setStat(subQuery.getStats());
-            subQuery.eventHandler.handle(new 
SubQuerySucceeEvent(subQuery.getId(),
-                meta));
+          if (subQuery.tasks.size() == 0) { // if there is no tasks
+            subQuery.finish();
             return SubQueryState.SUCCEEDED;
-
           } else {
-            int numRequest = Math.min(tasks.length,
-                subQuery.queryContext.getNumClusterNode() * 4);
-
-            final Resource resource =
-                RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
-                    Resource.class);
-            if (tasks.length <= subQuery.queryContext.getNumClusterNode()) {
-              
resource.setMemory(subQuery.queryContext.getMaxContainerCapability());
-            } else {
-              resource.setMemory(2000);
-            }
-
-            org.apache.hadoop.yarn.api.records.Priority priority =
-                RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
-                    org.apache.hadoop.yarn.api.records.Priority.class);
-            priority.setPriority(subQuery.getPriority().get());
-            ContainerAllocationEvent event =
-                new 
ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
-                    subQuery.getId(), priority, resource, numRequest, 
execBlock.isLeafBlock(), 0.0f);
-            subQuery.eventHandler.handle(event);
+            initTaskScheduler(subQuery);
+            allocateContainers(subQuery);
+            return SubQueryState.INIT;
           }
         }
-        return  SubQueryState.INIT;
       } catch (Exception e) {
         LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
         subQuery.eventHandler.handle(
@@ -432,9 +461,181 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
             new SubQueryCompletedEvent(subQuery.getId(), 
SubQueryState.FAILED));
         return SubQueryState.FAILED;
       }
+
+      return state;
     }
 
-    public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
+    private void initTaskScheduler(SubQuery subQuery) {
+      subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context);
+      subQuery.taskScheduler.init(subQuery.context.getConf());
+      subQuery.taskScheduler.start();
+    }
+
+    /**
+     * If a parent block requires a repartition operation, the method sets proper repartition
+     * methods and the number of partitions to a given subquery.
+     */
+    private static void setRepartitionIfNecessary(SubQuery subQuery) {
+      if (subQuery.getBlock().hasParentBlock()) {
+        int numTasks = calculatePartitionNum(subQuery);
+        Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+      }
+    }
+
+    /**
+     * Getting the desired number of partitions according to the volume of input data.
+     * This method is only used to determine the partition key number of hash join or aggregation.
+     *
+     * @param subQuery
+     * @return
+     */
+    public static int calculatePartitionNum(SubQuery subQuery) {
+      TajoConf conf = subQuery.context.getConf();
+      ExecutionBlock parent = subQuery.getBlock().getParentBlock();
+
+      GroupbyNode grpNode = null;
+      if (parent != null) {
+        grpNode = (GroupbyNode) PlannerUtil.findTopNode(
+            parent.getPlan(), ExprType.GROUP_BY);
+      }
+
+      // Is this subquery the first step of join?
+      if (parent != null && parent.getScanNodes().length == 2) {
+        Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
+
+        // for outer
+        ExecutionBlock outer = child.next();
+        long outerVolume = getInputVolume(subQuery.context, outer);
+
+        // for inner
+        ExecutionBlock inner = child.next();
+        long innerVolume = getInputVolume(subQuery.context, inner);
+        LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
+        LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
+
+        long smaller = Math.min(outerVolume, innerVolume);
+
+        int mb = (int) Math.ceil((double)smaller / 1048576);
+        LOG.info("Smaller Table's volume is approximately " + mb + " MB");
+        // determine the number of task
+        int taskNum = (int) Math.ceil((double)mb /
+            conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME));
+        LOG.info("The determined number of join partitions is " + taskNum);
+        return taskNum;
+
+        // Is this subquery the first step of group-by?
+      } else if (grpNode != null) {
+
+        if (grpNode.getGroupingColumns().length == 0) {
+          return 1;
+        } else {
+          long volume = getInputVolume(subQuery.context, subQuery.block);
+
+          int mb = (int) Math.ceil((double)volume / 1048576);
+          LOG.info("Table's volume is approximately " + mb + " MB");
+          // determine the number of task
+          int taskNum = (int) Math.ceil((double)mb /
+              conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME));
+          LOG.info("The determined number of aggregation partitions is " + 
taskNum);
+          return taskNum;
+        }
+      } else {
+        LOG.info("============>>>>> Unexpected Case! <<<<<================");
+        long volume = getInputVolume(subQuery.context, subQuery.block);
+
+        int mb = (int) Math.ceil((double)volume / 1048576);
+        LOG.info("Table's volume is approximately " + mb + " MB");
+        // determine the number of task per 128MB
+        int taskNum = (int) Math.ceil((double)mb / 128);
+        LOG.info("The determined number of partitions is " + taskNum);
+        return taskNum;
+      }
+    }
+
+    private static void createTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit [] tasks;
+      if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { 
// Case 1: Just Scan
+        tasks = createLeafTasks(subQuery);
+
+      } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+        tasks = Repartitioner.createJoinTasks(subQuery);
+
+      } else { // Case 3: Others (Sort or Aggregation)
+        int numTasks = getNonLeafTaskNum(subQuery);
+        SubQueryId childId = 
subQuery.getBlock().getChildBlocks().iterator().next().getId();
+        SubQuery child = subQuery.context.getSubQuery(childId);
+        tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
+      }
+
+      LOG.info("Create " + tasks.length + " Tasks");
+
+      for (QueryUnit task : tasks) {
+        subQuery.addTask(task);
+      }
+    }
+
+    /**
+     * Getting the desired number of tasks according to the volume of input data
+     *
+     * @param subQuery
+     * @return
+     */
+    public static int getNonLeafTaskNum(SubQuery subQuery) {
+      // Getting intermediate data size
+      long volume = getInputVolume(subQuery.context, subQuery.getBlock());
+
+      int mb = (int) Math.ceil((double)volume / 1048576);
+      LOG.info("Table's volume is approximately " + mb + " MB");
+      // determine the number of task per 64MB
+      int maxTaskNum = (int) Math.ceil((double)mb / 64);
+      LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+      return maxTaskNum;
+    }
+
+    public static long getInputVolume(QueryContext context, ExecutionBlock 
execBlock) {
+      CatalogService catalog = context.getCatalog();
+      if (execBlock.isLeafBlock()) {
+        ScanNode outerScan = execBlock.getScanNodes()[0];
+        TableStat stat = 
catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
+        return stat.getNumBytes();
+      } else {
+        long aggregatedVolume = 0;
+        for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+          SubQuery subquery = context.getSubQuery(childBlock.getId());
+          aggregatedVolume += subquery.getTableStat().getNumBytes();
+        }
+
+        return aggregatedVolume;
+      }
+    }
+
+    public static void allocateContainers(SubQuery subQuery) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit [] tasks = subQuery.getQueryUnits();
+
+      int numRequest = Math.min(tasks.length,
+          subQuery.context.getNumClusterNode() * 4);
+
+      final Resource resource =
+          RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
+              Resource.class);
+      if (tasks.length <= subQuery.context.getNumClusterNode()) {
+        resource.setMemory(subQuery.context.getMaxContainerCapability());
+      } else {
+        resource.setMemory(2000);
+      }
+
+      Priority priority = Records.newRecord(Priority.class);
+      priority.setPriority(subQuery.getPriority());
+      ContainerAllocationEvent event =
+          new 
ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+              subQuery.getId(), priority, resource, numRequest,
+              execBlock.isLeafBlock(), 0.0f);
+      subQuery.eventHandler.handle(event);
+    }
+
+    private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws 
IOException {
       ExecutionBlock execBlock = subQuery.getBlock();
       ScanNode[] scans = execBlock.getScanNodes();
       Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
@@ -442,13 +643,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       Path inputPath;
 
       ScanNode scan = scans[0];
-      TableDesc desc = 
subQuery.queryContext.getCatalog().getTableDesc(scan.getTableId());
+      TableDesc desc = 
subQuery.context.getCatalog().getTableDesc(scan.getTableId());
       inputPath = desc.getPath();
       meta = desc.getMeta();
 
       // TODO - should be change the inner directory
       Path oldPath = new Path(inputPath, "data");
-      FileSystem fs = inputPath.getFileSystem(subQuery.queryContext.getConf());
+      FileSystem fs = inputPath.getFileSystem(subQuery.context.getConf());
       if (fs.exists(oldPath)) {
         inputPath = oldPath;
       }
@@ -459,129 +660,24 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
       int i = 0;
       for (Fragment fragment : fragments) {
-        queryUnit = newQueryUnit(subQuery, i++);
-        queryUnit.setFragment(scan.getTableId(), fragment);
+        queryUnit = newQueryUnit(subQuery, i++, fragment);
         queryUnits.add(queryUnit);
       }
 
       return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
     }
 
-    private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) {
+    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, 
Fragment fragment) {
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit unit = new QueryUnit(
           QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), 
execBlock.isLeafBlock(),
           subQuery.eventHandler);
       unit.setLogicalPlan(execBlock.getPlan());
+      unit.setFragment2(fragment);
       return unit;
     }
   }
 
-  /**
-   * Getting the desire number of partitions according to the volume of input data.
-   * This method is only used to determine the partition key number of hash join or aggregation.
-   *
-   * @param subQuery
-   * @return
-   */
-  public static int calculatePartitionNum(SubQuery subQuery) {
-    TajoConf conf = subQuery.queryContext.getConf();
-    ExecutionBlock parent = subQuery.getBlock().getParentBlock();
-
-    GroupbyNode grpNode = null;
-    if (parent != null) {
-      grpNode = (GroupbyNode) PlannerUtil.findTopNode(
-          parent.getPlan(), ExprType.GROUP_BY);
-    }
-
-    // Is this subquery the first step of join?
-    if (parent != null && parent.getScanNodes().length == 2) {
-      Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
-
-      // for inner
-      ExecutionBlock outer = child.next();
-      long outerVolume = getInputVolume(subQuery.queryContext, outer);
-
-      // for inner
-      ExecutionBlock inner = child.next();
-      long innerVolume = getInputVolume(subQuery.queryContext, inner);
-      LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
-      LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
-
-      long smaller = Math.min(outerVolume, innerVolume);
-
-      int mb = (int) Math.ceil((double)smaller / 1048576);
-      LOG.info("Smaller Table's volume is approximately " + mb + " MB");
-      // determine the number of task
-      int taskNum = (int) Math.ceil((double)mb /
-          conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME));
-      LOG.info("The determined number of join partitions is " + taskNum);
-      return taskNum;
-
-      // Is this subquery the first step of group-by?
-    } else if (grpNode != null) {
-
-      if (grpNode.getGroupingColumns().length == 0) {
-        return 1;
-      } else {
-        long volume = getInputVolume(subQuery.queryContext, subQuery.block);
-
-        int mb = (int) Math.ceil((double)volume / 1048576);
-        LOG.info("Table's volume is approximately " + mb + " MB");
-        // determine the number of task
-        int taskNum = (int) Math.ceil((double)mb /
-            conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME));
-        LOG.info("The determined number of aggregation partitions is " + 
taskNum);
-        return taskNum;
-      }
-    } else {
-      LOG.info("============>>>>> Unexpected Case! <<<<<================");
-      long volume = getInputVolume(subQuery.queryContext, subQuery.block);
-
-      int mb = (int) Math.ceil((double)volume / 1048576);
-      LOG.info("Table's volume is approximately " + mb + " MB");
-      // determine the number of task per 128MB
-      int taskNum = (int) Math.ceil((double)mb / 128);
-      LOG.info("The determined number of partitions is " + taskNum);
-      return taskNum;
-    }
-  }
-
-  public static long getInputVolume(QueryContext context, ExecutionBlock 
execBlock) {
-    CatalogService catalog = context.getCatalog();
-    if (execBlock.isLeafBlock()) {
-      ScanNode outerScan = execBlock.getScanNodes()[0];
-      TableStat stat = 
catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
-      return stat.getNumBytes();
-    } else {
-      long aggregatedVolume = 0;
-      for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
-        SubQuery subquery = context.getSubQuery(childBlock.getId());
-        aggregatedVolume += subquery.getStats().getNumBytes();
-      }
-
-      return aggregatedVolume;
-    }
-  }
-
-  /**
-   * Getting the desire number of tasks according to the volume of input data
-   *
-   * @param subQuery
-   * @return
-   */
-  public static int getNonLeafTaskNum(SubQuery subQuery) {
-    // Getting intermediate data size
-    long volume = getInputVolume(subQuery.queryContext, subQuery.getBlock());
-
-    int mb = (int) Math.ceil((double)volume / 1048576);
-    LOG.info("Table's volume is approximately " + mb + " MB");
-    // determine the number of task per 64MB
-    int maxTaskNum = (int) Math.ceil((double)mb / 64);
-    LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
-    return maxTaskNum;
-  }
-
   int i = 0;
   private static class ContainerLaunchTransition
       implements SingleArcTransition<SubQuery, SubQueryEvent> {
@@ -617,6 +713,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     @Override
     public SubQueryState transition(SubQuery subQuery,
                            SubQueryEvent subQueryEvent) {
+      // schedule tasks
       try {
         for (QueryUnitId taskId : subQuery.tasks.keySet()) {
           subQuery.eventHandler.handle(new TaskEvent(taskId, 
TaskEventType.T_SCHEDULE));
@@ -630,9 +727,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  private class TaskCompletedTransition implements
-      SingleArcTransition<SubQuery, SubQueryEvent> {
-
+  private static class TaskCompletedTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
 
     @Override
     public void transition(SubQuery subQuery,
@@ -641,7 +737,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
       QueryUnitAttempt task = 
subQuery.getQueryUnit(taskEvent.getTaskId()).getSuccessfulAttempt();
 
-      LOG.info(getId() + " SubQuery Succeeded " + completedTaskCount + "/"
+      LOG.info(subQuery.getId() + " SubQuery Succeeded " + 
subQuery.completedTaskCount + "/"
           + subQuery.tasks.size() + " on " + task.getHost());
       if (subQuery.completedTaskCount == subQuery.tasks.size()) {
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
@@ -658,31 +754,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       // TODO - Commit subQuery & do cleanup
       // TODO - records succeeded, failed, killed completed task
       // TODO - records metrics
-
-      ExecutionBlock execBlock = subQuery.getBlock();
-
-      for (Entry<ContainerId, Container> entry : 
subQuery.containers.entrySet()) {
-        subQuery.eventHandler.handle(new TaskRunnerStopEvent(subQuery.getId(),
-            entry.getValue()));
-      }
-      subQuery.cleanUp();
-      subQuery.taskScheduler.stop();
-
-      StoreTableNode storeTableNode = execBlock.getStoreTableNode();
-      TableMeta meta = toTableMeta(storeTableNode);
-      meta.setStat(subQuery.getStats());
-
-      subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(),
-          meta));
-      subQuery.finishTime = subQuery.queryContext.getClock().getTime();
+      subQuery.stopScheduler();
+      subQuery.releaseContainers();
+      subQuery.finish();
     }
   }
 
-  SubQueryState finished(SubQueryState state) {
-    return state;
-  }
-
-  class InternalErrorTransition
+  private static class InternalErrorTransition
       implements SingleArcTransition<SubQuery, SubQueryEvent> {
 
     @Override
@@ -691,84 +769,4 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
     }
   }
-
-  private TableStat generateStat() {
-    List<TableStat> stats = Lists.newArrayList();
-    for (QueryUnit unit : getQueryUnits()) {
-      stats.add(unit.getStats());
-    }
-    TableStat tableStat = StatisticsUtil.aggregateTableStat(stats);
-    setStats(tableStat);
-    return tableStat;
-  }
-
-  private void writeStat(SubQuery subQuery, TableStat stat)
-      throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-
-    if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
-      IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
-      Path indexPath = new Path(sm.getTablePath(index.getTableName()), 
"index");
-      TableMeta meta;
-      if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
-        meta = sm.getTableMeta(indexPath);
-      } else {
-        StoreTableNode storeTableNode = execBlock.getStoreTableNode();
-        meta = toTableMeta(storeTableNode);
-      }
-      String indexName = IndexUtil.getIndexName(index.getTableName(),
-          index.getSortSpecs());
-      String json = GsonCreator.getInstance().toJson(index.getSortSpecs());
-      meta.putOption(indexName, json);
-
-      sm.writeTableMeta(indexPath, meta);
-
-    } else {
-      StoreTableNode storeTableNode = execBlock.getStoreTableNode();
-      TableMeta meta = toTableMeta(storeTableNode);
-      meta.setStat(stat);
-      sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
-    }
-  }
-
-  private static TableMeta toTableMeta(StoreTableNode store) {
-    if (store.hasOptions()) {
-      return TCatUtil.newTableMeta(store.getOutSchema(),
-          store.getStorageType(), store.getOptions());
-    } else {
-      return TCatUtil.newTableMeta(store.getOutSchema(),
-          store.getStorageType());
-    }
-  }
-
-  @Override
-  public void handle(SubQueryEvent event) {
-    //if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getSubQueryId() + " of type " + 
event.getType());
-    //}
-
-    try {
-      writeLock.lock();
-      SubQueryState oldState = getState();
-      try {
-        getStateMachine().doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new SubQueryEvent(getId(),
-            SubQueryEventType.SQ_INTERNAL_ERROR));
-      }
-
-      //notify the eventhandler of state change
-      if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
-          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to 
"
-              + getState());
-        }
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
index 4c78c26..0a1ad42 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
@@ -24,10 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
@@ -159,8 +156,8 @@ public class RMContainerAllocator extends AMRMClientImpl
     if (allocatedContainers.size() > 0) {
       for (Container container : allocatedContainers) {
         SubQueryId subQueryId = subQueryMap.get(container.getPriority());
-        if (!subQueryMap.containsKey(container.getPriority()) ||
-            context.getSubQuery(subQueryId).getState() == 
SubQueryState.SUCCEEDED) {
+        SubQueryState state = context.getSubQuery(subQueryId).getState();
+        if (!(isRunningState(state) && 
subQueryMap.containsKey(container.getPriority()))) {
           releaseAssignedContainer(container.getId());
           synchronized (subQueryMap) {
             subQueryMap.remove(container.getPriority());
@@ -180,6 +177,11 @@ public class RMContainerAllocator extends AMRMClientImpl
     }
   }
 
+  private static boolean isRunningState(SubQueryState state) {
+    return state == SubQueryState.INIT || state == SubQueryState.NEW ||
+        state == SubQueryState.CONTAINER_ALLOCATED || state == 
SubQueryState.RUNNING;
+  }
+
   @Override
   public void handle(ContainerAllocationEvent event) {
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
index ff5ca86..0fd534e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
@@ -257,6 +257,7 @@ public class TaskRunner extends AbstractService {
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
                 LOG.error(te);
+
                 continue;
               }
 

Reply via email to