TAJO-1397: Resource allocation should be fine grained. (jinho) Closes #608
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5a02873d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5a02873d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5a02873d Branch: refs/heads/index_support Commit: 5a02873d5f0bd8fedd8527808bd3d4ecbe7d8af3 Parents: bedce3a Author: Jinho Kim <[email protected]> Authored: Mon Jul 20 17:31:22 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Jul 20 17:31:22 2015 +0900 ---------------------------------------------------------------------- .travis.yml | 2 +- CHANGES | 2 + .../org/apache/tajo/catalog/FunctionDesc.java | 7 +- .../org/apache/tajo/catalog/CatalogServer.java | 16 +- .../dictionary/ClusterTableDescriptor.java | 6 +- .../org/apache/tajo/cli/tools/TajoAdmin.java | 23 +- tajo-client/src/main/proto/ClientProtos.proto | 19 +- .../main/java/org/apache/tajo/SessionVars.java | 2 +- .../java/org/apache/tajo/conf/TajoConf.java | 55 +- .../java/org/apache/tajo/util/NumberUtil.java | 4 + .../java/org/apache/tajo/util/StringUtils.java | 9 +- .../main/java/org/apache/tajo/util/TUtil.java | 9 + tajo-core/pom.xml | 13 +- .../apache/tajo/engine/parser/SQLAnalyzer.java | 16 +- .../engine/planner/PhysicalPlannerImpl.java | 24 +- .../tajo/engine/planner/enforce/Enforcer.java | 13 +- .../tajo/engine/planner/global/DataChannel.java | 3 +- .../tajo/engine/planner/global/MasterPlan.java | 11 +- .../global/builder/DistinctGroupbyBuilder.java | 6 +- .../tajo/engine/planner/physical/ScanExec.java | 10 +- .../apache/tajo/engine/query/TaskRequest.java | 27 +- .../tajo/engine/query/TaskRequestImpl.java | 87 +- .../org/apache/tajo/master/ContainerProxy.java | 85 -- .../tajo/master/LaunchTaskRunnersEvent.java | 46 - .../tajo/master/QueryCoordinatorService.java | 77 +- .../org/apache/tajo/master/QueryInProgress.java | 102 ++- .../org/apache/tajo/master/QueryManager.java | 104 +-- .../apache/tajo/master/TajoContainerProxy.java | 177 ---- .../java/org/apache/tajo/master/TajoMaster.java | 120 +-- .../tajo/master/TajoMasterClientService.java | 62 +- .../tajo/master/TaskRunnerGroupEvent.java | 51 -- .../apache/tajo/master/TaskRunnerLauncher.java | 25 - .../master/cluster/WorkerConnectionInfo.java | 4 + .../tajo/master/container/TajoContainer.java | 173 ---- .../tajo/master/container/TajoContainerId.java | 171 ---- .../master/container/TajoContainerIdPBImpl.java | 100 --- .../master/container/TajoConverterUtils.java | 87 -- .../master/event/ContainerAllocationEvent.java | 77 -- .../event/ContainerAllocatorEventType.java | 26 - .../tajo/master/event/ContainerEvent.java | 37 - .../event/GrouppedContainerAllocatorEvent.java | 45 - .../tajo/master/event/LocalTaskEvent.java | 11 +- .../tajo/master/event/QueryStartEvent.java | 9 +- .../event/StageContainerAllocationEvent.java | 38 - .../tajo/master/event/StageEventType.java | 2 - .../master/event/StageShuffleReportEvent.java | 8 +- .../master/event/TaskAttemptAssignedEvent.java | 9 +- .../tajo/master/event/TaskAttemptEventType.java | 1 + .../event/TaskAttemptStatusUpdateEvent.java | 2 +- .../event/TaskAttemptToSchedulerEvent.java | 26 +- .../tajo/master/event/TaskCompletionEvent.java | 2 +- .../tajo/master/event/TaskFatalErrorEvent.java | 2 +- .../tajo/master/event/TaskRequestEvent.java | 35 +- .../NonForwardQueryResultSystemScanner.java | 94 +-- .../org/apache/tajo/master/rm/NodeEvent.java | 37 + .../apache/tajo/master/rm/NodeEventType.java | 30 + .../tajo/master/rm/NodeLivelinessMonitor.java | 56 ++ .../tajo/master/rm/NodeReconnectEvent.java | 35 + .../org/apache/tajo/master/rm/NodeState.java | 44 + .../org/apache/tajo/master/rm/NodeStatus.java | 295 +++++++ .../apache/tajo/master/rm/NodeStatusEvent.java | 58 ++ .../apache/tajo/master/rm/TajoRMContext.java | 42 +- .../tajo/master/rm/TajoResourceManager.java | 185 ++++ .../tajo/master/rm/TajoResourceTracker.java | 197 +++-- .../tajo/master/rm/TajoWorkerContainer.java | 125 --- .../tajo/master/rm/TajoWorkerContainerId.java | 94 --- .../master/rm/TajoWorkerResourceManager.java | 605 ------------- .../java/org/apache/tajo/master/rm/Worker.java | 290 ------- .../org/apache/tajo/master/rm/WorkerEvent.java | 37 - .../apache/tajo/master/rm/WorkerEventType.java | 30 - .../tajo/master/rm/WorkerLivelinessMonitor.java | 56 -- .../tajo/master/rm/WorkerReconnectEvent.java | 35 - .../tajo/master/rm/WorkerResourceManager.java | 115 --- .../org/apache/tajo/master/rm/WorkerState.java | 44 - .../tajo/master/rm/WorkerStatusEvent.java | 54 -- .../scheduler/AbstractQueryScheduler.java | 73 ++ .../master/scheduler/QuerySchedulingInfo.java | 60 +- .../apache/tajo/master/scheduler/QueueInfo.java | 101 +++ .../tajo/master/scheduler/QueueState.java | 50 ++ .../master/scheduler/SchedulingAlgorithms.java | 6 +- .../master/scheduler/SimpleFifoScheduler.java | 148 ---- .../tajo/master/scheduler/SimpleScheduler.java | 388 +++++++++ .../master/scheduler/TajoResourceScheduler.java | 75 ++ .../event/ResourceReserveSchedulerEvent.java | 45 + .../master/scheduler/event/SchedulerEvent.java | 27 + .../scheduler/event/SchedulerEventType.java | 26 + .../metrics/WorkerResourceMetricsGaugeSet.java | 16 +- .../tajo/querymaster/AbstractTaskScheduler.java | 17 +- .../tajo/querymaster/DefaultTaskScheduler.java | 500 ++++++----- .../java/org/apache/tajo/querymaster/Query.java | 25 +- .../apache/tajo/querymaster/QueryMaster.java | 249 +++--- .../querymaster/QueryMasterManagerService.java | 180 ++-- .../tajo/querymaster/QueryMasterTask.java | 218 +++-- .../apache/tajo/querymaster/Repartitioner.java | 4 +- .../java/org/apache/tajo/querymaster/Stage.java | 247 ++---- .../java/org/apache/tajo/querymaster/Task.java | 29 +- .../apache/tajo/querymaster/TaskAttempt.java | 34 +- .../resource/DefaultResourceCalculator.java | 4 +- .../org/apache/tajo/resource/NodeResource.java | 2 +- .../java/org/apache/tajo/session/Session.java | 2 +- .../main/java/org/apache/tajo/util/JSPUtil.java | 33 +- .../apache/tajo/util/history/HistoryReader.java | 8 +- .../apache/tajo/util/history/HistoryWriter.java | 78 +- .../tajo/worker/AbstractResourceAllocator.java | 69 -- .../tajo/worker/ExecutionBlockContext.java | 70 +- .../java/org/apache/tajo/worker/FetchImpl.java | 12 +- .../org/apache/tajo/worker/LegacyTaskImpl.java | 844 ------------------- .../apache/tajo/worker/NodeResourceManager.java | 140 +-- .../apache/tajo/worker/NodeStatusUpdater.java | 120 ++- .../apache/tajo/worker/ResourceAllocator.java | 29 - .../tajo/worker/TajoResourceAllocator.java | 415 --------- .../java/org/apache/tajo/worker/TajoWorker.java | 96 +-- .../tajo/worker/TajoWorkerManagerService.java | 79 +- .../main/java/org/apache/tajo/worker/Task.java | 9 +- .../org/apache/tajo/worker/TaskContainer.java | 1 - .../org/apache/tajo/worker/TaskExecutor.java | 117 ++- .../org/apache/tajo/worker/TaskHistory.java | 4 +- .../java/org/apache/tajo/worker/TaskImpl.java | 31 +- .../org/apache/tajo/worker/TaskManager.java | 161 +++- .../java/org/apache/tajo/worker/TaskRunner.java | 306 ------- .../apache/tajo/worker/TaskRunnerHistory.java | 152 ---- .../apache/tajo/worker/TaskRunnerManager.java | 238 ------ .../tajo/worker/WorkerHeartbeatService.java | 262 ------ .../worker/event/ExecutionBlockErrorEvent.java | 41 + .../worker/event/ExecutionBlockStartEvent.java | 35 - .../worker/event/ExecutionBlockStopEvent.java | 16 +- .../worker/event/NodeResourceAllocateEvent.java | 18 +- .../event/NodeResourceDeallocateEvent.java | 8 +- .../tajo/worker/event/NodeResourceEvent.java | 14 +- .../worker/event/QMResourceAllocateEvent.java | 45 + .../tajo/worker/event/QueryStopEvent.java | 35 + .../tajo/worker/event/TaskExecutorEvent.java | 44 - .../tajo/worker/event/TaskManagerEvent.java | 22 +- .../tajo/worker/event/TaskRunnerEvent.java | 42 - .../tajo/worker/event/TaskRunnerStartEvent.java | 39 - .../tajo/worker/event/TaskRunnerStopEvent.java | 29 - .../tajo/worker/event/TaskStartEvent.java | 18 +- .../tajo/ws/rs/resources/ClusterResource.java | 8 +- .../tajo/ws/rs/responses/WorkerResponse.java | 53 +- .../src/main/proto/ContainerProtocol.proto | 48 -- .../main/proto/QueryCoordinatorProtocol.proto | 117 +-- .../src/main/proto/QueryMasterProtocol.proto | 11 +- tajo-core/src/main/proto/ResourceProtos.proto | 311 +++++++ .../main/proto/ResourceTrackerProtocol.proto | 39 +- .../src/main/proto/TajoWorkerProtocol.proto | 348 +------- .../main/resources/webapps/admin/cluster.jsp | 99 +-- .../src/main/resources/webapps/admin/index.jsp | 34 +- .../src/main/resources/webapps/admin/query.jsp | 57 +- .../resources/webapps/admin/query_executor.jsp | 3 +- .../resources/webapps/admin/querydetail.jsp | 4 +- .../main/resources/webapps/admin/querytasks.jsp | 34 +- .../src/main/resources/webapps/admin/task.jsp | 18 +- .../src/main/resources/webapps/worker/index.jsp | 58 +- .../resources/webapps/worker/querydetail.jsp | 21 +- .../main/resources/webapps/worker/queryplan.jsp | 2 +- .../resources/webapps/worker/querytasks.jsp | 44 +- .../src/main/resources/webapps/worker/task.jsp | 16 +- .../resources/webapps/worker/taskcontainers.jsp | 93 -- .../resources/webapps/worker/taskdetail.jsp | 39 +- .../resources/webapps/worker/taskhistory.jsp | 6 +- .../src/main/resources/webapps/worker/tasks.jsp | 107 --- .../java/org/apache/tajo/QueryTestCaseBase.java | 2 +- .../org/apache/tajo/TajoTestingCluster.java | 24 +- .../planner/physical/TestBNLJoinExec.java | 2 +- .../physical/TestFullOuterHashJoinExec.java | 2 +- .../physical/TestFullOuterMergeJoinExec.java | 2 +- .../planner/physical/TestHashJoinExec.java | 2 +- .../physical/TestLeftOuterHashJoinExec.java | 2 +- .../planner/physical/TestMergeJoinExec.java | 2 +- .../planner/physical/TestPhysicalPlanner.java | 4 +- .../physical/TestRightOuterMergeJoinExec.java | 2 +- .../query/TestJoinOnPartitionedTables.java | 1 + .../tajo/engine/query/TestOuterJoinQuery.java | 2 - .../TestNonForwardQueryResultSystemScanner.java | 42 +- .../apache/tajo/master/TestRepartitioner.java | 11 +- .../tajo/master/rm/TestTajoResourceManager.java | 454 ---------- .../master/scheduler/TestFifoScheduler.java | 115 --- .../master/scheduler/TestSimpleScheduler.java | 301 +++++++ .../apache/tajo/querymaster/TestKillQuery.java | 166 +--- .../apache/tajo/querymaster/TestQueryState.java | 38 +- .../tajo/util/metrics/TestSystemMetrics.java | 1 + .../apache/tajo/worker/MockExecutionBlock.java | 6 +- .../tajo/worker/MockNodeResourceManager.java | 30 +- .../tajo/worker/MockNodeStatusUpdater.java | 26 +- .../apache/tajo/worker/MockTaskExecutor.java | 29 +- .../org/apache/tajo/worker/MockTaskManager.java | 24 +- .../apache/tajo/worker/MockWorkerContext.java | 20 - .../org/apache/tajo/worker/TestFetcher.java | 14 +- .../org/apache/tajo/worker/TestHistory.java | 124 --- .../tajo/worker/TestNodeResourceManager.java | 100 ++- .../tajo/worker/TestNodeStatusUpdater.java | 83 +- .../apache/tajo/worker/TestTaskExecutor.java | 108 +-- .../org/apache/tajo/worker/TestTaskManager.java | 75 +- .../configuration/worker_configuration.rst | 94 ++- .../org/apache/tajo/plan/util/PlannerUtil.java | 8 +- tajo-plan/src/main/proto/Plan.proto | 125 +++ tajo-project/pom.xml | 2 +- .../org/apache/tajo/rpc/AsyncRpcServer.java | 6 +- 198 files changed, 5011 insertions(+), 9450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index ef8fd97..671a31f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,7 +33,7 @@ notifications: - [email protected] irc: "chat.freenode.net#tajo" -before_install: ulimit -t 514029 +before_install: ulimit -t 514029 -u 2048 -n 3000 install: ./dev-support/travis-install-dependencies.sh script: http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 890aecf..1529c08 100644 --- a/CHANGES +++ b/CHANGES @@ -29,6 +29,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1397: Resource allocation should be fine grained. (jinho) + TAJO-1352: Improve the join order algorithm to consider missed cases of associative join operators. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java index 6ea6ac6..9f71e8e 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/FunctionDesc.java @@ -38,7 +38,6 @@ import java.lang.reflect.Constructor; * */ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable, GsonObject, Comparable<FunctionDesc> { - private FunctionDescProto.Builder builder = FunctionDescProto.newBuilder(); @Expose private FunctionSignature signature; @Expose private FunctionInvocation invocation; @@ -184,11 +183,7 @@ public class FunctionDesc implements ProtoObject<FunctionDescProto>, Cloneable, @Override public FunctionDescProto getProto() { - if (builder == null) { - builder = FunctionDescProto.newBuilder(); - } else { - builder.clear(); - } + FunctionDescProto.Builder builder = FunctionDescProto.newBuilder(); builder.setSignature(signature.getProto()); builder.setSupplement(supplement.getProto()); builder.setInvocation(invocation.getProto()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index f2e9795..b1410dd 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -172,7 +172,8 @@ public class CatalogServer extends AbstractService { } } - public void start() { + @Override + public void serviceStart() throws Exception { String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS); InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr); int workerNum = conf.getIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM); @@ -189,10 +190,11 @@ public class CatalogServer extends AbstractService { } LOG.info("Catalog Server startup (" + bindAddressStr + ")"); - super.start(); + super.serviceStart(); } - public void stop() { + @Override + public void serviceStop() throws Exception { LOG.info("Catalog Server (" + bindAddressStr + ") shutdown"); // If CatalogServer shutdowns before it started, rpcServer and store may be NULL. @@ -201,13 +203,9 @@ public class CatalogServer extends AbstractService { this.rpcServer.shutdown(); } if (store != null) { - try { - store.close(); - } catch (IOException ioe) { - LOG.error(ioe.getMessage(), ioe); - } + store.close(); } - super.stop(); + super.serviceStop(); } public CatalogProtocolHandler getHandler() { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java index e3c830f..69067f4 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/ClusterTableDescriptor.java @@ -31,10 +31,8 @@ class ClusterTableDescriptor extends AbstractTableDescriptor { new ColumnDescriptor("total_cpu", Type.INT4, 0), new ColumnDescriptor("used_mem", Type.INT8, 0), new ColumnDescriptor("total_mem", Type.INT8, 0), - new ColumnDescriptor("free_heap", Type.INT8, 0), - new ColumnDescriptor("max_heap", Type.INT8, 0), - new ColumnDescriptor("used_diskslots", Type.FLOAT4, 0), - new ColumnDescriptor("total_diskslots", Type.FLOAT4, 0), + new ColumnDescriptor("used_disk", Type.INT4, 0), + new ColumnDescriptor("total_disk", Type.INT4, 0), new ColumnDescriptor("running_tasks", Type.INT4, 0), new ColumnDescriptor("last_heartbeat_ts", Type.TIMESTAMP, 0) }; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java index 739cd54..76ba7a9 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java @@ -268,7 +268,7 @@ public class TajoAdmin { } else { String fmtQueryMasterLine = "%1$-25s %2$-5s %3$-5s %4$-10s %5$-10s%n"; line = String.format(fmtQueryMasterLine, "QueryMaster", "Port", "Query", - "Heap", "Status"); + "Mem", "Status"); writer.write(line); line = String.format(fmtQueryMasterLine, DASHLINE_LEN25, DASHLINE_LEN5, DASHLINE_LEN5, DASHLINE_LEN10, DASHLINE_LEN10); @@ -276,12 +276,12 @@ public class TajoAdmin { for (WorkerResourceInfo queryMaster : liveQueryMasters) { TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo(); String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort()); - String heap = String.format("%d MB", queryMaster.getMaxHeap() / 1024 / 1024); + String memory = String.format("%d MB", queryMaster.getAvailableResource().getMemory()); line = String.format(fmtQueryMasterLine, queryMasterHost, connInfo.getClientPort(), queryMaster.getNumQueryMasterTasks(), - heap, + memory, queryMaster.getWorkerStatus()); writer.write(line); } @@ -348,7 +348,7 @@ public class TajoAdmin { String line = String.format(fmtWorkerLine, "Worker", "Port", "Tasks", "Mem", "Disk", - "Heap", "Status"); + "Cpu", "Status"); writer.write(line); line = String.format(fmtWorkerLine, DASHLINE_LEN25, DASHLINE_LEN5, DASHLINE_LEN5, @@ -359,17 +359,16 @@ public class TajoAdmin { for (WorkerResourceInfo worker : workers) { TajoProtos.WorkerConnectionInfoProto connInfo = worker.getConnectionInfo(); String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort()); - String mem = String.format("%d/%d", worker.getUsedMemoryMB(), - worker.getMemoryMB()); - String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(), - worker.getDiskSlots()); - String heap = String.format("%d/%d MB", worker.getFreeHeap()/1024/1024, - worker.getMaxHeap()/1024/1024); - + String mem = String.format("%d/%d", worker.getAvailableResource().getMemory(), + worker.getTotalResource().getMemory()); + String disk = String.format("%d/%d", worker.getAvailableResource().getDisks(), + worker.getTotalResource().getDisks()); + String cpu = String.format("%d/%d", worker.getAvailableResource().getVirtualCores(), + worker.getTotalResource().getVirtualCores()); line = String.format(fmtWorkerLine, workerHost, connInfo.getPullServerPort(), worker.getNumRunningTasks(), - mem, disk, heap, worker.getWorkerStatus()); + mem, disk, cpu, worker.getWorkerStatus()); writer.write(line); } writer.write("\n\n"); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 5497faa..9c20fd8 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -177,19 +177,12 @@ message GetClusterInfoRequest { message WorkerResourceInfo { required WorkerConnectionInfoProto connectionInfo = 1; - required float diskSlots = 2; - required int32 cpuCoreSlots = 3; - required int32 memoryMB = 4; - required float usedDiskSlots = 5; - required int32 usedMemoryMB = 6; - required int32 usedCpuCoreSlots = 7; - required int64 maxHeap = 8; - required int64 freeHeap = 9; - required int64 totalHeap = 10; - required int32 numRunningTasks = 11; - required string workerStatus = 12; - required int64 lastHeartbeat = 13; - required int32 numQueryMasterTasks = 14; + required NodeResourceProto totalResource = 2; + required NodeResourceProto availableResource = 3; + required int32 numRunningTasks = 4; + required string workerStatus = 5; + required int64 lastHeartbeat = 6; + required int32 numQueryMasterTasks = 7; } message GetClusterInfoResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 28fdb0b..832a5b4 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -108,7 +108,7 @@ public enum SessionVars implements ConfigKey { Boolean.class, Validators.bool()), QUERY_EXECUTE_PARALLEL(ConfVars.$QUERY_EXECUTE_PARALLEL_MAX, "Maximum parallel running of execution blocks for a query", - DEFAULT, Integer.class, Validators.min("0")), + DEFAULT, Integer.class, Validators.min("1")), // for physical Executors EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT, http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 14cfb11..0436116 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -152,12 +152,13 @@ public class TajoConf extends Configuration { // Resource tracker service RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003", Validators.networkAddr()), - RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120 * 1000), // seconds + RESOURCE_TRACKER_HEARTBEAT_TIMEOUT("tajo.resource-tracker.heartbeat.timeout-secs", 120), // seconds // QueryMaster resource - TAJO_QUERYMASTER_DISK_SLOT("tajo.qm.resource.disk.slots", 0.0f, Validators.min("0.0f")), - TAJO_QUERYMASTER_MEMORY_MB("tajo.qm.resource.memory-mb", 512, Validators.min("64")), - TAJO_QUERYMASTER_ALLOCATION_TIMEOUT("tajo.qm.resource.allocation.timeout", "3 sec"), + QUERYMASTER_MINIMUM_MEMORY("tajo.qm.resource.min.memory-mb", 500, Validators.min("64")), + + // Worker task resource + TASK_RESOURCE_MINIMUM_MEMORY("tajo.task.resource.min.memory-mb", 500, Validators.min("64")), // Tajo Worker Service Addresses WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080", Validators.networkAddr()), @@ -172,42 +173,32 @@ public class TajoConf extends Configuration { // Tajo Worker Resources WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", - Runtime.getRuntime().availableProcessors(), Validators.min("1")), - WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")), - @Deprecated - WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f), - WORKER_RESOURCE_AVAILABLE_DISKS_NUM("tajo.worker.resource.disks.num", 1, Validators.min("1")), + Runtime.getRuntime().availableProcessors(), Validators.min("2")), // 1qm + 1task + WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1000, Validators.min("64")), + + WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1, Validators.min("1")), + WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2, Validators.min("1")), - WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2), - WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()), - // Tajo Worker Dedicated Resources - WORKER_RESOURCE_DEDICATED("tajo.worker.resource.dedicated", false, Validators.bool()), - WORKER_RESOURCE_DEDICATED_MEMORY_RATIO("tajo.worker.resource.dedicated-memory-ratio", 0.8f, - Validators.range("0.0f", "1.0f")), + WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()), // Tajo History WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours - QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours + QUERYMASTER_CACHE_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 10), // 10 mins - WORKER_HEARTBEAT_INTERVAL("tajo.worker.heartbeat.interval", 10 * 1000), // 10 sec + WORKER_HEARTBEAT_IDLE_INTERVAL("tajo.worker.heartbeat.idle.interval", 10 * 1000), // 10 sec + WORKER_HEARTBEAT_ACTIVE_INTERVAL("tajo.worker.heartbeat.active.interval", 1000), // 1 sec - // Resource Manager - RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager", + //Default query scheduler + RESOURCE_SCHEDULER_CLASS("tajo.resource.scheduler", "org.apache.tajo.master.scheduler.SimpleScheduler", Validators.groups(Validators.notNull(), Validators.clazz())), + QUERYMASTER_TASK_SCHEDULER_DELAY("tajo.qm.task-scheduler.delay", 50), // 50 ms + // Catalog CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005", Validators.networkAddr()), - - // for Yarn Resource Manager ---------------------------------------------- - - /** how many launching TaskRunners in parallel */ - @Deprecated - YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", - Runtime.getRuntime().availableProcessors() * 2), - // Query Configuration QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")), QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")), @@ -216,7 +207,8 @@ public class TajoConf extends Configuration { PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")), SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()), SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"), - SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2), + SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", + Runtime.getRuntime().availableProcessors() * 2, Validators.min("1")), SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192), SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120), SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20), @@ -262,8 +254,6 @@ public class TajoConf extends Configuration { Runtime.getRuntime().availableProcessors() * 1), // Task Configuration ----------------------------------------------------- - TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512), - TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 0.5f), TASK_DEFAULT_SIZE("tajo.task.size-mb", 128), // Query and Optimization ------------------------------------------------- @@ -285,8 +275,6 @@ public class TajoConf extends Configuration { HISTORY_QUERY_DIR("tajo.history.query.dir", STAGING_ROOT_DIR.defaultVal + "/history"), HISTORY_TASK_DIR("tajo.history.task.dir", "file:///tmp/tajo-${user.name}/history"), HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7), - HISTORY_QUERY_REPLICATION("tajo.history.query.replication", 1, Validators.min("1")), - HISTORY_TASK_REPLICATION("tajo.history.task.replication", 1, Validators.min("1")), // Misc ------------------------------------------------------------------- // Fragment @@ -329,8 +317,7 @@ public class TajoConf extends Configuration { $GROUPBY_MULTI_LEVEL_ENABLED("tajo.dist-query.groupby.multi-level-aggr", true), - // WARN "tajo.yarn-rm.parallel-task-runner-launcher-num" should be set enough to avoid deadlock - $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 1), + $QUERY_EXECUTE_PARALLEL_MAX("tajo.query.execute.parallel.max", 10), // for physical Executors $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L), http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java index 0d70cc2..32e086c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/NumberUtil.java @@ -1050,4 +1050,8 @@ public class NumberUtil { return returnNumber; } + + public static int compare(long x, long y) { + return (x < y) ? -1 : ((x == y) ? 0 : 1); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java index 0a16072..018c62a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java @@ -18,6 +18,7 @@ package org.apache.tajo.util; +import io.netty.util.CharsetUtil; import org.apache.commons.lang.CharUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.SystemUtils; @@ -77,10 +78,12 @@ public class StringUtils { } return buf.toString(); } - - static CharsetEncoder asciiEncoder = Charset.forName("US-ASCII").newEncoder(); // or "ISO-8859-1" for ISO Latin 1 - + /** + * Check Seven-bit ASCII + */ public static boolean isPureAscii(String v) { + // get thread-safe encoder + CharsetEncoder asciiEncoder = CharsetUtil.getEncoder(CharsetUtil.US_ASCII); return asciiEncoder.canEncode(v); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java index 2293ef5..66e8acc 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java @@ -267,4 +267,13 @@ public class TUtil { StackTraceElement element = ste[2 + depth]; return element.getClassName() + ":" + element.getMethodName() + "(" + element.getLineNumber() +")"; } + + public static <T> T checkTypeAndGet(Object instance, Class<T> type) { + if (!type.isInstance(instance)) { + throw new IllegalArgumentException(instance.getClass().getSimpleName() + + " must be a " + type.getSimpleName() + " type"); + + } + return (T) instance; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 33e37b6..bd52e12 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -163,12 +163,12 @@ <argument>--proto_path=../tajo-client/src/main/proto</argument> <argument>--proto_path=../tajo-plan/src/main/proto</argument> <argument>--java_out=target/generated-sources/proto</argument> - <argument>src/main/proto/ContainerProtocol.proto</argument> <argument>src/main/proto/ResourceTrackerProtocol.proto</argument> <argument>src/main/proto/QueryMasterProtocol.proto</argument> <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument> <argument>src/main/proto/TajoWorkerProtocol.proto</argument> <argument>src/main/proto/InternalTypes.proto</argument> + <argument>src/main/proto/ResourceProtos.proto</argument> </arguments> </configuration> <goals> @@ -390,11 +390,6 @@ <version>1.2.3</version> </dependency> <dependency> - <groupId>org.eclipse.jdt</groupId> - <artifactId>core</artifactId> - <version>3.1.1</version> - </dependency> - <dependency> <groupId>io.netty</groupId> <artifactId>netty-codec-http</artifactId> </dependency> @@ -407,6 +402,12 @@ <groupId>org.mortbay.jetty</groupId> <artifactId>jsp-2.1</artifactId> <version>6.1.14</version> + <exclusions> + <exclusion> + <artifactId>core</artifactId> + <groupId>org.eclipse.jdt</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.codahale.metrics</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index 9f850f9..5c62654 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -28,9 +28,7 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.*; import org.apache.tajo.algebra.Aggregation.GroupType; import org.apache.tajo.algebra.LiteralValue.LiteralType; -import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.engine.parser.SQLParser.*; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.StringUtils; @@ -44,7 +42,6 @@ import static org.apache.tajo.common.TajoDataTypes.Type; import static org.apache.tajo.engine.parser.SQLParser.*; public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { - private SQLParser parser; public SQLAnalyzer() { } @@ -53,15 +50,14 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { ANTLRInputStream input = new ANTLRInputStream(sql); SQLLexer lexer = new SQLLexer(input); CommonTokenStream tokens = new CommonTokenStream(lexer); - this.parser = new SQLParser(tokens); - parser.setBuildParseTree(true); - parser.removeErrorListeners(); - - parser.setErrorHandler(new SQLErrorStrategy()); - parser.addErrorListener(new SQLErrorListener()); - SqlContext context; try { + SQLParser parser = new SQLParser(tokens); + parser.setBuildParseTree(true); + parser.removeErrorListeners(); + + parser.setErrorHandler(new SQLErrorStrategy()); + parser.addErrorListener(new SQLErrorListener()); context = parser.sql(); } catch (SQLParseError e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index c6b9b41..377aebe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -40,11 +40,10 @@ import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.physical.*; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.InternalException; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.serder.LogicalNodeDeserializer; @@ -66,12 +65,13 @@ import java.util.Stack; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; -import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; -import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; -import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm; -import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm; -import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce; +import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty; +import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType; +import static org.apache.tajo.plan.serder.PlanProto.GroupbyEnforce.GroupbyAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.JoinEnforce.JoinAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.SortedInputEnforce; +import static org.apache.tajo.plan.serder.PlanProto.SortEnforce; public class PhysicalPlannerImpl implements PhysicalPlanner { private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class); @@ -885,7 +885,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT); if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) { SortNode sortNode = (SortNode) node.peek(); - TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput(); + SortedInputEnforce sortEnforcer = property.get(0).getSortedInput(); boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName()); SortSpec [] sortSpecs = LogicalNodeDeserializer.convertSortSpecs(sortEnforcer.getSortSpecsList()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java index 8128390..92ecadd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java @@ -18,24 +18,21 @@ package org.apache.tajo.engine.planner.enforce; - import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.util.TUtil; import java.util.Collection; import java.util.List; import java.util.Map; -import static org.apache.tajo.ipc.TajoWorkerProtocol.*; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; -import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; -import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.*; +import static org.apache.tajo.plan.serder.PlanProto.GroupbyEnforce.GroupbyAlgorithm; +import static org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.*; +import static org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType; +import static org.apache.tajo.plan.serder.PlanProto.ColumnPartitionEnforcer.ColumnPartitionAlgorithm; public class Enforcer implements ProtoObject<EnforcerProto> { Map<EnforceType, List<EnforceProperty>> properties; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java index 3adc0a3..e09684a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java @@ -25,8 +25,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.util.StringUtils; -import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +import static org.apache.tajo.plan.serder.PlanProto.DataChannelProto; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; import static org.apache.tajo.plan.serder.PlanProto.TransmitType; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java index f8cd1e9..80317b0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java @@ -23,13 +23,14 @@ package org.apache.tajo.engine.planner.global; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; +import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.util.graph.DirectedGraphVisitor; import org.apache.tajo.util.graph.SimpleDirectedGraph; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.TajoWorkerProtocol; import java.util.ArrayList; import java.util.HashMap; @@ -37,8 +38,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; - public class MasterPlan { private final QueryId queryId; private final QueryContext context; @@ -288,7 +287,7 @@ public class MasterPlan { if (block.getEnforcer().getProperties().size() > 0) { sb.append("\n[Enforcers]\n"); int i = 0; - for (TajoWorkerProtocol.EnforceProperty enforce : block.getEnforcer().getProperties()) { + for (EnforceProperty enforce : block.getEnforcer().getProperties()) { sb.append(" ").append(i++).append(": "); sb.append(Enforcer.toString(enforce)); sb.append("\n"); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java index 8095458..f181193 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java @@ -30,9 +30,9 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.SortSpecArray; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.PlanningException; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java index 86874ba..5cca4c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java @@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.engine.planner.enforce.Enforcer; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -57,11 +57,11 @@ public abstract class ScanExec extends PhysicalExec { private boolean checkIfBroadcast() { Enforcer enforcer = context.getEnforcer(); - if (enforcer != null && enforcer.hasEnforceProperty(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST)) { - List<TajoWorkerProtocol.EnforceProperty> properties = - enforcer.getEnforceProperties(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST); + if (enforcer != null && enforcer.hasEnforceProperty(EnforceProperty.EnforceType.BROADCAST)) { + List<EnforceProperty> properties = + enforcer.getEnforceProperties(EnforceProperty.EnforceType.BROADCAST); - for (TajoWorkerProtocol.EnforceProperty property : properties) { + for (EnforceProperty property : properties) { if (getCanonicalName().equals(property.getBroadcast().getTableName())) { return true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java index 2fa272a..48d4780 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java @@ -21,32 +21,27 @@ */ package org.apache.tajo.engine.query; +import org.apache.tajo.ResourceProtos.TaskRequestProto; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; -import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.worker.FetchImpl; import java.util.List; -public interface TaskRequest extends ProtoObject<TajoWorkerProtocol.TaskRequestProto> { +public interface TaskRequest extends ProtoObject<TaskRequestProto> { - public TaskAttemptId getId(); - public List<CatalogProtos.FragmentProto> getFragments(); - public String getOutputTableId(); - public boolean isClusteredOutput(); - public PlanProto.LogicalNodeTree getPlan(); - public boolean isInterQuery(); - public void setInterQuery(); - public void addFetch(String name, FetchImpl fetch); - public List<FetchImpl> getFetches(); - public boolean shouldDie(); - public void setShouldDie(); - public QueryContext getQueryContext(TajoConf conf); - public DataChannel getDataChannel(); - public Enforcer getEnforcer(); + TaskAttemptId getId(); + List<CatalogProtos.FragmentProto> getFragments(); + PlanProto.LogicalNodeTree getPlan(); + void setInterQuery(); + void addFetch(String name, FetchImpl fetch); + List<FetchImpl> getFetches(); + QueryContext getQueryContext(TajoConf conf); + DataChannel getDataChannel(); + Enforcer getEnforcer(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java index b4727dc..f97d005 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java @@ -22,9 +22,9 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; -import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProtoOrBuilder; +import org.apache.tajo.ResourceProtos.TaskRequestProto; +import org.apache.tajo.ResourceProtos.FetchProto; +import org.apache.tajo.ResourceProtos.TaskRequestProtoOrBuilder; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.worker.FetchImpl; @@ -34,22 +34,22 @@ import java.util.List; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; public class TaskRequestImpl implements TaskRequest { - - private TaskAttemptId id; - private List<FragmentProto> fragments; - private String outputTable; + + private TaskAttemptId id; + private List<FragmentProto> fragments; + private String outputTable; private boolean isUpdated; private boolean clusteredOutput; private PlanProto.LogicalNodeTree plan; // logical node private Boolean interQuery; private List<FetchImpl> fetches; - private Boolean shouldDie; - private QueryContext queryContext; - private DataChannel dataChannel; - private Enforcer enforcer; + private QueryContext queryContext; + private DataChannel dataChannel; + private Enforcer enforcer; + private String queryMasterHostAndPort; - private TaskRequestProto proto = TajoWorkerProtocol.TaskRequestProto.getDefaultInstance(); - private TajoWorkerProtocol.TaskRequestProto.Builder builder = null; + private TaskRequestProto proto = TaskRequestProto.getDefaultInstance(); + private TaskRequestProto.Builder builder = null; private boolean viaProto = false; public TaskRequestImpl() { @@ -61,9 +61,9 @@ public class TaskRequestImpl implements TaskRequest { public TaskRequestImpl(TaskAttemptId id, List<FragmentProto> fragments, String outputTable, boolean clusteredOutput, PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel channel, - Enforcer enforcer) { + Enforcer enforcer, String queryMasterHostAndPort) { this(); - this.set(id, fragments, outputTable, clusteredOutput, plan, queryContext, channel, enforcer); + this.set(id, fragments, outputTable, clusteredOutput, plan, queryContext, channel, enforcer, queryMasterHostAndPort); } public TaskRequestImpl(TaskRequestProto proto) { @@ -75,7 +75,8 @@ public class TaskRequestImpl implements TaskRequest { public void set(TaskAttemptId id, List<FragmentProto> fragments, String outputTable, boolean clusteredOutput, - PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) { + PlanProto.LogicalNodeTree plan, QueryContext queryContext, + DataChannel dataChannel, Enforcer enforcer, String queryMasterHostAndPort) { this.id = id; this.fragments = fragments; this.outputTable = outputTable; @@ -86,6 +87,7 @@ public class TaskRequestImpl implements TaskRequest { this.queryContext = queryContext; this.dataChannel = dataChannel; this.enforcer = enforcer; + this.queryMasterHostAndPort = queryMasterHostAndPort; } @Override @@ -124,32 +126,6 @@ public class TaskRequestImpl implements TaskRequest { return this.fragments; } - @Override - public String getOutputTableId() { - TaskRequestProtoOrBuilder p = viaProto ? proto : builder; - if (outputTable != null) { - return this.outputTable; - } - if (!p.hasOutputTable()) { - return null; - } - this.outputTable = p.getOutputTable(); - return this.outputTable; - } - - @Override - public boolean isClusteredOutput() { - TaskRequestProtoOrBuilder p = viaProto ? proto : builder; - if (isUpdated) { - return this.clusteredOutput; - } - if (!p.hasClusteredOutput()) { - return false; - } - this.clusteredOutput = p.getClusteredOutput(); - this.isUpdated = true; - return this.clusteredOutput; - } @Override public PlanProto.LogicalNodeTree getPlan() { @@ -248,33 +224,14 @@ public class TaskRequestImpl implements TaskRequest { } TaskRequestProtoOrBuilder p = viaProto ? proto : builder; this.fetches = new ArrayList<FetchImpl>(); - for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) { + for(FetchProto fetch : p.getFetchesList()) { fetches.add(new FetchImpl(fetch)); } } - @Override - public boolean shouldDie() { - TaskRequestProtoOrBuilder p = viaProto ? proto : builder; - if (shouldDie != null) { - return shouldDie; - } - if (!p.hasShouldDie()) { - return false; - } - this.shouldDie = p.getShouldDie(); - return this.shouldDie; - } - - @Override - public void setShouldDie() { - maybeInitBuilder(); - shouldDie = true; - } - private void maybeInitBuilder() { if (viaProto || builder == null) { - builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(proto); + builder = TaskRequestProto.newBuilder(proto); } viaProto = true; } @@ -305,8 +262,8 @@ public class TaskRequestImpl implements TaskRequest { builder.addFetches(fetches.get(i).getProto()); } } - if (this.shouldDie != null) { - builder.setShouldDie(this.shouldDie); + if (this.queryMasterHostAndPort != null) { + builder.setQueryMasterHostAndPort(this.queryMasterHostAndPort); } if (this.queryContext != null) { builder.setQueryContext(queryContext.getProto()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java deleted file mode 100644 index cad63a0..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.querymaster.QueryMasterTask; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; - -public abstract class ContainerProxy { - protected static final Log LOG = LogFactory.getLog(ContainerProxy.class); - - final public static FsPermission QUERYCONF_FILE_PERMISSION = - FsPermission.createImmutable((short) 0644); // rw-r--r-- - - - protected static enum ContainerState { - PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH - } - - protected final ExecutionBlockId executionBlockId; - protected Configuration conf; - protected QueryMasterTask.QueryMasterTaskContext context; - - protected ContainerState state; - // store enough information to be able to cleanup the container - protected TajoContainer container; - protected TajoContainerId containerId; - protected String hostName; - protected int port = -1; - - public abstract void launch(ContainerLaunchContext containerLaunchContext); - public abstract void stopContainer(); - - public ContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, - ExecutionBlockId executionBlockId, TajoContainer container) { - this.context = context; - this.conf = conf; - this.state = ContainerState.PREP; - this.container = container; - this.executionBlockId = executionBlockId; - this.containerId = container.getId(); - } - - public synchronized boolean isCompletelyDone() { - return state == ContainerState.DONE || state == ContainerState.FAILED; - } - - public String getTaskHostName() { - return this.hostName; - } - - public int getTaskPort() { - return this.port; - } - - public TajoContainerId getContainerId() { - return containerId; - } - - public ExecutionBlockId getBlockId() { - return executionBlockId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java deleted file mode 100644 index e620afa..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.master.container.TajoContainer; - -import java.util.Collection; - -public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent { - private final QueryContext queryContext; - private final String planJson; - - public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId, - Collection<TajoContainer> containers, QueryContext queryContext, - String planJson) { - super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers); - this.queryContext = queryContext; - this.planJson = planJson; - } - - public QueryContext getQueryContext() { - return queryContext; - } - - public String getPlanJson() { - return planJson; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java index 1b1d49e..3b04fc5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java @@ -23,13 +23,13 @@ import com.google.protobuf.RpcController; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; +import org.apache.tajo.master.rm.NodeStatus; +import org.apache.tajo.master.scheduler.event.ResourceReserveSchedulerEvent; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; @@ -37,7 +37,8 @@ import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; import java.util.Collection; -import java.util.List; + +import static org.apache.tajo.ResourceProtos.*; public class QueryCoordinatorService extends AbstractService { private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class); @@ -59,30 +60,27 @@ public class QueryCoordinatorService extends AbstractService { } @Override - public void start() { + public void serviceStart() throws Exception { String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr); int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM); - try { - server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum); - } catch (Exception e) { - LOG.error(e, e); - } + + server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum); server.start(); bindAddress = NetUtils.getConnectAddress(server.getListenAddress()); this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress)); LOG.info("Instantiated TajoMasterService at " + this.bindAddress); - super.start(); + super.serviceStart(); } @Override - public void stop() { + public void serviceStop() throws Exception { if(server != null) { server.shutdown(); server = null; } - super.stop(); + super.serviceStop(); } public InetSocketAddress getBindAddress() { @@ -97,62 +95,47 @@ public class QueryCoordinatorService extends AbstractService { @Override public void heartbeat( RpcController controller, - TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) { + TajoHeartbeatRequest request, RpcCallback<TajoHeartbeatResponse> done) { if(LOG.isDebugEnabled()) { LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo())); } - QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null; + TajoHeartbeatResponse.ResponseCommand command; QueryManager queryManager = context.getQueryJobManager(); command = queryManager.queryHeartbeat(request); - QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder(); + TajoHeartbeatResponse.Builder builder = TajoHeartbeatResponse.newBuilder(); builder.setHeartbeatResult(BOOL_TRUE); if(command != null) { builder.setResponseCommand(command); } - builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary()); done.run(builder.build()); } + /** + * Reserve a node resources to TajoMaster + */ @Override - public void allocateWorkerResources( - RpcController controller, - QueryCoordinatorProtocol.WorkerResourceAllocationRequest request, - RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) { - context.getResourceManager().allocateWorkerResources(request, done); - } - - @Override - public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList(); - - for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) { - context.getResourceManager().releaseWorkerResource(eachContainer); - } - done.run(BOOL_TRUE); + public void reserveNodeResources(RpcController controller, NodeResourceRequest request, + RpcCallback<NodeResourceResponse> done) { + Dispatcher dispatcher = context.getResourceManager().getRMContext().getDispatcher(); + dispatcher.getEventHandler().handle(new ResourceReserveSchedulerEvent(request, done)); } + /** + * Get all worker connection information + */ @Override - public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request, - RpcCallback<WorkerResourcesRequest> done) { - - WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder(); - Collection<Worker> workers = context.getResourceManager().getWorkers().values(); - - for(Worker worker: workers) { - WorkerResource resource = worker.getResource(); - - WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder(); + public void getAllWorkers(RpcController controller, PrimitiveProtos.NullProto request, + RpcCallback<WorkerConnectionsResponse> done) { - workerResource.setConnectionInfo(worker.getConnectionInfo().getProto()); - workerResource.setMemoryMB(resource.getMemoryMB()); - workerResource.setDiskSlots(resource.getDiskSlots()); + WorkerConnectionsResponse.Builder builder = WorkerConnectionsResponse.newBuilder(); + Collection<NodeStatus> nodeStatuses = context.getResourceManager().getRMContext().getNodes().values(); - builder.addWorkerResources(workerResource); + for(NodeStatus nodeStatus : nodeStatuses) { + builder.addWorker(nodeStatus.getConnectionInfo().getProto()); } done.run(builder.build()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index ece42f7..e22663a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -22,24 +22,26 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; +import org.apache.tajo.ResourceProtos.AllocationResourceProto; +import org.apache.tajo.ResourceProtos.QueryExecutionRequest; import org.apache.tajo.TajoProtos; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; -import org.apache.tajo.master.rm.WorkerResourceManager; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.rm.TajoResourceManager; import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -53,9 +55,9 @@ public class QueryInProgress { private LogicalRootNode plan; - private AtomicBoolean querySubmitted = new AtomicBoolean(false); + private volatile boolean querySubmitted; - private AtomicBoolean stopped = new AtomicBoolean(false); + private volatile boolean isStopped; private QueryInfo queryInfo; @@ -65,6 +67,8 @@ public class QueryInProgress { private QueryMasterProtocolService queryMasterRpcClient; + private AllocationResourceProto allocationResource; + private final Lock readLock; private final Lock writeLock; @@ -104,14 +108,16 @@ public class QueryInProgress { } public void stopProgress() { - if(stopped.getAndSet(true)) { + if (isStopped) { return; + } else { + isStopped = true; } LOG.info("========================================================="); LOG.info("Stop query:" + queryId); - masterContext.getResourceManager().releaseQueryMaster(queryId); + masterContext.getResourceManager().getScheduler().stopQuery(queryId); RpcClientManager.cleanup(queryMasterRpc); @@ -122,7 +128,13 @@ public class QueryInProgress { } } - public boolean startQueryMaster() { + /** + * Connect to QueryMaster and allocate QM resource. + * + * @param allocation QM resource + * @return If there is no available resource, It returns false + */ + protected boolean allocateToQueryMaster(AllocationResourceProto allocation) { try { writeLock.lockInterruptibly(); } catch (Exception e) { @@ -130,19 +142,29 @@ public class QueryInProgress { return false; } try { - LOG.info("Initializing QueryInProgress for QueryID=" + queryId); - WorkerResourceManager resourceManager = masterContext.getResourceManager(); - WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this); + TajoResourceManager resourceManager = masterContext.getResourceManager(); + WorkerConnectionInfo connectionInfo = + resourceManager.getRMContext().getNodes().get(allocation.getWorkerId()).getConnectionInfo(); + try { + if(queryMasterRpcClient == null) { + connectQueryMaster(connectionInfo); + } + + CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>(); + queryMasterRpcClient.allocateQueryMaster(callFuture.getController(), allocation, callFuture); - // if no resource to allocate a query master - if(resource == null) { - throw new RuntimeException("No Available Resources for QueryMaster"); + if(!callFuture.get().getValue()) return false; + + } catch (ConnectException ce) { + return false; } - queryInfo.setQueryMaster(resource.getConnectionInfo().getHost()); - queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort()); - queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); - queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); + LOG.info("Initializing QueryInProgress for QueryID=" + queryId); + this.allocationResource = allocation; + this.queryInfo.setQueryMaster(connectionInfo.getHost()); + this.queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); + this.queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); + this.queryInfo.setQueryMasterInfoPort(connectionInfo.getHttpInfoPort()); return true; } catch (Exception e) { @@ -153,54 +175,58 @@ public class QueryInProgress { } } - private void connectQueryMaster() throws Exception { - InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); - LOG.info("Connect to QueryMaster:" + addr); - + private void connectQueryMaster(WorkerConnectionInfo connectionInfo) + throws NoSuchMethodException, ConnectException, ClassNotFoundException { RpcClientManager.cleanup(queryMasterRpc); + + InetSocketAddress addr = NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getQueryMasterPort()); + LOG.info("Try to connect to QueryMaster:" + addr); queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } - public void submitQueryToMaster() { - if(querySubmitted.get()) { - return; + /** + * Launch the allocated query to QueryMaster + */ + public boolean submitToQueryMaster() { + if(querySubmitted) { + return false; } try { writeLock.lockInterruptibly(); } catch (Exception e) { LOG.error("Failed to lock by exception " + e.getMessage(), e); - return; + return false; } try { - if(queryMasterRpcClient == null) { - connectQueryMaster(); - } LOG.info("Call executeQuery to :" + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId); - QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder(); + QueryExecutionRequest.Builder builder = QueryExecutionRequest.newBuilder(); builder.setQueryId(queryId.getProto()) .setQueryContext(queryInfo.getQueryContext().getProto()) .setSession(session.getProto()) .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr())) - .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()); + .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()) + .setAllocation(allocationResource); CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture); callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); - querySubmitted.set(true); + querySubmitted = true; getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); + return true; } catch (Exception e) { LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e); catchException(e.getMessage(), e); } finally { writeLock.unlock(); } + return false; } public void catchException(String message, Throwable e) { @@ -222,10 +248,6 @@ public class QueryInProgress { } } - public boolean isStarted() { - return !stopped.get() && this.querySubmitted.get(); - } - public void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo);
