http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp index 5c3ce7b..88b7c24 100644 --- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp @@ -20,43 +20,30 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.commons.lang.StringUtils" %> +<%@ page import="org.apache.tajo.ResourceProtos.FetcherHistoryProto" %> <%@ page import="org.apache.tajo.TaskAttemptId" %> -<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="org.apache.tajo.worker.*" %> +<%@ page import="org.apache.tajo.worker.Fetcher" %> +<%@ page import="org.apache.tajo.worker.TajoWorker" %> +<%@ page import="org.apache.tajo.worker.Task" %> +<%@ page import="org.apache.tajo.worker.TaskHistory" %> <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> <% TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - String containerId = request.getParameter("containerId"); String quAttemptId = request.getParameter("taskAttemptId"); TaskAttemptId taskAttemptId = TajoIdUtils.parseTaskAttemptId(quAttemptId); - Task task = null; - TaskHistory taskHistory = null; - if(containerId == null || containerId.isEmpty() || "null".equals(containerId)) { - task = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskByTaskAttemptId(taskAttemptId); - if (task != null) { - taskHistory = task.createTaskHistory(); - } else { - taskHistory = 
tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskHistoryByTaskAttemptId(taskAttemptId); - } + + TaskHistory taskHistory; + Task task = tajoWorker.getWorkerContext().getTaskManager().getTaskByTaskAttemptId(taskAttemptId); + if (task != null) { + taskHistory = task.createTaskHistory(); } else { - TaskRunner runner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId); - if(runner != null) { - task = runner.getContext().getTask(taskAttemptId); - if (task != null) { - taskHistory = task.createTaskHistory(); - } else { - TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId); - if(history != null) { - taskHistory = history.getTaskHistory(taskAttemptId); - } - } - } + taskHistory = tajoWorker.getWorkerContext().getTaskManager().getTaskHistory(taskAttemptId.getTaskId()); } SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); %> @@ -112,7 +99,7 @@ int index = 1; int pageSize = 1000; //TODO pagination - List<TajoWorkerProtocol.FetcherHistoryProto> fetcherHistories = taskHistory.getFetcherHistories(); + List<FetcherHistoryProto> fetcherHistories = taskHistory.getFetcherHistories(); if (fetcherHistories.size() > 0) { %> @@ -128,7 +115,7 @@ <th># Messages</th> </tr> <% - for (TajoWorkerProtocol.FetcherHistoryProto eachFetcher : fetcherHistories) { + for (FetcherHistoryProto eachFetcher : fetcherHistories) { %> <tr> <td><%=index%>
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp index b7774e8..5dd52d3 100644 --- a/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp +++ b/tajo-core/src/main/resources/webapps/worker/taskhistory.jsp @@ -19,7 +19,7 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> +<%@ page import="org.apache.tajo.ResourceProtos.FetcherHistoryProto" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.*" %> @@ -86,13 +86,13 @@ int index = 1; int pageSize = 1000; //TODO pagination - List<TajoWorkerProtocol.FetcherHistoryProto> fetcherHistories = taskHistory.getFetcherHistories(); + List<FetcherHistoryProto> fetcherHistories = taskHistory.getFetcherHistories(); if (fetcherHistories.size() > 0) { %> <table border="1" width="100%" class="border_table"> <tr><th>No</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th><th>File Length</th><th># Messages</th></tr> <% - for (TajoWorkerProtocol.FetcherHistoryProto eachFetcher : fetcherHistories) { + for (FetcherHistoryProto eachFetcher : fetcherHistories) { %> <tr> <td><%=index%></td> http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/tasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp deleted file mode 100644 index ab873cd..0000000 --- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp +++ /dev/null @@ -1,107 +0,0 @@ -<% - /* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -%> -<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> - -<%@ page import="org.apache.tajo.TaskAttemptId" %> -<%@ page import="org.apache.tajo.util.JSPUtil" %> -<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="java.text.SimpleDateFormat" %> -<%@ page import="java.util.Map" %> -<%@ page import="org.apache.tajo.worker.*" %> - -<% - String containerId = request.getParameter("taskRunnerId"); - TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - - TaskRunner taskRunner = tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunner(containerId); - org.apache.tajo.worker.TaskRunnerHistory history = tajoWorker.getWorkerContext().getTaskRunnerManager().getExcutionBlockHistoryByTaskRunnerId(containerId); - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); -%> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<head> - <link rel="stylesheet" type="text/css" href="/static/style.css"/> - <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> - <title>tajo 
worker</title> - <% - if (taskRunner == null && history == null) { - %> - <script type="text/javascript"> - alert("No Task Container for" + containerId); - document.history.back(); - </script> -</head> -</body> -</html> -<% - return; - } -%> -</head> -<body> -<%@ include file="header.jsp"%> -<div class='contents'> - <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2> - <hr/> - <h3>Tasks</h3> - <table width="100%" border="1" class="border_table"> - <tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr> - <% - if (taskRunner != null) { - ExecutionBlockContext context = taskRunner.getContext(); - - for (Map.Entry<TaskAttemptId, Task> entry : context.getTasks().entrySet()) { - TaskAttemptId taskAttemptId = entry.getKey(); - TaskHistory eachTask = entry.getValue().createTaskHistory(); - %> - <tr> - <td> - <a href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td> - <td><%=df.format(eachTask.getStartTime())%></td> - <td><%=eachTask.getFinishTime() == 0 ? "-" : df.format(eachTask.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td> - <td><%=eachTask.getState()%></td> - </tr> - <% - } - } - - if (history != null) { - - - for (Map.Entry<TaskAttemptId, TaskHistory> entry : history.getTaskHistoryMap().entrySet()) { - TaskAttemptId taskAttemptId = entry.getKey(); - TaskHistory eachTask = entry.getValue(); - %> - <tr> - <td><a href="taskdetail.jsp?containerId=<%=containerId%>&taskAttemptId=<%=taskAttemptId%>"><%=taskAttemptId%></a></td> - <td><%=df.format(eachTask.getStartTime())%></td> - <td><%=eachTask.getFinishTime() == 0 ? 
"-" : df.format(eachTask.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(eachTask.getStartTime(), eachTask.getFinishTime())%></td> - <td><%=eachTask.getState()%></td> - </tr> - <% - } - } - %> - </table> -</div> -</body> -</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index a323f25..995d448 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -231,7 +231,7 @@ public class QueryTestCaseBase { /* protect a travis stalled build */ System.out.println("Run: " + name.getMethodName() + " Used memory: " + ((Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) - / (1024 * 1024)) + "MBytes"); + / (1024 * 1024)) + " MBytes, Active Threads:" + Thread.activeCount()); } public QueryTestCaseBase() { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 973f1e8..7be0cab 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -19,7 +19,6 @@ package org.apache.tajo; import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; import com.google.common.io.Closeables; import com.google.common.io.Files; import org.apache.commons.lang.StringUtils; @@ -40,7 +39,6 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; 
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider; import org.apache.tajo.querymaster.Query; import org.apache.tajo.querymaster.QueryMasterTask; @@ -128,16 +126,10 @@ public class TajoTestingCluster { conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, LogicalPlanTestRuleProvider.class); conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, GlobalPlanTestRuleProvider.class); - - // default resource manager - if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { - String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname); - Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName())); - conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname)); - } - conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2048); - conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f); - + conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES.varname, 4); + conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 2000); + conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM.varname, 3); + conf.setInt(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM.varname, 2); // Client API RPC conf.setIntVar(ConfVars.RPC_CLIENT_WORKER_THREAD_NUM, 2); @@ -145,6 +137,7 @@ public class TajoTestingCluster { //Client API service RPC Server conf.setIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); conf.setIntVar(ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(ConfVars.REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); // Internal RPC Client conf.setIntVar(ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM, 2); @@ -157,10 +150,6 @@ public class 
TajoTestingCluster { conf.setIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); conf.setIntVar(ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); - // Resource allocator - conf.setIntVar(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, 3); - conf.setIntVar(ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM, 6); // make twice of parallel_max - // Memory cache termination conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1); @@ -512,7 +501,8 @@ public class TajoTestingCluster { startMiniDFSCluster(numDataNodes, clusterTestBuildDir, dataNodeHosts); this.dfsCluster.waitClusterUp(); - conf.setInt("hbase.hconnection.threads.core", 50); + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 50); hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java index ace3d0d..d198326 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java @@ -50,7 +50,7 @@ import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java index dc4dd04..3c9177e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java @@ -50,7 +50,7 @@ import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.*; public class TestFullOuterHashJoinExec { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java index 8fd61d0..6b32db0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java @@ -52,7 +52,7 @@ import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static 
org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.*; public class TestFullOuterMergeJoinExec { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index b9ee06a..f7bc1dd 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -51,7 +51,7 @@ import org.junit.Test; import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.*; public class TestHashJoinExec { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java index c93a1b4..87bcd20 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java @@ -50,7 +50,7 @@ import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static 
org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java index c4e7752..d4189bb 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java @@ -52,7 +52,7 @@ import org.junit.Test; import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index dff0cbe..ca4b6b7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -69,8 +69,8 @@ import java.util.Set; import static 
org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; -import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.SortEnforce.SortAlgorithm; import static org.junit.Assert.*; public class TestPhysicalPlanner { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java index d86b229..51f0e76 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java @@ -50,7 +50,7 @@ import java.io.IOException; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; import static org.junit.Assert.*; public class TestRightOuterMergeJoinExec { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java index da0f59d..d769c13 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinOnPartitionedTables.java @@ -46,6 +46,7 @@ import static org.junit.Assert.assertTrue; @Category(IntegrationTest.class) @RunWith(Parameterized.class) @NamedTest("TestJoinQuery") +@net.jcip.annotations.NotThreadSafe public class TestJoinOnPartitionedTables extends TestJoinQuery { public TestJoinOnPartitionedTables(String joinOption) throws Exception { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java index 53dce3c..18eba41 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestOuterJoinQuery.java @@ -28,8 +28,6 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.sql.ResultSet; - @Category(IntegrationTest.class) @RunWith(Parameterized.class) @NamedTest("TestJoinQuery") http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java index e8d59d0..207f64d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java +++ 
b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -18,49 +18,9 @@ package org.apache.tajo.master; -import static org.junit.Assert.*; -import static org.hamcrest.CoreMatchers.*; - -import java.io.File; -import java.sql.ResultSet; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; - -import org.apache.tajo.*; -import org.apache.tajo.algebra.Expr; -import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.client.ResultSetUtil; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.master.exec.NonForwardQueryResultScanner; -import org.apache.tajo.master.exec.NonForwardQueryResultSystemScanner; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.plan.logical.LimitNode; -import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.util.KeyValueSet; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.apache.tajo.QueryTestCaseBase; import org.junit.Test; -import com.google.protobuf.ByteString; - public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase { @Test public void testGetNextRowsForAggregateFunction() throws Exception { 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 9910d79..3c378dd 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -20,29 +20,28 @@ package org.apache.tajo.master; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import io.netty.handler.codec.http.QueryStringDecoder; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.QueryId; +import org.apache.tajo.ResourceProtos.FetchProto; import org.apache.tajo.TestTajoIds; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Task.IntermediateEntry; -import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; import org.junit.Test; -import io.netty.handler.codec.http.QueryStringDecoder; - import java.net.URI; import java.util.*; import static junit.framework.Assert.assertEquals; -import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.HASH_SHUFFLE; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE; +import static org.apache.tajo.querymaster.Repartitioner.FetchGroupMeta; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; @@ -76,7 +75,7 @@ public class TestRepartitioner { 
fetch.setName(sid.toString()); - TajoWorkerProtocol.FetchProto proto = fetch.getProto(); + FetchProto proto = fetch.getProto(); fetch = new FetchImpl(proto); assertEquals(proto, fetch.getProto()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java deleted file mode 100644 index 2c997a3..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ /dev/null @@ -1,454 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.rm; - -import com.google.protobuf.RpcCallback; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat; -import static org.junit.Assert.*; - -public class TestTajoResourceManager { - private final PrimitiveProtos.BoolProto BOOL_TRUE = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); - private final PrimitiveProtos.BoolProto BOOL_FALSE = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build(); - - TajoConf tajoConf; - - long queryIdTime = System.currentTimeMillis(); - int numWorkers = 5; - float workerDiskSlots = 5.0f; - int workerMemoryMB = 512 * 10; - WorkerResourceAllocationResponse response; - - private TajoWorkerResourceManager initResourceManager() throws Exception { - tajoConf = new org.apache.tajo.conf.TajoConf(); - - tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f); - tajoConf.setIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB, 512); - tajoConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, "localhost:0"); - TajoWorkerResourceManager tajoWorkerResourceManager = new TajoWorkerResourceManager(tajoConf); - tajoWorkerResourceManager.init(tajoConf); - tajoWorkerResourceManager.start(); - - for(int i = 0; i < numWorkers; i++) { - ServerStatusProto.System system = ServerStatusProto.System.newBuilder() - .setAvailableProcessors(1) - .setFreeMemoryMB(workerMemoryMB) - .setMaxMemoryMB(workerMemoryMB) - 
.setTotalMemoryMB(workerMemoryMB) - .build(); - - ServerStatusProto.JvmHeap jvmHeap = ServerStatusProto.JvmHeap.newBuilder() - .setFreeHeap(workerMemoryMB) - .setMaxHeap(workerMemoryMB) - .setTotalHeap(workerMemoryMB) - .build(); - - ServerStatusProto.Disk disk = ServerStatusProto.Disk.newBuilder() - .setAbsolutePath("/") - .setFreeSpace(0) - .setTotalSpace(0) - .setUsableSpace(0) - .build(); - - List<ServerStatusProto.Disk> disks = new ArrayList<ServerStatusProto.Disk>(); - - disks.add(disk); - - ServerStatusProto serverStatus = ServerStatusProto.newBuilder() - .setDiskSlots(workerDiskSlots) - .setMemoryResourceMB(workerMemoryMB) - .setJvmHeap(jvmHeap) - .setSystem(system) - .addAllDisk(disks) - .setRunningTaskNum(0) - .build(); - - WorkerConnectionInfo connectionInfo = - new WorkerConnectionInfo("host" + (i + 1), 28091, 28092, 21000 + i, 28093, 28080); - NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder() - .setConnectionInfo(connectionInfo.getProto()) - .setServerStatus(serverStatus) - .build(); - - tajoWorkerResourceManager.getResourceTracker().heartbeat(null, tajoHeartbeat, NullCallback.get()); - } - - return tajoWorkerResourceManager; - } - - - @Test - public void testHeartbeat() throws Exception { - TajoWorkerResourceManager tajoWorkerResourceManager = null; - try { - tajoWorkerResourceManager = initResourceManager(); - assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size()); - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); - assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); - } - } finally { - if (tajoWorkerResourceManager != null) { - tajoWorkerResourceManager.stop(); - } - } - } - - @Test - public void testMemoryResource() throws Exception { - TajoWorkerResourceManager tajoWorkerResourceManager = null; - try { - tajoWorkerResourceManager = initResourceManager(); - - final int 
minMemory = 256; - final int maxMemory = 512; - float diskSlots = 1.0f; - - QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 1); - - WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() - .setResourceRequestPriority(ResourceRequestPriority.MEMORY) - .setNumContainers(60) - .setQueryId(queryId.getProto()) - .setMaxDiskSlotPerContainer(diskSlots) - .setMinDiskSlotPerContainer(diskSlots) - .setMinMemoryMBPerContainer(minMemory) - .setMaxMemoryMBPerContainer(maxMemory) - .build(); - - final CountDownLatch barrier = new CountDownLatch(1); - final List<ContainerProtocol.TajoContainerIdProto> containerIds = new - ArrayList<ContainerProtocol.TajoContainerIdProto>(); - - RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { - - @Override - public void run(WorkerResourceAllocationResponse response) { - TestTajoResourceManager.this.response = response; - barrier.countDown(); - } - }; - - tajoWorkerResourceManager.allocateWorkerResources(request, callBack); - assertTrue(barrier.await(3, TimeUnit.SECONDS)); - - - // assert after callback - int totalUsedMemory = 0; - int totalUsedDisks = 0; - - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - assertEquals(0, resource.getAvailableMemoryMB()); - assertEquals(0, resource.getAvailableDiskSlots(), 0); - assertEquals(5.0f, resource.getUsedDiskSlots(), 0); - - totalUsedMemory += resource.getUsedMemoryMB(); - totalUsedDisks += resource.getUsedDiskSlots(); - } - - assertEquals(workerMemoryMB * numWorkers, totalUsedMemory); - assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0); - - assertEquals(numWorkers * 10, response.getWorkerAllocatedResourceList().size()); - - for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) { - assertTrue( - eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= 
maxMemory); - containerIds.add(eachResource.getContainerId()); - } - - for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { - tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); - } - - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); - assertEquals(0, resource.getUsedMemoryMB()); - - assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); - assertEquals(0.0f, resource.getUsedDiskSlots(), 0); - } - } finally { - if (tajoWorkerResourceManager != null) { - tajoWorkerResourceManager.stop(); - } - } - } - - @Test - public void testMemoryNotCommensurable() throws Exception { - TajoWorkerResourceManager tajoWorkerResourceManager = null; - - try { - tajoWorkerResourceManager = initResourceManager(); - - final int minMemory = 200; - final int maxMemory = 500; - float diskSlots = 1.0f; - - QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 2); - - int requiredContainers = 60; - - int numAllocatedContainers = 0; - - int loopCount = 0; - while(true) { - WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() - .setResourceRequestPriority(ResourceRequestPriority.MEMORY) - .setNumContainers(requiredContainers - numAllocatedContainers) - .setQueryId(queryId.getProto()) - .setMaxDiskSlotPerContainer(diskSlots) - .setMinDiskSlotPerContainer(diskSlots) - .setMinMemoryMBPerContainer(minMemory) - .setMaxMemoryMBPerContainer(maxMemory) - .build(); - - final CountDownLatch barrier = new CountDownLatch(1); - - RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { - @Override - public void run(WorkerResourceAllocationResponse response) { - TestTajoResourceManager.this.response = response; - barrier.countDown(); - } - }; - - tajoWorkerResourceManager.allocateWorkerResources(request, callBack); - - 
assertTrue(barrier.await(3, TimeUnit.SECONDS)); - - numAllocatedContainers += TestTajoResourceManager.this.response.getWorkerAllocatedResourceList().size(); - - //release resource - for(WorkerAllocatedResource eachResource: - TestTajoResourceManager.this.response.getWorkerAllocatedResourceList()) { - assertTrue( - eachResource.getAllocatedMemoryMB() >= minMemory && eachResource.getAllocatedMemoryMB() <= maxMemory); - tajoWorkerResourceManager.releaseWorkerResource(eachResource.getContainerId()); - } - - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - assertEquals(0, resource.getUsedMemoryMB()); - assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); - - assertEquals(0.0f, resource.getUsedDiskSlots(), 0); - assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); - } - - loopCount++; - - if(loopCount == 2) { - assertEquals(requiredContainers, numAllocatedContainers); - break; - } - } - - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - assertEquals(0, resource.getUsedMemoryMB()); - assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); - - assertEquals(0.0f, resource.getUsedDiskSlots(), 0); - assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); - } - } finally { - if (tajoWorkerResourceManager != null) { - tajoWorkerResourceManager.stop(); - } - } - } - - @Test - public void testDiskResource() throws Exception { - TajoWorkerResourceManager tajoWorkerResourceManager = null; - - try { - tajoWorkerResourceManager = initResourceManager(); - - final float minDiskSlots = 1.0f; - final float maxDiskSlots = 2.0f; - int memoryMB = 256; - - QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3); - - WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() - .setResourceRequestPriority(ResourceRequestPriority.DISK) - .setNumContainers(60) - 
.setQueryId(queryId.getProto()) - .setMaxDiskSlotPerContainer(maxDiskSlots) - .setMinDiskSlotPerContainer(minDiskSlots) - .setMinMemoryMBPerContainer(memoryMB) - .setMaxMemoryMBPerContainer(memoryMB) - .build(); - - final CountDownLatch barrier = new CountDownLatch(1); - final List<ContainerProtocol.TajoContainerIdProto> containerIds = new - ArrayList<ContainerProtocol.TajoContainerIdProto>(); - - - RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { - - @Override - public void run(WorkerResourceAllocationResponse response) { - TestTajoResourceManager.this.response = response; - barrier.countDown(); - } - }; - - tajoWorkerResourceManager.allocateWorkerResources(request, callBack); - assertTrue(barrier.await(3, TimeUnit.SECONDS)); - - for(WorkerAllocatedResource eachResource: response.getWorkerAllocatedResourceList()) { - assertTrue("AllocatedDiskSlot:" + eachResource.getAllocatedDiskSlots(), - eachResource.getAllocatedDiskSlots() >= minDiskSlots && - eachResource.getAllocatedDiskSlots() <= maxDiskSlots); - containerIds.add(eachResource.getContainerId()); - } - - // assert after callback - int totalUsedDisks = 0; - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1) - assertEquals(0, resource.getAvailableDiskSlots(), 0); - assertEquals(5.0f, resource.getUsedDiskSlots(), 0); - assertEquals(256 * 3, resource.getUsedMemoryMB()); - - totalUsedDisks += resource.getUsedDiskSlots(); - } - - assertEquals(workerDiskSlots * numWorkers, totalUsedDisks, 0); - - assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size()); - - for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { - tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); - } - - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource 
resource = worker.getResource(); - assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); - assertEquals(0, resource.getUsedMemoryMB()); - - assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); - assertEquals(0.0f, resource.getUsedDiskSlots(), 0); - } - } finally { - if (tajoWorkerResourceManager != null) { - tajoWorkerResourceManager.stop(); - } - } - } - - @Test - public void testDiskResourceWithStoppedQuery() throws Exception { - TajoWorkerResourceManager tajoWorkerResourceManager = null; - - try { - tajoWorkerResourceManager = initResourceManager(); - - final float minDiskSlots = 1.0f; - final float maxDiskSlots = 2.0f; - int memoryMB = 256; - - QueryId queryId = QueryIdFactory.newQueryId(queryIdTime, 3); - - WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() - .setResourceRequestPriority(ResourceRequestPriority.DISK) - .setNumContainers(60) - .setQueryId(queryId.getProto()) - .setMaxDiskSlotPerContainer(maxDiskSlots) - .setMinDiskSlotPerContainer(minDiskSlots) - .setMinMemoryMBPerContainer(memoryMB) - .setMaxMemoryMBPerContainer(memoryMB) - .build(); - - final CountDownLatch barrier = new CountDownLatch(1); - final List<ContainerProtocol.TajoContainerIdProto> containerIds = new - ArrayList<ContainerProtocol.TajoContainerIdProto>(); - - - RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() { - - @Override - public void run(WorkerResourceAllocationResponse response) { - TestTajoResourceManager.this.response = response; - barrier.countDown(); - } - }; - - tajoWorkerResourceManager.getRMContext().getStoppedQueryIds().add(queryId); - tajoWorkerResourceManager.allocateWorkerResources(request, callBack); - assertFalse(barrier.await(3, TimeUnit.SECONDS)); - - assertNull(response); - - // assert after callback - int totalUsedDisks = 0; - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); 
- //each worker allocated 3 container (2 disk slot = 2, 1 disk slot = 1) - assertEquals(5.0f, resource.getAvailableDiskSlots(), 0); - assertEquals(0, resource.getUsedDiskSlots(), 0); - assertEquals(0, resource.getUsedMemoryMB()); - - totalUsedDisks += resource.getUsedDiskSlots(); - } - - assertEquals(0, totalUsedDisks, 0); - - for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) { - tajoWorkerResourceManager.releaseWorkerResource(eachContainerId); - } - - for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { - WorkerResource resource = worker.getResource(); - assertEquals(workerMemoryMB, resource.getAvailableMemoryMB()); - assertEquals(0, resource.getUsedMemoryMB()); - - assertEquals(workerDiskSlots, resource.getAvailableDiskSlots(), 0); - assertEquals(0.0f, resource.getUsedDiskSlots(), 0); - } - } finally { - if (tajoWorkerResourceManager != null) { - tajoWorkerResourceManager.stop(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java deleted file mode 100644 index 874461f..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.scheduler; - -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientUtil; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ClientProtos; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.sql.ResultSet; - -import static org.junit.Assert.*; - -public class TestFifoScheduler { - private static TajoTestingCluster cluster; - private static TajoConf conf; - private static TajoClient client; - private static String query = - "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; - - @BeforeClass - public static void setUp() throws Exception { - cluster = new TajoTestingCluster(); - cluster.startMiniClusterInLocal(1); - conf = cluster.getConfiguration(); - client = cluster.newTajoClient(); - File file = TPCH.getDataFile("lineitem"); - client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " - + "using text location 'file://" + file.getAbsolutePath() + "'"); - assertTrue(client.existTable("default.lineitem")); - } - - @AfterClass - public static void tearDown() throws Exception { - if (client != null) client.close(); - if (cluster != null) cluster.shutdownMiniCluster(); - } - - @Test - public final void testKillScheduledQuery() throws Exception { - 
ClientProtos.SubmitQueryResponse res = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); - QueryId queryId = new QueryId(res.getQueryId()); - QueryId queryId2 = new QueryId(res2.getQueryId()); - - cluster.waitForQuerySubmitted(queryId); - client.killQuery(queryId2); - assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); - } - - @Test - public final void testForwardedQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); - assertTrue(res.getIsForwarded()); - assertFalse(res2.getIsForwarded()); - - QueryId queryId = new QueryId(res.getQueryId()); - QueryId queryId2 = new QueryId(res2.getQueryId()); - cluster.waitForQuerySubmitted(queryId); - - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); - ResultSet resSet = TajoClientUtil.createResultSet(client, res2, 1); - assertNotNull(resSet); - } - - @Test - public final void testScheduledQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query); - ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query); - - QueryId queryId = new QueryId(res.getQueryId()); - QueryId queryId2 = new QueryId(res2.getQueryId()); - QueryId queryId3 = new QueryId(res3.getQueryId()); - QueryId queryId4 = new QueryId(res4.getQueryId()); - - cluster.waitForQuerySubmitted(queryId); - - assertFalse(TajoClientUtil.isQueryComplete(client.getQueryStatus(queryId).getState())); - - assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); - assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); 
- assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState()); - - client.killQuery(queryId4); - client.killQuery(queryId3); - client.killQuery(queryId2); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java new file mode 100644 index 0000000..36fd939 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestSimpleScheduler.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.scheduler; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.rm.*; +import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent; +import org.apache.tajo.master.scheduler.event.SchedulerEvent; +import org.apache.tajo.master.scheduler.event.SchedulerEventType; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.CallFuture; +import org.junit.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static org.apache.tajo.ResourceProtos.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@NotThreadSafe +public class TestSimpleScheduler { + private CompositeService service; + private SimpleScheduler scheduler; + private TajoRMContext rmContext; + private AsyncDispatcher dispatcher; + private TajoConf conf; + private int workerNum = 3; + private NodeResource nodeResource; + private NodeResource totalResource; + private Semaphore barrier; + private int testDelay = 50; + private static ScheduledExecutorService executorService; + + @BeforeClass + public static void setupClass() { + executorService = Executors.newScheduledThreadPool(10); + } + + @AfterClass + public static void tearDownClass() { + executorService.shutdown(); + } + + @Before + public void setup() { + conf = new TajoConf(); + nodeResource = 
NodeResource.createResource(1500, 2, 3); + service = new CompositeService(TestSimpleScheduler.class.getSimpleName()) { + + @Override + protected void serviceInit(Configuration conf) throws Exception { + dispatcher = new AsyncDispatcher(); + addService(dispatcher); + + rmContext = new TajoRMContext(dispatcher); + rmContext.getDispatcher().register(NodeEventType.class, + new TajoResourceManager.WorkerEventDispatcher(rmContext)); + + barrier = new Semaphore(0); + scheduler = new MySimpleScheduler(rmContext, barrier); + addService(scheduler); + rmContext.getDispatcher().register(SchedulerEventType.class, scheduler); + + for (int i = 0; i < workerNum; i++) { + WorkerConnectionInfo conn = new WorkerConnectionInfo("host" + i, 28091 + i, 28092, 21000, 28093, 28080); + rmContext.getNodes().putIfAbsent(conn.getId(), + new NodeStatus(rmContext, NodeResources.clone(nodeResource), conn)); + rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(conn.getId(), NodeEventType.STARTED)); + } + super.serviceInit(conf); + } + }; + service.init(conf); + service.start(); + + assertEquals(workerNum, rmContext.getNodes().size()); + totalResource = NodeResources.createResource(0); + for(NodeStatus nodeStatus : rmContext.getNodes().values()) { + NodeResources.addTo(totalResource, nodeStatus.getTotalResourceCapability()); + } + } + + @After + public void tearDown() { + service.stop(); + } + + @Test + public void testInitialCapacity() throws InterruptedException { + assertEquals(workerNum, scheduler.getNumClusterNodes()); + assertEquals(0, scheduler.getRunningQuery()); + + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, scheduler.getClusterResource()); + + assertEquals(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.defaultIntVal, + scheduler.getQMMinimumResourceCapability().getMemory()); + + assertEquals(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.defaultIntVal, + scheduler.getMinimumResourceCapability().getMemory()); + } + + 
@Test(timeout = 10000) + public void testSubmitOneQuery() throws InterruptedException { + QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default", + "user", + QueryIdFactory.newQueryId(System.nanoTime(), 0), + 1, + System.currentTimeMillis()); + + assertEquals(0, scheduler.getRunningQuery()); + + scheduler.submitQuery(schedulingInfo); + barrier.acquire(); + assertEquals(1, scheduler.getRunningQuery()); + + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, + NodeResources.add(scheduler.getQMMinimumResourceCapability(), scheduler.getClusterResource())); + } + + @Test(timeout = 10000) + public void testMaximumSubmitQuery() throws InterruptedException { + assertEquals(0, scheduler.getRunningQuery()); + int maximumParallelQuery = scheduler.getResourceCalculator().computeAvailableContainers( + scheduler.getMaximumResourceCapability(), scheduler.getQMMinimumResourceCapability()); + + int testParallelNum = 10; + for (int i = 0; i < testParallelNum; i++) { + QuerySchedulingInfo schedulingInfo = new QuerySchedulingInfo("default", + "user", + QueryIdFactory.newQueryId(System.nanoTime(), 0), + 1, + System.currentTimeMillis()); + scheduler.submitQuery(schedulingInfo); + } + + barrier.acquire(); + // allow 50% parallel running + assertEquals(Math.floor(maximumParallelQuery * 0.5f), (double) scheduler.getRunningQuery(), 1.0f); + assertEquals(testParallelNum, scheduler.getRunningQuery() + scheduler.getQueryQueue().size()); + } + + @Test(timeout = 10000) + public void testReserveResource() throws InterruptedException, ExecutionException { + int requestNum = 3; + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, scheduler.getClusterResource()); + + QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0); + CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); + rmContext.getDispatcher().getEventHandler().handle(new 
ResourceReserveSchedulerEvent( + createResourceRequest(queryId, requestNum, new ArrayList<Integer>()), callBack)); + + NodeResourceResponse responseProto = callBack.get(); + assertEquals(queryId, new QueryId(responseProto.getQueryId())); + assertEquals(requestNum, responseProto.getResourceCount()); + + NodeResource allocations = NodeResources.createResource(0); + for (AllocationResourceProto resourceProto : responseProto.getResourceList()) { + NodeResources.addTo(allocations, new NodeResource(resourceProto.getResource())); + } + + assertEquals(NodeResources.subtract(totalResource, allocations), scheduler.getClusterResource()); + } + + @Test(timeout = 10000) + public void testReserveResourceWithWorkerPriority() throws InterruptedException, ExecutionException { + int requestNum = 2; + assertEquals(totalResource, scheduler.getMaximumResourceCapability()); + assertEquals(totalResource, scheduler.getClusterResource()); + + List<Integer> targetWorkers = Lists.newArrayList(); + Map.Entry<Integer, NodeStatus> workerEntry = rmContext.getNodes().entrySet().iterator().next(); + targetWorkers.add(workerEntry.getKey()); + + NodeResource expectResource = NodeResources.multiply(scheduler.getMinimumResourceCapability(), requestNum); + assertTrue(NodeResources.fitsIn(expectResource, workerEntry.getValue().getAvailableResource())); + + QueryId queryId = QueryIdFactory.newQueryId(System.nanoTime(), 0); + NodeResourceRequest requestProto = createResourceRequest(queryId, requestNum, targetWorkers); + CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); + rmContext.getDispatcher().getEventHandler().handle(new ResourceReserveSchedulerEvent( + requestProto, callBack)); + + NodeResourceResponse responseProto = callBack.get(); + assertEquals(queryId, new QueryId(responseProto.getQueryId())); + assertEquals(requestNum, responseProto.getResourceCount()); + + for (AllocationResourceProto resourceProto : responseProto.getResourceList()) { + 
assertEquals(workerEntry.getKey().intValue(), resourceProto.getWorkerId()); + } + } + + private NodeResourceRequest + createResourceRequest(QueryId queryId, int containerNum, List<Integer> candidateWorkers) { + NodeResourceRequest.Builder request = + NodeResourceRequest.newBuilder(); + request.setCapacity(scheduler.getMinimumResourceCapability().getProto()) + .setNumContainers(containerNum) + .setPriority(1) + .setQueryId(queryId.getProto()) + .setType(ResourceType.LEAF) + .setUserId("test user") + .setRunningTasks(0) + .addAllCandidateNodes(candidateWorkers) + .setQueue("default"); + return request.build(); + } + + class MySimpleScheduler extends SimpleScheduler { + Semaphore barrier; + Map<QueryId, QueryInfo> queryInfoMap = Maps.newHashMap(); + Map<QueryId, AllocationResourceProto> qmAllocationMap = Maps.newHashMap(); + + public MySimpleScheduler(TajoRMContext rmContext, Semaphore barrier) { + super(null, rmContext); + this.barrier = barrier; + } + + @Override + public void submitQuery(QuerySchedulingInfo schedulingInfo) { + queryInfoMap.put(schedulingInfo.getQueryId(), new QueryInfo(schedulingInfo.getQueryId()) { + QueryContext context; + @Override + public QueryContext getQueryContext() { + if(context == null) { + context = new QueryContext(conf); + context.setUser("user"); + } + return context; + } + }); + super.submitQuery(schedulingInfo); + } + + @Override + protected boolean startQuery(final QueryId queryId, final AllocationResourceProto allocation) { + executorService.schedule(new Runnable() { + @Override + public void run() { + barrier.release(); + qmAllocationMap.put(queryId, allocation); + rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); + } + }, testDelay, TimeUnit.MILLISECONDS); + return true; + } + + @Override + public void handle(SchedulerEvent event) { + super.handle(event); + barrier.release(); + } + + @Override + protected QueryInfo getQueryInfo(QueryId queryId) { + return 
queryInfoMap.get(queryId); + } + + @Override + public void stopQuery(QueryId queryId) { + queryInfoMap.remove(queryId); + AllocationResourceProto allocationResourceProto = qmAllocationMap.remove(queryId); + NodeResources.addTo(rmContext.getNodes().get(allocationResourceProto.getWorkerId()).getAvailableResource(), + new NodeResource(allocationResourceProto.getResource())); + super.stopQuery(queryId); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 1351716..3e8e230 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -19,10 +19,11 @@ package org.apache.tajo.querymaster; import com.google.common.collect.Lists; -import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.tajo.*; +import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.CatalogService; @@ -34,26 +35,19 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequestImpl; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.event.*; +import org.apache.tajo.master.event.QueryEvent; +import org.apache.tajo.master.event.QueryEventType; 
+import org.apache.tajo.master.event.StageEvent; +import org.apache.tajo.master.event.StageEventType; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.resource.NodeResources; import org.apache.tajo.session.Session; -import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.TablespaceManager; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.history.HistoryReader; -import org.apache.tajo.util.history.HistoryWriter; -import org.apache.tajo.util.metrics.TajoSystemMetrics; -import org.apache.tajo.worker.ExecutionBlockContext; -import org.apache.tajo.worker.LegacyTaskImpl; -import org.apache.tajo.worker.TajoWorker; -import org.apache.tajo.worker.TaskRunnerManager; +import org.apache.tajo.worker.*; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -123,7 +117,7 @@ public class TestKillQuery { QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), - queryId, session, defaultContext, expr.toJson(), dispatch); + queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), dispatch); queryMasterTask.init(conf); queryMasterTask.getQueryTaskContext().getDispatcher().start(); @@ -187,7 +181,7 @@ public class TestKillQuery { QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), - queryId, session, defaultContext, expr.toJson(), dispatch); + queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), dispatch); queryMasterTask.init(conf); queryMasterTask.getQueryTaskContext().getDispatcher().start(); @@ -223,9 +217,6 @@ public class 
TestKillQuery { lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL, new StageEvent(lastStage.getId(), StageEventType.SQ_KILL)); - lastStage.getStateMachine().doTransition(StageEventType.SQ_CONTAINER_ALLOCATED, - new StageEvent(lastStage.getId(), StageEventType.SQ_CONTAINER_ALLOCATED)); - lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT, new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT)); @@ -243,22 +234,21 @@ public class TestKillQuery { TaskId tid = QueryIdFactory.newTaskId(eid); final TajoConf conf = new TajoConf(); TaskRequestImpl taskRequest = new TaskRequestImpl(); + WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(), - null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf), null, null); - taskRequest.setInterQuery(); TaskAttemptId attemptId = new TaskAttemptId(tid, 1); + taskRequest.set(attemptId, new ArrayList<CatalogProtos.FragmentProto>(), + null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf), + null, null, queryMaster.getHostAndQMPort()); + taskRequest.setInterQuery(); - WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - TajoWorkerProtocol.RunExecutionBlockRequestProto.Builder - requestProto = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder(); - requestProto.setExecutionBlockId(eid.getProto()) - .setQueryMaster(queryMaster.getProto()) - .setNodeId(queryMaster.getHost()+":" + queryMaster.getQueryMasterPort()) - .setContainerId("test") - .setQueryContext(new QueryContext(conf).getProto()) + ExecutionBlockContextResponse.Builder requestProtoBuilder = + ExecutionBlockContextResponse.newBuilder(); + requestProtoBuilder.setExecutionBlockId(eid.getProto()) .setPlanJson("test") + .setQueryContext(new QueryContext(conf).getProto()) + 
.setQueryOutputPath("testpath") .setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE); TajoWorker.WorkerContext workerContext = new MockWorkerContext() { @@ -266,13 +256,31 @@ public class TestKillQuery { public TajoConf getConf() { return conf; } + + @Override + public TaskManager getTaskManager() { + return null; + } + + @Override + public TaskExecutor getTaskExecuor() { + return null; + } + + @Override + public NodeResourceManager getNodeResourceManager() { + return null; + } }; - ExecutionBlockContext context = - new ExecutionBlockContext(workerContext, null, requestProto.build()); + ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) { + @Override + public Path createBaseDir() throws IOException { + return new Path("test"); + } + }; - org.apache.tajo.worker.Task task = new LegacyTaskImpl("test", CommonTestingUtil.getTestDir(), attemptId, - conf, context, taskRequest); + org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context, null); task.kill(); assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState()); try { @@ -301,94 +309,4 @@ public class TestKillQuery { super.dispatch(event); } } - - abstract class MockWorkerContext implements TajoWorker.WorkerContext { - - @Override - public QueryMaster getQueryMaster() { - return null; - } - - public abstract TajoConf getConf(); - - @Override - public ServiceTracker getServiceTracker() { - return null; - } - - @Override - public QueryMasterManagerService getQueryMasterManagerService() { - return null; - } - - @Override - public TaskRunnerManager getTaskRunnerManager() { - return null; - } - - @Override - public CatalogService getCatalog() { - return null; - } - - @Override - public WorkerConnectionInfo getConnectionInfo() { - return null; - } - - @Override - public String getWorkerName() { - return null; - } - - @Override - public LocalDirAllocator getLocalDirAllocator() { - return null; - } - - @Override - public 
QueryCoordinatorProtocol.ClusterResourceSummary getClusterResource() { - return null; - } - - @Override - public TajoSystemMetrics getWorkerSystemMetrics() { - return null; - } - - @Override - public HashShuffleAppenderManager getHashShuffleAppenderManager() { - return null; - } - - @Override - public HistoryWriter getTaskHistoryWriter() { - return null; - } - - @Override - public HistoryReader getHistoryReader() { - return null; - } - - @Override - public void cleanup(String strPath) { - - } - - @Override - public void cleanupTemporalDirectories() { - - } - - @Override - public void setClusterResource(QueryCoordinatorProtocol.ClusterResourceSummary clusterResource) { - - } - - @Override - public void setNumClusterNodes(int numClusterNodes) { - - } - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java index a822e42..d2241b4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestQueryState.java @@ -18,28 +18,36 @@ package org.apache.tajo.querymaster; +import net.jcip.annotations.NotThreadSafe; import org.apache.tajo.*; +import org.apache.tajo.client.QueryStatus; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.ipc.ClientProtos; -import org.apache.tajo.master.QueryManager; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; @Category(IntegrationTest.class) +@NotThreadSafe public class TestQueryState { 
private static TajoTestingCluster cluster; private static TajoClient client; @BeforeClass - public static void setUp() throws Exception { + public static void setUpClass() throws Exception { cluster = TpchTestBase.getInstance().getTestingCluster(); client = cluster.newTajoClient(); } + @AfterClass + public static void tearDownClass() { + client.close(); + } + @Test(timeout = 10000) public void testSucceededState() throws Exception { String queryStr = "select l_orderkey from lineitem group by l_orderkey order by l_orderkey"; @@ -61,16 +69,21 @@ public class TestQueryState { ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); QueryId queryId = new QueryId(res.getQueryId()); - cluster.waitForQuerySubmitted(queryId); + + QueryStatus queryState = client.getQueryStatus(queryId); + while (!TajoClientUtil.isQueryComplete(queryState.getState())) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + fail("Query state : " + queryState); + } + queryState = client.getQueryStatus(queryId); + } QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); Query query = qmt.getQuery(); - // wait for query complete - cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_SUCCEEDED, 100); - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, qmt.getState()); - assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getSynchronizedState()); assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, query.getState()); @@ -80,13 +93,6 @@ public class TestQueryState { assertEquals(StageState.SUCCEEDED, stage.getState()); } - /* wait for heartbeat from QueryMaster */ - QueryManager queryManager = cluster.getMaster().getContext().getQueryJobManager(); - for (; ; ) { - if (queryManager.getFinishedQuery(queryId) != null) break; - else Thread.sleep(100); - } - /* get status from TajoMaster */ assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId).getState()); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java b/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java index 51622b5..0ce2646 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/metrics/TestSystemMetrics.java @@ -124,6 +124,7 @@ public class TestSystemMetrics { } assertEquals(2, lines.size()); + tajoSystemMetrics.stop(); } @After http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java index 9d4e1f3..7d7fb1a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java @@ -18,16 +18,16 @@ package org.apache.tajo.worker; +import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.ipc.TajoWorkerProtocol; import java.io.IOException; public class MockExecutionBlock extends ExecutionBlockContext { public MockExecutionBlock(TajoWorker.WorkerContext workerContext, - TajoWorkerProtocol.RunExecutionBlockRequestProto request) throws IOException { - super(workerContext, null, request); + ExecutionBlockContextResponse request) throws IOException { + super(workerContext, request, null); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java 
---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java index 18b9405..8c8427d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeResourceManager.java @@ -19,11 +19,11 @@ package org.apache.tajo.worker; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.ResourceProtos.TaskAllocationProto; +import org.apache.tajo.ResourceProtos.TaskRequestProto; import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.resource.NodeResources; @@ -38,8 +38,8 @@ public class MockNodeResourceManager extends NodeResourceManager { volatile boolean enableTaskHandlerEvent = true; private final Semaphore barrier; - public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, EventHandler taskEventHandler) { - super(dispatcher, taskEventHandler); + public MockNodeResourceManager(Semaphore barrier, Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) { + super(dispatcher, workerContext); this.barrier = barrier; } @@ -50,14 +50,7 @@ public class MockNodeResourceManager extends NodeResourceManager { } @Override - protected void startExecutionBlock(TajoWorkerProtocol.RunExecutionBlockRequestProto request) { - if(enableTaskHandlerEvent) { - super.startExecutionBlock(request); - } - } - - @Override - protected void startTask(TajoWorkerProtocol.TaskRequestProto request, NodeResource resource) { + protected void startTask(TaskRequestProto request, NodeResource resource) { if(enableTaskHandlerEvent) { 
super.startTask(request, resource); } @@ -70,24 +63,23 @@ public class MockNodeResourceManager extends NodeResourceManager { enableTaskHandlerEvent = flag; } - protected static Queue<TajoWorkerProtocol.TaskAllocationRequestProto> createTaskRequests( + protected static Queue<TaskAllocationProto> createTaskRequests( ExecutionBlockId ebId, int memory, int size) { - Queue<TajoWorkerProtocol.TaskAllocationRequestProto> - requestProtoList = new LinkedBlockingQueue<TajoWorkerProtocol.TaskAllocationRequestProto>(); + Queue<TaskAllocationProto> + requestProtoList = new LinkedBlockingQueue<TaskAllocationProto>(); for (int i = 0; i < size; i++) { TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, i), 0); - TajoWorkerProtocol.TaskRequestProto.Builder builder = - TajoWorkerProtocol.TaskRequestProto.newBuilder(); + TaskRequestProto.Builder builder = TaskRequestProto.newBuilder(); + builder.setQueryMasterHostAndPort("localhost:0"); builder.setId(taskAttemptId.getProto()); - builder.setShouldDie(true); builder.setOutputTable(""); builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); builder.setClusteredOutput(false); - requestProtoList.add(TajoWorkerProtocol.TaskAllocationRequestProto.newBuilder() + requestProtoList.add(TaskAllocationProto.newBuilder() .setResource(NodeResources.createResource(memory).getProto()) .setTaskRequest(builder.build()).build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java index dfcfd4f..634398f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java @@ -21,8 +21,7 @@ package 
org.apache.tajo.worker; import com.google.common.collect.Maps; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.resource.NodeResources; @@ -30,7 +29,7 @@ import java.net.ConnectException; import java.util.Map; import java.util.concurrent.CountDownLatch; -import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; +import static org.apache.tajo.ResourceProtos.*; public class MockNodeStatusUpdater extends NodeStatusUpdater { @@ -39,9 +38,8 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater { private Map<Integer, NodeResource> resources = Maps.newHashMap(); private MockResourceTracker resourceTracker; - public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext, - NodeResourceManager resourceManager) { - super(workerContext, resourceManager); + public MockNodeStatusUpdater(CountDownLatch barrier, TajoWorker.WorkerContext workerContext) { + super(workerContext); this.barrier = barrier; this.resourceTracker = new MockResourceTracker(); } @@ -58,7 +56,7 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater { } class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface { - private NodeHeartbeatRequestProto lastRequest; + private NodeHeartbeatRequest lastRequest; protected Map<Integer, NodeResource> getTotalResource() { return membership; @@ -68,21 +66,15 @@ public class MockNodeStatusUpdater extends NodeStatusUpdater { return membership; } - protected NodeHeartbeatRequestProto getLastRequest() { + protected NodeHeartbeatRequest getLastRequest() { return lastRequest; } @Override - public void heartbeat(RpcController controller, NodeHeartbeat request, - 
RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) { + public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequest request, + RpcCallback<NodeHeartbeatResponse> done) { - } - - @Override - public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequestProto request, - RpcCallback<NodeHeartbeatResponseProto> done) { - - NodeHeartbeatResponseProto.Builder response = NodeHeartbeatResponseProto.newBuilder(); + NodeHeartbeatResponse.Builder response = NodeHeartbeatResponse.newBuilder(); if (membership.containsKey(request.getWorkerId())) { if (request.hasAvailableResource()) { NodeResource resource = resources.get(request.getWorkerId());
