Updated Branches: refs/heads/master da1fd3264 -> 44b0d2232
TAJO-50: Cleanup SubQuery. (hyunsik) Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/44b0d223 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/44b0d223 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/44b0d223 Branch: refs/heads/master Commit: 44b0d2232fdca80338165ad0629eebbf90dcb7e2 Parents: da1fd32 Author: Hyunsik Choi <[email protected]> Authored: Sat May 4 02:16:00 2013 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Sat May 4 02:16:00 2013 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../src/main/java/tajo/master/GlobalPlanner.java | 2 +- .../src/main/java/tajo/master/Priority.java | 54 -- .../src/main/java/tajo/master/Query.java | 40 +- .../src/main/java/tajo/master/QueryMaster.java | 12 +- .../src/main/java/tajo/master/Repartitioner.java | 22 +- .../src/main/java/tajo/master/SubQuery.java | 710 +++++++-------- .../java/tajo/master/rm/RMContainerAllocator.java | 14 +- .../src/main/java/tajo/worker/TaskRunner.java | 1 + 9 files changed, 400 insertions(+), 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index db43b6d..413be47 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,8 @@ Release 0.2.0 - unreleased IMPROVEMENTS + TAJO-50: Cleanup SubQuery. (hyunsik) + TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik) TAJO-42: Divide SubQuery into FSM and execution block parts. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java index 2709c98..b9fb587 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java @@ -922,7 +922,7 @@ public class GlobalPlanner { ExecutionBlock execBlock = subQuery.getBlock(); QueryUnit unit = new QueryUnit( QueryIdFactory.newQueryUnitId(subQuery.getId()), execBlock.isLeafBlock(), - subQuery.eventHandler); + subQuery.getEventHandler()); unit.setLogicalPlan(execBlock.getPlan()); return unit; } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java deleted file mode 100644 index 7e95d17..0000000 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Priority.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package tajo.master; - -public class Priority implements Comparable<Priority> { - private int priority; - - public Priority(int prio) { - set(prio); - } - - public void set(int prio) { - this.priority = prio; - } - - public int get() { - return this.priority; - } - - @Override - public int compareTo(Priority o) { - return this.get() - o.get(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof Priority) { - Priority p = (Priority) o; - return p.get() == this.get(); - } - return false; - } - - @Override - public String toString() { - return "" + priority; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java index 292b9fd..d740cbd 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java @@ -36,7 +36,6 @@ import tajo.catalog.TableDesc; import tajo.catalog.TableDescImpl; import tajo.catalog.TableMeta; import tajo.catalog.proto.CatalogProtos.StoreType; -import tajo.catalog.statistics.TableStat; import tajo.engine.json.GsonCreator; import tajo.engine.planner.global.MasterPlan; import tajo.engine.planner.logical.ExprType; @@ -47,7 +46,10 @@ import tajo.storage.StorageManager; import tajo.util.IndexUtil; import java.io.IOException; -import 
java.util.*; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -152,9 +154,9 @@ public class Query implements EventHandler<QueryEvent> { float [] subProgresses = new float[subqueries.size()]; boolean finished = true; for (SubQuery subquery: subqueries.values()) { - if (subquery.getStateMachine().getCurrentState() != SubQueryState.NEW) { + if (subquery.getState() != SubQueryState.NEW) { subProgresses[idx] = subquery.getProgress(); - if (finished == true && subquery.getState() != SubQueryState.SUCCEEDED) { + if (finished && subquery.getState() != SubQueryState.SUCCEEDED) { finished = false; } } else { @@ -163,7 +165,7 @@ public class Query implements EventHandler<QueryEvent> { idx++; } - if (finished == true) { + if (finished) { return 1.0f; } @@ -267,15 +269,6 @@ public class Query implements EventHandler<QueryEvent> { @Override public QueryState transition(Query query, QueryEvent queryEvent) { query.setStartTime(); - - ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); - while(cursor.hasNext()) { - ExecutionBlock block = cursor.nextBlock(); - System.out.println(block.getId()); - System.out.println(block.getPlan()); - System.out.println("--------------------------------"); - } - query.getExecutionBlockCursor().reset(); return QueryState.QUERY_INIT; } } @@ -313,20 +306,18 @@ public class Query implements EventHandler<QueryEvent> { query.addSubQuery(nextSubQuery); nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT)); - LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority().get()); + LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority()); LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan()); - QueryState state = query.checkQueryForCompleted(); - return state; + 
return query.checkQueryForCompleted(); - } else { + } else { // Finish a query if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) { - SubQuerySucceeEvent succeeEvent = (SubQuerySucceeEvent) castEvent; SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId()); TableDesc desc = new TableDescImpl(query.conf.getOutputTable(), - succeeEvent.getTableMeta(), query.context.getOutputPath()); + subQuery.getTableMeta(), query.context.getOutputPath()); query.setResultDesc(desc); try { - query.writeStat(query.context.getOutputPath(), subQuery, succeeEvent.getTableMeta().getStat()); + query.writeStat(query.context.getOutputPath(), subQuery); } catch (IOException e) { e.printStackTrace(); } @@ -417,7 +408,7 @@ public class Query implements EventHandler<QueryEvent> { } } - private void writeStat(Path outputPath, SubQuery subQuery, TableStat stat) + private void writeStat(Path outputPath, SubQuery subQuery) throws IOException { ExecutionBlock execBlock = subQuery.getBlock(); if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) { @@ -438,10 +429,7 @@ public class Query implements EventHandler<QueryEvent> { sm.writeTableMeta(indexPath, meta); } else { - TableMeta meta = TCatUtil.newTableMeta(execBlock.getOutputSchema(), - StoreType.CSV); - meta.setStat(stat); - sm.writeTableMeta(outputPath, meta); + sm.writeTableMeta(outputPath, subQuery.getTableMeta()); } } } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java index 86581f0..8891861 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java @@ -45,6 +45,7 @@ import 
tajo.master.TajoMaster.MasterContext; import tajo.master.TaskRunnerLauncherImpl.Container; import tajo.master.event.*; import tajo.master.rm.RMContainerAllocator; +import tajo.storage.StorageManager; import tajo.storage.StorageUtil; import tajo.util.TajoIdUtils; @@ -81,6 +82,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private CatalogService catalog; private boolean isCreateTableStmt; + private StorageManager storageManager; private FileSystem defaultFS; private Path outputPath; @@ -115,6 +117,7 @@ public class QueryMaster extends CompositeService implements EventHandler { rpc = masterContext.getYarnRPC(); catalog = masterContext.getCatalog(); + storageManager = masterContext.getStorageManager(); taskRunnerListener = new TaskRunnerListener(queryContext); addIfService(taskRunnerListener); @@ -124,8 +127,7 @@ public class QueryMaster extends CompositeService implements EventHandler { dispatcher.register(ContainerAllocatorEventType.class, rmAllocator); query = new Query(queryContext, queryId, clock, appSubmitTime, - "", dispatcher.getEventHandler(), masterPlan, - masterContext.getStorageManager()); + "", dispatcher.getEventHandler(), masterPlan, storageManager); initStagingDir(); // QueryEventDispatcher is already registered in TajoMaster @@ -212,7 +214,7 @@ public class QueryMaster extends CompositeService implements EventHandler { implements EventHandler<TaskSchedulerEvent> { public void handle(TaskSchedulerEvent event) { SubQuery subQuery = query.getSubQuery(event.getSubQueryId()); - subQuery.taskScheduler.handle(event); + subQuery.getTaskScheduler().handle(event); } } @@ -337,6 +339,10 @@ public class QueryMaster extends CompositeService implements EventHandler { public long getFinishTime() { return query.getFinishTime(); } + + public StorageManager getStorageManager() { + return storageManager; + } } private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> { 
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java index 7cf4422..51aff94 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java @@ -58,7 +58,7 @@ public class Repartitioner { public static QueryUnit [] createJoinTasks(SubQuery subQuery) throws IOException { ExecutionBlock execBlock = subQuery.getBlock(); - CatalogService catalog = subQuery.queryContext.getCatalog(); + CatalogService catalog = subQuery.getContext().getCatalog(); ScanNode[] scans = execBlock.getScanNodes(); Path tablePath; @@ -85,7 +85,7 @@ public class Repartitioner { } // Getting a table stat for each scan - stats[i] = subQuery.getChildQuery(scans[i]).getStats(); + stats[i] = subQuery.getChildQuery(scans[i]).getTableStat(); } // Assigning either fragments or fetch urls to query units @@ -93,7 +93,7 @@ public class Repartitioner { if (scans[0].isBroadcast() || scans[1].isBroadcast()) { tasks = new QueryUnit[1]; tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0), - false, subQuery.eventHandler); + false, subQuery.getEventHandler()); tasks[0].setLogicalPlan(execBlock.getPlan()); tasks[0].setFragment(scans[0].getTableId(), fragments[0]); tasks[0].setFragment(scans[1].getTableId(), fragments[1]); @@ -135,7 +135,7 @@ public class Repartitioner { // Getting the desire number of join tasks according to the volumn // of a larger table int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1; - int desireJoinTaskVolumn = subQuery.queryContext.getConf(). + int desireJoinTaskVolumn = subQuery.getContext().getConf(). 
getIntVar(ConfVars.JOIN_TASK_VOLUME); // calculate the number of tasks according to the data size @@ -181,7 +181,7 @@ public class Repartitioner { for (int i = 0; i < taskNum; i++) { tasks[i] = new QueryUnit( QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(), - subQuery.eventHandler); + subQuery.getEventHandler()); tasks[i].setLogicalPlan(execBlock.getPlan()); for (Fragment fragment : fragments) { tasks[i].setFragment2(fragment); @@ -253,14 +253,14 @@ public class Repartitioner { int maxNum) throws InternalException { ExecutionBlock execBlock = subQuery.getBlock(); - TableStat stat = childSubQuery.getStats(); + TableStat stat = childSubQuery.getTableStat(); if (stat.getNumRows() == 0) { return new QueryUnit[0]; } ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = subQuery.sm.getTablePath(scan.getTableId()); + tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId()); StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan(); SortNode sort = (SortNode) store.getSubNode(); @@ -296,7 +296,7 @@ public class Repartitioner { List<String> basicFetchURIs = new ArrayList<String>(); - SubQuery child = childSubQuery.queryContext.getSubQuery( + SubQuery child = childSubQuery.getContext().getSubQuery( subQuery.getBlock().getChildBlock(scan).getId()); for (QueryUnit qu : child.getQueryUnits()) { for (IntermediateEntry p : qu.getIntermediateData()) { @@ -372,14 +372,14 @@ public class Repartitioner { SubQuery childSubQuery, int maxNum) { ExecutionBlock execBlock = subQuery.getBlock(); - TableStat stat = childSubQuery.getStats(); + TableStat stat = childSubQuery.getTableStat(); if (stat.getNumRows() == 0) { return new QueryUnit[0]; } ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = subQuery.sm.getTablePath(scan.getTableId()); + tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId()); List<IntermediateEntry> partitions = new 
ArrayList<IntermediateEntry>(); for (QueryUnit tasks : childSubQuery.getQueryUnits()) { @@ -513,7 +513,7 @@ public class Repartitioner { QueryUnit [] tasks = new QueryUnit[num]; for (int i = 0; i < num; i++) { tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), i), - false, subQuery.eventHandler); + false, subQuery.getEventHandler()); tasks[i].setFragment2(frag); tasks[i].setLogicalPlan(plan); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java index ff73334..398e1ae 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java @@ -26,27 +26,32 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.state.*; +import org.apache.hadoop.yarn.util.Records; import tajo.QueryIdFactory; import tajo.QueryUnitId; import tajo.SubQueryId; -import tajo.catalog.*; -import tajo.catalog.proto.CatalogProtos.StoreType; +import tajo.catalog.CatalogService; +import tajo.catalog.TCatUtil; +import tajo.catalog.TableDesc; +import tajo.catalog.TableMeta; import tajo.catalog.statistics.ColumnStat; import tajo.catalog.statistics.StatisticsUtil; import tajo.catalog.statistics.TableStat; import tajo.conf.TajoConf; -import tajo.engine.json.GsonCreator; import tajo.engine.planner.PlannerUtil; -import 
tajo.engine.planner.logical.*; +import tajo.engine.planner.logical.ExprType; +import tajo.engine.planner.logical.GroupbyNode; +import tajo.engine.planner.logical.ScanNode; +import tajo.engine.planner.logical.StoreTableNode; import tajo.master.QueryMaster.QueryContext; import tajo.master.event.*; import tajo.storage.Fragment; import tajo.storage.StorageManager; -import tajo.util.IndexUtil; import java.io.IOException; import java.util.*; @@ -60,19 +65,19 @@ import static tajo.conf.TajoConf.ConfVars; /** - * SubQuery is an instance of an ExecutionBlock. + * SubQuery plays a role in controlling an ExecutionBlock and is a finite state machine. */ public class SubQuery implements EventHandler<SubQueryEvent> { private static final Log LOG = LogFactory.getLog(SubQuery.class); private ExecutionBlock block; - private Priority priority; - private TableStat stats; - EventHandler eventHandler; - final StorageManager sm; - TaskSchedulerImpl taskScheduler; - QueryContext queryContext; + private int priority; + private TableMeta meta; + private EventHandler eventHandler; + private final StorageManager sm; + private TaskSchedulerImpl taskScheduler; + private QueryContext context; private long startTime; private long finishTime; @@ -134,7 +139,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private int completedTaskCount = 0; public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) { - this.queryContext = context; + this.context = context; this.block = block; this.sm = sm; this.eventHandler = context.getEventHandler(); @@ -145,10 +150,32 @@ public class SubQuery implements EventHandler<SubQueryEvent> { stateMachine = stateMachineFactory.make(this); } + public QueryContext getContext() { + return context; + } + + public EventHandler getEventHandler() { + return eventHandler; + } + + public TaskScheduler getTaskScheduler() { + return taskScheduler; + } + + public void setStartTime() { + startTime = context.getClock().getTime(); + } + + 
@SuppressWarnings("UnusedDeclaration") public long getStartTime() { return this.startTime; } + public void setFinishTime() { + finishTime = context.getClock().getTime(); + } + + @SuppressWarnings("UnusedDeclaration") public long getFinishTime() { return this.finishTime; } @@ -156,7 +183,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { public float getProgress() { readLock.lock(); try { - if (getStateMachine().getCurrentState() == SubQueryState.NEW) { + if (getState() == SubQueryState.NEW) { return 0; } else { if (completedTaskCount == 0) { @@ -193,21 +220,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } public void setPriority(int priority) { - if (this.priority == null) { - this.priority = new Priority(priority); - } + this.priority = priority; } - public StorageManager getStorageManager() { - return sm; + + public int getPriority() { + return this.priority; } - public void setStats(TableStat stat) { - this.stats = stat; + public StorageManager getStorageManager() { + return sm; } public SubQuery getChildQuery(ScanNode scanForChild) { - return queryContext.getSubQuery(block.getChildBlock(scanForChild).getId()); + return context.getSubQuery(block.getChildBlock(scanForChild).getId()); } public SubQueryId getId() { @@ -215,7 +241,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } public QueryUnit[] getQueryUnits() { - // TODO - to be changed to unified getter return tasks.values().toArray(new QueryUnit[tasks.size()]); } @@ -223,12 +248,17 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return tasks.get(qid); } - public Priority getPriority() { - return this.priority; + public void setTableMeta(TableMeta meta) { + this.meta = meta; + } + + @SuppressWarnings("UnusedDeclaration") + public TableMeta getTableMeta() { + return meta; } - public TableStat getStats() { - return this.stats; + public TableStat getTableStat() { + return this.meta.getStat(); } public String toString() { @@ -264,15 
+294,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - private void finishUnionUnit() throws IOException { - // write meta and continue - TableStat stat = generateUnionStat(this); - setStats(stat); - writeStat(this, stat); - //unit.setState(QueryStatus.QUERY_FINISHED); - } - - private static TableStat generateUnionStat(SubQuery unit) { + private static TableStat computeStatFromUnionBlock(SubQuery unit) { TableStat stat = new TableStat(); TableStat childStat; long avgRows = 0, numBytes = 0, numRows = 0; @@ -282,8 +304,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator(); while (it.hasNext()) { ExecutionBlock block = it.next(); - SubQuery childSubQuery = unit.queryContext.getSubQuery(block.getId()); - childStat = childSubQuery.getStats(); + SubQuery childSubQuery = unit.context.getSubQuery(block.getId()); + childStat = childSubQuery.getTableStat(); avgRows += childStat.getAvgRows(); columnStats.addAll(childStat.getColumnStats()); numBlocks += childStat.getNumBlocks(); @@ -301,129 +323,136 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return stat; } - public void cleanUp() { + public TableMeta buildTableMeta() throws IOException { + finishTime = context.getClock().getTime(); + + TableStat stat; if (block.hasUnion()) { - try { - // write meta and continue - TableStat stat = generateUnionStat(this); - setStats(stat); - writeStat(this, stat); - //unit.setState(QueryStatus.QUERY_FINISHED); - } catch (IOException e) { - e.printStackTrace(); - } + stat = computeStatFromUnionBlock(this); + } else { + stat = computeStatFromTasks(); + } + TableMeta meta = writeStat(this, stat); + meta.setStat(stat); + setTableMeta(meta); + return meta; + } + + private TableStat computeStatFromTasks() { + List<TableStat> stats = Lists.newArrayList(); + for (QueryUnit unit : getQueryUnits()) { + stats.add(unit.getStats()); + } + TableStat tableStat = 
StatisticsUtil.aggregateTableStat(stats); + return tableStat; + } + + private TableMeta writeStat(SubQuery subQuery, TableStat stat) + throws IOException { + ExecutionBlock execBlock = subQuery.getBlock(); + StoreTableNode storeTableNode = execBlock.getStoreTableNode(); + TableMeta meta = toTableMeta(storeTableNode); + meta.setStat(stat); + sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta); + return meta; + } + + private static TableMeta toTableMeta(StoreTableNode store) { + if (store.hasOptions()) { + return TCatUtil.newTableMeta(store.getOutSchema(), + store.getStorageType(), store.getOptions()); } else { - LOG.info("SubQuery: " + getId() + " sets TableStat"); - TableStat stat = generateStat(); + return TCatUtil.newTableMeta(store.getOutSchema(), + store.getStorageType()); + } + } + + private void stopScheduler() { + // If there are launched TaskRunners, send the 'shouldDie' message to all runners + // via received task requests. + if (taskScheduler != null) { + taskScheduler.stop(); + } + } + + private void releaseContainers() { + // If there are still live TaskRunners, try to kill the containers.
+ for (Entry<ContainerId, Container> entry : containers.entrySet()) { + eventHandler.handle(new TaskRunnerStopEvent(getId(), entry.getValue())); + } + } + + private void finish() { + TableMeta meta = null; + try { + meta = buildTableMeta(); + } catch (IOException e) { + e.printStackTrace(); + } + + setTableMeta(meta); + setFinishTime(); + eventHandler.handle(new SubQuerySucceeEvent(getId(), meta)); + } + + @Override + public void handle(SubQueryEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType()); + } + + try { + writeLock.lock(); + SubQueryState oldState = getState(); try { - writeStat(this, stat); - } catch (IOException e) { + getStateMachine().doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); + eventHandler.handle(new SubQueryEvent(getId(), + SubQueryEventType.SQ_INTERNAL_ERROR)); + } + + // notify the eventhandler of state change + if (LOG.isDebugEnabled()) { + if (oldState != getState()) { + LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " + + getState()); + } } } - finishTime = queryContext.getClock().getTime(); + finally { + writeLock.unlock(); + } } - private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> { @Override public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - subQuery.startTime = subQuery.queryContext.getClock().getTime(); - subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.queryContext); - subQuery.taskScheduler.init(subQuery.queryContext.getConf()); - subQuery.taskScheduler.start(); - + subQuery.setStartTime(); ExecutionBlock execBlock = subQuery.getBlock(); + SubQueryState state; try { - // if subquery is dummy, which means it requires only a logical step - // instead of actual query. An 'union all' is an example of - // a dummy subquery. 
+ // Union operator does not require actual query processing. It is performed logically. if (execBlock.hasUnion()) { - subQuery.finishUnionUnit(); - subQuery.cleanUp(); - TableMeta meta = new TableMetaImpl(execBlock.getOutputSchema(), - StoreType.CSV, new Options(), subQuery.getStats()); - subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(), - meta)); - return SubQueryState.SUCCEEDED; + subQuery.finish(); + state = SubQueryState.SUCCEEDED; } else { - QueryUnit [] tasks; - // TODO - should be improved - if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { - - // if parent is join, this subquery is for partitioning data. - if (execBlock.hasParentBlock()) { - int numTasks = calculatePartitionNum(subQuery); - Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks); - } - - tasks = createLeafTasks(subQuery); - } else if (execBlock.getScanNodes().length > 1) { - // if parent is join, this subquery is for partitioning data. - if (execBlock.hasParentBlock()) { - int numTasks = calculatePartitionNum(subQuery); - Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks); - } - - if (subQuery.getId().getId() == 15) { - System.out.println("error point!"); - } - - tasks = Repartitioner.createJoinTasks(subQuery); + setRepartitionIfNecessary(subQuery); + createTasks(subQuery); - } else { - // if parent is join, this subquery is for partitioning data. 
- if (execBlock.hasParentBlock()) { - int partitionNum = calculatePartitionNum(subQuery); - Repartitioner.setPartitionNumberForTwoPhase(subQuery, partitionNum); - } - int numTasks = getNonLeafTaskNum(subQuery); - - SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId(); - SubQuery child = subQuery.queryContext.getSubQuery(childId); - tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks); - } - for (QueryUnit task : tasks) { - subQuery.addTask(task); - } - LOG.info("Create " + tasks.length + " Tasks"); - - // if there is no tasks - if (subQuery.tasks.size() == 0) { - subQuery.cleanUp(); - TableMeta meta = toTableMeta(execBlock.getStoreTableNode()); - meta.setStat(subQuery.getStats()); - subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(), - meta)); + if (subQuery.tasks.size() == 0) { // if there is no tasks + subQuery.finish(); return SubQueryState.SUCCEEDED; - } else { - int numRequest = Math.min(tasks.length, - subQuery.queryContext.getNumClusterNode() * 4); - - final Resource resource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - Resource.class); - if (tasks.length <= subQuery.queryContext.getNumClusterNode()) { - resource.setMemory(subQuery.queryContext.getMaxContainerCapability()); - } else { - resource.setMemory(2000); - } - - org.apache.hadoop.yarn.api.records.Priority priority = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - org.apache.hadoop.yarn.api.records.Priority.class); - priority.setPriority(subQuery.getPriority().get()); - ContainerAllocationEvent event = - new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, - subQuery.getId(), priority, resource, numRequest, execBlock.isLeafBlock(), 0.0f); - subQuery.eventHandler.handle(event); + initTaskScheduler(subQuery); + allocateContainers(subQuery); + return SubQueryState.INIT; } } - return SubQueryState.INIT; } catch (Exception e) { LOG.warn("SubQuery (" + subQuery.getId() + ") 
failed", e); subQuery.eventHandler.handle( @@ -432,9 +461,181 @@ public class SubQuery implements EventHandler<SubQueryEvent> { new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.FAILED)); return SubQueryState.FAILED; } + + return state; } - public QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException { + private void initTaskScheduler(SubQuery subQuery) { + subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context); + subQuery.taskScheduler.init(subQuery.context.getConf()); + subQuery.taskScheduler.start(); + } + + /** + * If a parent block requires a repartition operation, the method sets proper repartition + * methods and the number of partitions to a given subquery. + */ + private static void setRepartitionIfNecessary(SubQuery subQuery) { + if (subQuery.getBlock().hasParentBlock()) { + int numTasks = calculatePartitionNum(subQuery); + Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks); + } + } + + /** + * Gets the desired number of partitions according to the volume of input data. + * This method is only used to determine the partition key number of hash join or aggregation. + * + * @param subQuery the subquery whose output is to be partitioned + * @return the determined number of partitions + */ + public static int calculatePartitionNum(SubQuery subQuery) { + TajoConf conf = subQuery.context.getConf(); + ExecutionBlock parent = subQuery.getBlock().getParentBlock(); + + GroupbyNode grpNode = null; + if (parent != null) { + grpNode = (GroupbyNode) PlannerUtil.findTopNode( + parent.getPlan(), ExprType.GROUP_BY); + } + + // Is this subquery the first step of join?
+ if (parent != null && parent.getScanNodes().length == 2) { + Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator(); + + // for outer + ExecutionBlock outer = child.next(); + long outerVolume = getInputVolume(subQuery.context, outer); + + // for inner + ExecutionBlock inner = child.next(); + long innerVolume = getInputVolume(subQuery.context, inner); + LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576)); + LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576)); + + long smaller = Math.min(outerVolume, innerVolume); + + int mb = (int) Math.ceil((double)smaller / 1048576); + LOG.info("Smaller Table's volume is approximately " + mb + " MB"); + // determine the number of task + int taskNum = (int) Math.ceil((double)mb / + conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME)); + LOG.info("The determined number of join partitions is " + taskNum); + return taskNum; + + // Is this subquery the first step of group-by? + } else if (grpNode != null) { + + if (grpNode.getGroupingColumns().length == 0) { + return 1; + } else { + long volume = getInputVolume(subQuery.context, subQuery.block); + + int mb = (int) Math.ceil((double)volume / 1048576); + LOG.info("Table's volume is approximately " + mb + " MB"); + // determine the number of task + int taskNum = (int) Math.ceil((double)mb / + conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME)); + LOG.info("The determined number of aggregation partitions is " + taskNum); + return taskNum; + } + } else { + LOG.info("============>>>>> Unexpected Case!
<<<<<================"); + long volume = getInputVolume(subQuery.context, subQuery.block); + + int mb = (int) Math.ceil((double)volume / 1048576); + LOG.info("Table's volume is approximately " + mb + " MB"); + // determine the number of task per 128MB + int taskNum = (int) Math.ceil((double)mb / 128); + LOG.info("The determined number of partitions is " + taskNum); + return taskNum; + } + } + + private static void createTasks(SubQuery subQuery) throws IOException { + ExecutionBlock execBlock = subQuery.getBlock(); + QueryUnit [] tasks; + if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan + tasks = createLeafTasks(subQuery); + + } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join + tasks = Repartitioner.createJoinTasks(subQuery); + + } else { // Case 3: Others (Sort or Aggregation) + int numTasks = getNonLeafTaskNum(subQuery); + SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId(); + SubQuery child = subQuery.context.getSubQuery(childId); + tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks); + } + + LOG.info("Create " + tasks.length + " Tasks"); + + for (QueryUnit task : tasks) { + subQuery.addTask(task); + } + } + + /** + * Getting the desire number of tasks according to the volume of input data + * + * @param subQuery + * @return + */ + public static int getNonLeafTaskNum(SubQuery subQuery) { + // Getting intermediate data size + long volume = getInputVolume(subQuery.context, subQuery.getBlock()); + + int mb = (int) Math.ceil((double)volume / 1048576); + LOG.info("Table's volume is approximately " + mb + " MB"); + // determine the number of task per 64MB + int maxTaskNum = (int) Math.ceil((double)mb / 64); + LOG.info("The determined number of non-leaf tasks is " + maxTaskNum); + return maxTaskNum; + } + + public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) { + CatalogService catalog = context.getCatalog(); + if 
(execBlock.isLeafBlock()) { + ScanNode outerScan = execBlock.getScanNodes()[0]; + TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat(); + return stat.getNumBytes(); + } else { + long aggregatedVolume = 0; + for (ExecutionBlock childBlock : execBlock.getChildBlocks()) { + SubQuery subquery = context.getSubQuery(childBlock.getId()); + aggregatedVolume += subquery.getTableStat().getNumBytes(); + } + + return aggregatedVolume; + } + } + + public static void allocateContainers(SubQuery subQuery) { + ExecutionBlock execBlock = subQuery.getBlock(); + QueryUnit [] tasks = subQuery.getQueryUnits(); + + int numRequest = Math.min(tasks.length, + subQuery.context.getNumClusterNode() * 4); + + final Resource resource = + RecordFactoryProvider.getRecordFactory(null).newRecordInstance( + Resource.class); + if (tasks.length <= subQuery.context.getNumClusterNode()) { + resource.setMemory(subQuery.context.getMaxContainerCapability()); + } else { + resource.setMemory(2000); + } + + Priority priority = Records.newRecord(Priority.class); + priority.setPriority(subQuery.getPriority()); + ContainerAllocationEvent event = + new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, + subQuery.getId(), priority, resource, numRequest, + execBlock.isLeafBlock(), 0.0f); + subQuery.eventHandler.handle(event); + } + + private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException { ExecutionBlock execBlock = subQuery.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); @@ -442,13 +643,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { Path inputPath; ScanNode scan = scans[0]; - TableDesc desc = subQuery.queryContext.getCatalog().getTableDesc(scan.getTableId()); + TableDesc desc = subQuery.context.getCatalog().getTableDesc(scan.getTableId()); inputPath = desc.getPath(); meta = desc.getMeta(); // TODO - should be change the inner directory Path 
oldPath = new Path(inputPath, "data"); - FileSystem fs = inputPath.getFileSystem(subQuery.queryContext.getConf()); + FileSystem fs = inputPath.getFileSystem(subQuery.context.getConf()); if (fs.exists(oldPath)) { inputPath = oldPath; } @@ -459,129 +660,24 @@ public class SubQuery implements EventHandler<SubQueryEvent> { int i = 0; for (Fragment fragment : fragments) { - queryUnit = newQueryUnit(subQuery, i++); - queryUnit.setFragment(scan.getTableId(), fragment); + queryUnit = newQueryUnit(subQuery, i++, fragment); queryUnits.add(queryUnit); } return queryUnits.toArray(new QueryUnit[queryUnits.size()]); } - private QueryUnit newQueryUnit(SubQuery subQuery, int taskId) { + private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) { ExecutionBlock execBlock = subQuery.getBlock(); QueryUnit unit = new QueryUnit( QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(), subQuery.eventHandler); unit.setLogicalPlan(execBlock.getPlan()); + unit.setFragment2(fragment); return unit; } } - /** - * Getting the desire number of partitions according to the volume of input data. - * This method is only used to determine the partition key number of hash join or aggregation. - * - * @param subQuery - * @return - */ - public static int calculatePartitionNum(SubQuery subQuery) { - TajoConf conf = subQuery.queryContext.getConf(); - ExecutionBlock parent = subQuery.getBlock().getParentBlock(); - - GroupbyNode grpNode = null; - if (parent != null) { - grpNode = (GroupbyNode) PlannerUtil.findTopNode( - parent.getPlan(), ExprType.GROUP_BY); - } - - // Is this subquery the first step of join? 
- if (parent != null && parent.getScanNodes().length == 2) { - Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator(); - - // for inner - ExecutionBlock outer = child.next(); - long outerVolume = getInputVolume(subQuery.queryContext, outer); - - // for inner - ExecutionBlock inner = child.next(); - long innerVolume = getInputVolume(subQuery.queryContext, inner); - LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576)); - LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576)); - - long smaller = Math.min(outerVolume, innerVolume); - - int mb = (int) Math.ceil((double)smaller / 1048576); - LOG.info("Smaller Table's volume is approximately " + mb + " MB"); - // determine the number of task - int taskNum = (int) Math.ceil((double)mb / - conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME)); - LOG.info("The determined number of join partitions is " + taskNum); - return taskNum; - - // Is this subquery the first step of group-by? - } else if (grpNode != null) { - - if (grpNode.getGroupingColumns().length == 0) { - return 1; - } else { - long volume = getInputVolume(subQuery.queryContext, subQuery.block); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info("Table's volume is approximately " + mb + " MB"); - // determine the number of task - int taskNum = (int) Math.ceil((double)mb / - conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME)); - LOG.info("The determined number of aggregation partitions is " + taskNum); - return taskNum; - } - } else { - LOG.info("============>>>>> Unexpected Case! 
<<<<<================"); - long volume = getInputVolume(subQuery.queryContext, subQuery.block); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info("Table's volume is approximately " + mb + " MB"); - // determine the number of task per 128MB - int taskNum = (int) Math.ceil((double)mb / 128); - LOG.info("The determined number of partitions is " + taskNum); - return taskNum; - } - } - - public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) { - CatalogService catalog = context.getCatalog(); - if (execBlock.isLeafBlock()) { - ScanNode outerScan = execBlock.getScanNodes()[0]; - TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat(); - return stat.getNumBytes(); - } else { - long aggregatedVolume = 0; - for (ExecutionBlock childBlock : execBlock.getChildBlocks()) { - SubQuery subquery = context.getSubQuery(childBlock.getId()); - aggregatedVolume += subquery.getStats().getNumBytes(); - } - - return aggregatedVolume; - } - } - - /** - * Getting the desire number of tasks according to the volume of input data - * - * @param subQuery - * @return - */ - public static int getNonLeafTaskNum(SubQuery subQuery) { - // Getting intermediate data size - long volume = getInputVolume(subQuery.queryContext, subQuery.getBlock()); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info("Table's volume is approximately " + mb + " MB"); - // determine the number of task per 64MB - int maxTaskNum = (int) Math.ceil((double)mb / 64); - LOG.info("The determined number of non-leaf tasks is " + maxTaskNum); - return maxTaskNum; - } - int i = 0; private static class ContainerLaunchTransition implements SingleArcTransition<SubQuery, SubQueryEvent> { @@ -617,6 +713,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { @Override public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { + // schedule tasks try { for (QueryUnitId taskId : subQuery.tasks.keySet()) { 
subQuery.eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE)); @@ -630,9 +727,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - private class TaskCompletedTransition implements - SingleArcTransition<SubQuery, SubQueryEvent> { - + private static class TaskCompletedTransition + implements SingleArcTransition<SubQuery, SubQueryEvent> { @Override public void transition(SubQuery subQuery, @@ -641,7 +737,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event; QueryUnitAttempt task = subQuery.getQueryUnit(taskEvent.getTaskId()).getSuccessfulAttempt(); - LOG.info(getId() + " SubQuery Succeeded " + completedTaskCount + "/" + LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/" + subQuery.tasks.size() + " on " + task.getHost()); if (subQuery.completedTaskCount == subQuery.tasks.size()) { subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), @@ -658,31 +754,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // TODO - Commit subQuery & do cleanup // TODO - records succeeded, failed, killed completed task // TODO - records metrics - - ExecutionBlock execBlock = subQuery.getBlock(); - - for (Entry<ContainerId, Container> entry : subQuery.containers.entrySet()) { - subQuery.eventHandler.handle(new TaskRunnerStopEvent(subQuery.getId(), - entry.getValue())); - } - subQuery.cleanUp(); - subQuery.taskScheduler.stop(); - - StoreTableNode storeTableNode = execBlock.getStoreTableNode(); - TableMeta meta = toTableMeta(storeTableNode); - meta.setStat(subQuery.getStats()); - - subQuery.eventHandler.handle(new SubQuerySucceeEvent(subQuery.getId(), - meta)); - subQuery.finishTime = subQuery.queryContext.getClock().getTime(); + subQuery.stopScheduler(); + subQuery.releaseContainers(); + subQuery.finish(); } } - SubQueryState finished(SubQueryState state) { - return state; - } - - class InternalErrorTransition + private 
static class InternalErrorTransition implements SingleArcTransition<SubQuery, SubQueryEvent> { @Override @@ -691,84 +769,4 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - - private TableStat generateStat() { - List<TableStat> stats = Lists.newArrayList(); - for (QueryUnit unit : getQueryUnits()) { - stats.add(unit.getStats()); - } - TableStat tableStat = StatisticsUtil.aggregateTableStat(stats); - setStats(tableStat); - return tableStat; - } - - private void writeStat(SubQuery subQuery, TableStat stat) - throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); - - if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) { - IndexWriteNode index = (IndexWriteNode) execBlock.getPlan(); - Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index"); - TableMeta meta; - if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) { - meta = sm.getTableMeta(indexPath); - } else { - StoreTableNode storeTableNode = execBlock.getStoreTableNode(); - meta = toTableMeta(storeTableNode); - } - String indexName = IndexUtil.getIndexName(index.getTableName(), - index.getSortSpecs()); - String json = GsonCreator.getInstance().toJson(index.getSortSpecs()); - meta.putOption(indexName, json); - - sm.writeTableMeta(indexPath, meta); - - } else { - StoreTableNode storeTableNode = execBlock.getStoreTableNode(); - TableMeta meta = toTableMeta(storeTableNode); - meta.setStat(stat); - sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta); - } - } - - private static TableMeta toTableMeta(StoreTableNode store) { - if (store.hasOptions()) { - return TCatUtil.newTableMeta(store.getOutSchema(), - store.getStorageType(), store.getOptions()); - } else { - return TCatUtil.newTableMeta(store.getOutSchema(), - store.getStorageType()); - } - } - - @Override - public void handle(SubQueryEvent event) { - //if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType()); - //} - - try { - 
writeLock.lock(); - SubQueryState oldState = getState(); - try { - getStateMachine().doTransition(event.getType(), event); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); - eventHandler.handle(new SubQueryEvent(getId(), - SubQueryEventType.SQ_INTERNAL_ERROR)); - } - - //notify the eventhandler of state change - if (LOG.isDebugEnabled()) { - if (oldState != getState()) { - LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " - + getState()); - } - } - } - - finally { - writeLock.unlock(); - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java index 4c78c26..0a1ad42 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java @@ -24,10 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.AMRMClientImpl; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; @@ -159,8 +156,8 @@ public class RMContainerAllocator extends AMRMClientImpl if 
(allocatedContainers.size() > 0) { for (Container container : allocatedContainers) { SubQueryId subQueryId = subQueryMap.get(container.getPriority()); - if (!subQueryMap.containsKey(container.getPriority()) || - context.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) { + SubQueryState state = context.getSubQuery(subQueryId).getState(); + if (!(isRunningState(state) && subQueryMap.containsKey(container.getPriority()))) { releaseAssignedContainer(container.getId()); synchronized (subQueryMap) { subQueryMap.remove(container.getPriority()); @@ -180,6 +177,11 @@ public class RMContainerAllocator extends AMRMClientImpl } } + private static boolean isRunningState(SubQueryState state) { + return state == SubQueryState.INIT || state == SubQueryState.NEW || + state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING; + } + @Override public void handle(ContainerAllocationEvent event) { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/44b0d223/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java index ff5ca86..0fd534e 100644 --- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java +++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java @@ -257,6 +257,7 @@ public class TaskRunner extends AbstractService { // if there has been no assigning task for a given period, // TaskRunner will retry to request an assigning task. LOG.error(te); + continue; }
