Repository: tajo Updated Branches: refs/heads/master 42d79cf5b -> 161ee9ebc
TAJO-1336: Fix task failure of stopped task. (jinho) Closes #376 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/161ee9eb Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/161ee9eb Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/161ee9eb Branch: refs/heads/master Commit: 161ee9ebc641f7e2ab326360b7fc26ab318ef72c Parents: 42d79cf Author: jhkim <[email protected]> Authored: Fri Feb 6 12:29:37 2015 +0900 Committer: jhkim <[email protected]> Committed: Fri Feb 6 12:29:37 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/worker/ExecutionBlockContext.java | 28 ++++++------ .../main/java/org/apache/tajo/worker/Task.java | 47 +++++++++++++------- .../apache/tajo/worker/TaskAttemptContext.java | 2 +- .../apache/tajo/worker/TaskRunnerManager.java | 4 +- .../apache/tajo/querymaster/TestKillQuery.java | 34 ++++++++++++++ 6 files changed, 84 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 06ced2d..9043a98 100644 --- a/CHANGES +++ b/CHANGES @@ -183,6 +183,8 @@ Release 0.10.0 - unreleased BUG FIXES + TAJO-1336: Fix task failure of stopped task. (jinho) + TAJO-1316: NPE occurs when performing window functions after join. (jihun) http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index b120d5b..8cf94eb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; @@ -42,7 +42,6 @@ import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; -import org.apache.tajo.worker.event.TaskRunnerStartEvent; import org.jboss.netty.channel.ConnectTimeoutException; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.Timer; @@ -75,6 +74,7 @@ public class ExecutionBlockContext { private FileSystem defaultFS; private ExecutionBlockId executionBlockId; private QueryContext queryContext; + private TajoWorker.WorkerContext workerContext; private String plan; private ExecutionBlockSharedResource resource; @@ -96,13 +96,14 @@ public class ExecutionBlockContext { private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); - public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster) - throws Throwable { + public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext, + TaskRunnerManager manager, QueryContext queryContext, String plan, + ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable { this.manager = manager; - this.executionBlockId = event.getExecutionBlockId(); + this.executionBlockId = executionBlockId; this.connPool = RpcConnectionPool.getPool(); this.queryMaster = queryMaster; - this.systemConf = manager.getTajoConf(); + this.systemConf = conf; this.reporter = new Reporter(); this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); this.localFS = FileSystem.getLocal(systemConf); @@ -110,11 +111,10 @@ public class ExecutionBlockContext { // Setup QueryEngine according to the query plan // Here, we can setup row-based query engine or columnar query engine. this.queryEngine = new TajoQueryEngine(systemConf); - this.queryContext = event.getQueryContext(); - this.plan = event.getPlan(); + this.queryContext = queryContext; + this.plan = plan; this.resource = new ExecutionBlockSharedResource(); - - init(); + this.workerContext = workerContext; } public void init() throws Throwable { @@ -193,7 +193,7 @@ public class ExecutionBlockContext { } public TajoConf getConf() { - return manager.getTajoConf(); + return systemConf; } public FileSystem getLocalFS() { @@ -205,7 +205,7 @@ public class ExecutionBlockContext { } public LocalDirAllocator getLocalDirAllocator() { - return manager.getWorkerContext().getLocalDirAllocator(); + return workerContext.getLocalDirAllocator(); } public TajoQueryEngine getTQueryEngine() { @@ -267,8 +267,8 @@ public class ExecutionBlockContext { return histories.get(runner.getId()); } - public TajoWorker.WorkerContext getWorkerContext(){ - return manager.getWorkerContext(); + public TajoWorker.WorkerContext getWorkerContext() { + return workerContext; } protected ClientSocketChannelFactory getShuffleChannelFactory(){ http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index e9ad838..8f84a9d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -106,11 +106,20 @@ public class Task { TaskAttemptId taskId, final ExecutionBlockContext executionBlockContext, final TaskRequest request) throws IOException { + this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request); + } + + public Task(String taskRunnerId, + Path baseDir, + TaskAttemptId taskId, + TajoConf conf, + final ExecutionBlockContext executionBlockContext, + final TaskRequest request) throws IOException { this.taskRunnerId = taskRunnerId; this.request = request; this.taskId = taskId; - this.systemConf = executionBlockContext.getConf(); + this.systemConf = conf; this.queryContext = request.getQueryContext(systemConf); this.executionBlockContext = executionBlockContext; this.taskDir = StorageUtil.concatPath(baseDir, @@ -120,8 +129,11 @@ public class Task { request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); this.context.setDataChannel(request.getDataChannel()); this.context.setEnforcer(request.getEnforcer()); + this.context.setState(TaskAttemptState.TA_PENDING); this.inputStats = new TableStats(); + } + public void initPlan() throws IOException { plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan()); LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); if (scanNode != null) { @@ -157,8 +169,6 @@ public class Task { } this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); - - context.setState(TaskAttemptState.TA_PENDING); LOG.info("=================================="); LOG.info("* Stage " + request.getId() + " is initialized"); LOG.info("* InterQuery: " + interQuery @@ -180,6 +190,8 @@ public class Task { } public void init() throws IOException { + initPlan(); + if (context.getState() == TaskAttemptState.TA_PENDING) { // initialize a task temporal dir FileSystem localFS = executionBlockContext.getLocalFS(); @@ -384,22 +396,23 @@ public class Task { startTime = System.currentTimeMillis(); Throwable error = null; try { - context.setState(TaskAttemptState.TA_RUNNING); - - if (context.hasFetchPhase()) { - // If the fetch is still in progress, the query unit must wait for - // complete. - waitForFetch(); - context.setFetcherProgress(FETCHER_PROGRESS); - context.setProgressChanged(true); - updateProgress(); - } + if(!context.isStopped()) { + context.setState(TaskAttemptState.TA_RUNNING); + if (context.hasFetchPhase()) { + // If the fetch is still in progress, the query unit must wait for + // complete. + waitForFetch(); + context.setFetcherProgress(FETCHER_PROGRESS); + context.setProgressChanged(true); + updateProgress(); + } - this.executor = executionBlockContext.getTQueryEngine(). - createPlan(context, plan); - this.executor.init(); + this.executor = executionBlockContext.getTQueryEngine(). + createPlan(context, plan); + this.executor.init(); - while(!context.isStopped() && executor.next() != null) { + while(!context.isStopped() && executor.next() != null) { + } } } catch (Throwable e) { error = e ; http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 1f2c325..50cd20a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -57,7 +57,7 @@ public class TaskAttemptContext { private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class); private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap(); - private TaskAttemptState state; + private volatile TaskAttemptState state; private TableStats resultStats; private TaskAttemptId queryId; private final Path workDir; http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 4b10203..57ae566 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -169,7 +169,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< if(context == null){ try { - context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster()); + context = new ExecutionBlockContext(getTajoConf(), getWorkerContext(), this, startEvent.getQueryContext(), + startEvent.getPlan(), startEvent.getExecutionBlockId(), startEvent.getQueryMaster()); + context.init(); } catch (Throwable e) { LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 0574bea..1edaa15 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -23,6 +23,7 @@ import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; @@ -30,6 +31,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.master.event.QueryEvent; import org.apache.tajo.master.event.QueryEventType; @@ -38,13 +40,18 @@ import org.apache.tajo.master.event.StageEventType; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.session.Session; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.ExecutionBlockContext; +import org.apache.tajo.worker.Task; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import static org.junit.Assert.*; @@ -190,4 +197,31 @@ public class TestKillQuery { lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED, new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED)); } + + @Test + public void testKillTask() throws Throwable { + QueryId qid = LocalTajoTestingUtility.newQueryId(); + ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1); + TaskId tid = QueryIdFactory.newTaskId(eid); + TajoConf conf = new TajoConf(); + TaskRequestImpl taskRequest = new TaskRequestImpl(); + + taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(), + null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf), null, null); + taskRequest.setInterQuery(); + TaskAttemptId attemptId = new TaskAttemptId(tid, 1); + + ExecutionBlockContext context = new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null); + + org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId, + conf, context, taskRequest); + task.kill(); + assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); + try { + task.run(); + assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); + } catch (Exception e) { + assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus()); + } + } }
