http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java index ff0399c..d4c62e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java @@ -18,10 +18,10 @@ package org.apache.tajo.ws.rs.responses; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; +import org.apache.tajo.master.rm.NodeStatus; import com.google.gson.annotations.Expose; +import org.apache.tajo.resource.NodeResource; public class WorkerResponse { @@ -35,34 +35,29 @@ public class WorkerResponse { @Expose private int usedMemoryMB; @Expose private int usedCpuCoreSlots; - @Expose private long maxHeap; - @Expose private long freeHeap; - @Expose private long totalHeap; - @Expose private int numRunningTasks; @Expose private int numQueryMasterTasks; @Expose private long lastHeartbeatTime; - public WorkerResponse(Worker worker) { - this(worker.getResource()); - - this.connectionInfo = new WorkerConnectionInfoResponse(worker.getConnectionInfo()); + public WorkerResponse(NodeStatus nodeStatus) { + this(nodeStatus.getTotalResourceCapability(), nodeStatus.getAvailableResource(), + nodeStatus.getNumRunningTasks(), nodeStatus.getNumRunningQueryMaster()); + + this.connectionInfo = new WorkerConnectionInfoResponse(nodeStatus.getConnectionInfo()); - this.lastHeartbeatTime = worker.getLastHeartbeatTime(); + this.lastHeartbeatTime = nodeStatus.getLastHeartbeatTime(); } - - private WorkerResponse(WorkerResource resource) { - this.cpuCoreSlots = resource.getCpuCoreSlots(); - this.memoryMB = resource.getMemoryMB(); - this.usedDiskSlots = 
resource.getUsedDiskSlots(); - this.usedMemoryMB = resource.getUsedMemoryMB(); - this.usedCpuCoreSlots = resource.getUsedCpuCoreSlots(); - this.maxHeap = resource.getMaxHeap(); - this.freeHeap = resource.getFreeHeap(); - this.totalHeap = resource.getTotalHeap(); - this.numRunningTasks = resource.getNumRunningTasks(); - this.numQueryMasterTasks = resource.getNumQueryMasterTasks(); + + private WorkerResponse(NodeResource total, NodeResource available, int numRunningTasks, int numQueryMasterTasks) { + this.cpuCoreSlots = total.getVirtualCores(); + this.memoryMB = total.getMemory(); + this.diskSlots = total.getDisks(); + this.usedDiskSlots = available.getDisks(); + this.usedMemoryMB = available.getMemory(); + this.usedCpuCoreSlots = available.getVirtualCores(); + this.numRunningTasks = numRunningTasks; + this.numQueryMasterTasks = numQueryMasterTasks; } public WorkerConnectionInfoResponse getConnectionInfo() { @@ -89,18 +84,6 @@ public class WorkerResponse { return usedCpuCoreSlots; } - public long getMaxHeap() { - return maxHeap; - } - - public long getFreeHeap() { - return freeHeap; - } - - public long getTotalHeap() { - return totalHeap; - } - public int getNumRunningTasks() { return numRunningTasks; }
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/proto/ContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ContainerProtocol.proto b/tajo-core/src/main/proto/ContainerProtocol.proto deleted file mode 100644 index df7a450..0000000 --- a/tajo-core/src/main/proto/ContainerProtocol.proto +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * These .proto interfaces are public and stable. - * Please see http://wiki.apache.org/hadoop/Compatibility - * for what changes are allowed for a *stable* .proto interface. 
- */ - -option java_package = "org.apache.tajo.ipc"; -option java_outer_classname = "ContainerProtocol"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -package hadoop.yarn; - -import "Security.proto"; -import "yarn_protos.proto"; - -message TajoContainerIdProto { - optional ApplicationIdProto app_id = 1; - optional ApplicationAttemptIdProto app_attempt_id = 2; - optional int32 id = 3; -} - -message TajoContainerProto { - optional TajoContainerIdProto id = 1; - optional NodeIdProto nodeId = 2; - optional string node_http_address = 3; - optional ResourceProto resource = 4; - optional PriorityProto priority = 5; - optional hadoop.common.TokenProto container_token = 6; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto index 2440e2a..be04dc4 100644 --- a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto +++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto @@ -23,122 +23,13 @@ option java_outer_classname = "QueryCoordinatorProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "yarn_protos.proto"; -import "tajo_protos.proto"; -import "TajoIdProtos.proto"; -import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; -import "ContainerProtocol.proto"; +import "ResourceProtos.proto"; package hadoop.yarn; -message ServerStatusProto { - message System { - required int32 availableProcessors = 1; - required int32 freeMemoryMB = 2; - required int32 maxMemoryMB = 3; - required int32 totalMemoryMB = 4; - } - message Disk { - required string absolutePath = 1; - required int64 totalSpace = 2; - required int64 freeSpace = 3; - required int64 usableSpace = 4; - } - - message JvmHeap { - required int64 maxHeap = 1; - 
required int64 totalHeap = 2; - required int64 freeHeap = 3; - } - - required System system = 1; - required float diskSlots = 2; - required int32 memoryResourceMB = 3; - repeated Disk disk = 4; - required int32 runningTaskNum = 5; - required JvmHeap jvmHeap = 6; -} - -message TajoHeartbeat { - required WorkerConnectionInfoProto connectionInfo = 1; - optional QueryIdProto queryId = 2; - optional QueryState state = 3; - optional TableDescProto resultDesc = 4; - optional string statusMessage = 5; - optional float queryProgress = 6; -} - -message TajoHeartbeatResponse { - message ResponseCommand { - required string command = 1; - repeated string params = 2; - } - required BoolProto heartbeatResult = 1; - required ClusterResourceSummary clusterResourceSummary = 2; - optional ResponseCommand responseCommand = 3; -} - -message ClusterResourceSummary { - required int32 numWorkers = 1; - required int32 totalDiskSlots = 2; - required int32 totalCpuCoreSlots = 3; - required int32 totalMemoryMB = 4; - - required int32 totalAvailableDiskSlots = 5; - required int32 totalAvailableCpuCoreSlots = 6; - required int32 totalAvailableMemoryMB = 7; -} - -enum ResourceRequestPriority { - MEMORY = 1; - DISK = 2; -} - -message WorkerResourceAllocationRequest { - required QueryIdProto queryId = 1; - required ResourceRequestPriority resourceRequestPriority = 2; - - required int32 numContainers = 3; - - required int32 maxMemoryMBPerContainer = 4; - required int32 minMemoryMBPerContainer = 5; - - required float maxDiskSlotPerContainer = 6; - required float minDiskSlotPerContainer = 7; -} - -message WorkerResourceProto { - required WorkerConnectionInfoProto connectionInfo = 1; - required int32 memoryMB = 2 ; - required float diskSlots = 3; -} - -message WorkerResourcesRequest { - repeated WorkerResourceProto workerResources = 1; -} - -message WorkerResourceReleaseRequest { - required ExecutionBlockIdProto executionBlockId = 1; - repeated TajoContainerIdProto containerIds = 2; -} - -message 
WorkerAllocatedResource { - required TajoContainerIdProto containerId = 1; - required WorkerConnectionInfoProto connectionInfo = 2; - - required int32 allocatedMemoryMB = 3; - required float allocatedDiskSlots = 4; -} - -message WorkerResourceAllocationResponse { - required QueryIdProto queryId = 1; - repeated WorkerAllocatedResource workerAllocatedResource = 2; -} - service QueryCoordinatorProtocolService { - rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); - rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); - rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); - rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); + rpc heartbeat(TajoHeartbeatRequest) returns (TajoHeartbeatResponse); + rpc reserveNodeResources(NodeResourceRequest) returns (NodeResourceResponse); + rpc getAllWorkers(NullProto) returns (WorkerConnectionsResponse); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/proto/QueryMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto index 855c2c6..f783f06 100644 --- a/tajo-core/src/main/proto/QueryMasterProtocol.proto +++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto @@ -21,26 +21,23 @@ option java_outer_classname = "QueryMasterProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "yarn_protos.proto"; -import "tajo_protos.proto"; import "TajoIdProtos.proto"; -import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; -import "TajoWorkerProtocol.proto"; -import "ContainerProtocol.proto"; +import "ResourceProtos.proto"; package hadoop.yarn; service QueryMasterProtocolService { //from Worker - rpc getTask(GetTaskRequestProto) returns (TaskRequestProto); rpc statusUpdate 
(TaskStatusProto) returns (NullProto); rpc ping (ExecutionBlockIdProto) returns (NullProto); rpc fatalError(TaskFatalErrorReport) returns (NullProto); rpc done (TaskCompletionReport) returns (NullProto); rpc doneExecutionBlock(ExecutionBlockReport) returns (NullProto); + rpc getExecutionBlockContext(ExecutionBlockContextRequest) returns (ExecutionBlockContextResponse); //from TajoMaster's QueryJobManager rpc killQuery(QueryIdProto) returns (NullProto); - rpc executeQuery(QueryExecutionRequestProto) returns (NullProto); + rpc executeQuery(QueryExecutionRequest) returns (NullProto); + rpc allocateQueryMaster(AllocationResourceProto) returns (BoolProto); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/proto/ResourceProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto new file mode 100644 index 0000000..97bf05e --- /dev/null +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.tajo"; +option java_outer_classname = "ResourceProtos"; +option java_generic_services = false; +option java_generate_equals_and_hash = true; + +import "tajo_protos.proto"; +import "TajoIdProtos.proto"; +import "CatalogProtos.proto"; +import "PrimitiveProtos.proto"; +import "Plan.proto"; + +enum ResponseCommand { + NORMAL = 1; //ping + MEMBERSHIP = 2; // request membership to worker node + ABORT_QUERY = 3; //query master failure + SHUTDOWN = 4; // black list +} + +//TODO add node health information +message NodeStatusProto { +} + +enum ResourceType { + LEAF = 1; + INTERMEDIATE = 2; + QUERYMASTER = 3; +} + +message AllocationResourceProto { + required int32 workerId = 1; + required NodeResourceProto resource = 2; +} + + +message ExecutionBlockListProto { + repeated ExecutionBlockIdProto executionBlockId = 1; +} + +message TaskAllocationProto { + required TaskRequestProto taskRequest = 1; + required NodeResourceProto resource = 2; +} + +message TaskRequestProto { + required string queryMasterHostAndPort = 1; + required TaskAttemptIdProto id = 2; + repeated FragmentProto fragments = 3; + required string outputTable = 4; + required bool clusteredOutput = 5; + required LogicalNodeTree plan = 6; + optional bool interQuery = 7 [default = false]; + repeated FetchProto fetches = 8; + optional KeyValueSetProto queryContext = 9; + optional DataChannelProto dataChannel = 10; + optional EnforcerProto enforcer = 11; +} + +message FetchProto { + required string host = 1; + required int32 port = 2; + required ShuffleType type = 3; + required ExecutionBlockIdProto executionBlockId = 4; + required int32 partitionId = 5; + required string name = 6; + optional string rangeParams = 7; + optional bool hasNext = 8 [default = false]; + + //repeated part + repeated int32 taskId = 9 [packed=true]; + repeated int32 attemptId = 10 [packed=true]; + + optional int64 offset = 11; + optional int64 length = 12; +} + +message TaskStatusProto { + required 
TaskAttemptIdProto id = 1; + required string workerName = 2; + required float progress = 3; + required TaskAttemptState state = 4; + optional StatSetProto stats = 5; + optional TableStatsProto inputStats = 6; + optional TableStatsProto resultStats = 7; + repeated ShuffleFileOutput shuffleFileOutputs = 8; +} + +message TaskCompletionReport { + required TaskAttemptIdProto id = 1; + optional StatSetProto stats = 2; + optional TableStatsProto inputStats = 3; + optional TableStatsProto resultStats = 4; + repeated ShuffleFileOutput shuffleFileOutputs = 5; +} + +message TaskFatalErrorReport { + required TaskAttemptIdProto id = 1; + optional string errorMessage = 2; + optional string errorTrace = 3; +} + +message FailureIntermediateProto { + required int64 pagePos = 1; + required int32 startRowNum = 2; + required int32 endRowNum = 3; +} + +message IntermediateEntryProto { + message PageProto { + required int64 pos = 1; + required int32 length = 2; + } + required ExecutionBlockIdProto ebId = 1; + required int32 taskId = 2; + required int32 attemptId = 3; + required int32 partId = 4; + required string host = 5; + required int64 volume = 6; + repeated PageProto pages = 7; + repeated FailureIntermediateProto failures = 8; +} + +message ExecutionBlockReport { + required ExecutionBlockIdProto ebId = 1; + required bool reportSuccess = 2; + optional string reportErrorMessage = 3; + required int32 succeededTasks = 4; + repeated IntermediateEntryProto intermediateEntries = 5; +} + +// deprecated +message TaskResponseProto { + required string id = 1; + required QueryState status = 2; +} + +message StatusReportProto { + required int64 timestamp = 1; + required string serverName = 2; + repeated TaskStatusProto status = 3; + repeated TaskAttemptIdProto pings = 4; +} + +message CommandRequestProto { + repeated Command command = 1; +} + +message CommandResponseProto { +} + +message Command { + required TaskAttemptIdProto id = 1; + required CommandType type = 2; +} + +enum CommandType { + 
PREPARE = 0; + LAUNCH = 1; + STOP = 2; + FINALIZE = 3; +} + +message ShuffleFileOutput { + required int32 partId = 1; + optional string fileName = 2; + optional int64 volume = 3; +} + +message SessionProto { + required string session_id = 1; + required string username = 2; + required string current_database = 3; + required int64 last_access_time = 4; + required KeyValueSetProto variables = 5; +} + +message NodeHeartbeatRequest { + required int32 workerId = 1; + optional NodeResourceProto totalResource = 2; + optional NodeResourceProto availableResource = 3; + optional int32 runningTasks = 4; + optional int32 runningQueryMasters = 5; + optional WorkerConnectionInfoProto connectionInfo = 6; + optional NodeStatusProto status = 7; +} + +message NodeHeartbeatResponse { + required ResponseCommand command = 1 [default = NORMAL]; + optional int32 heartBeatInterval = 2; + repeated QueryIdProto queryId = 3; +} + +//deprecated +message TajoHeartbeatRequest { + required WorkerConnectionInfoProto connectionInfo = 1; + optional QueryIdProto queryId = 2; + optional QueryState state = 3; + optional TableDescProto resultDesc = 4; + optional string statusMessage = 5; + optional float queryProgress = 6; +} + +//deprecated +message TajoHeartbeatResponse { + message ResponseCommand { + required string command = 1; + repeated string params = 2; + } + required BoolProto heartbeatResult = 1; + optional ResponseCommand responseCommand = 3; +} + +message WorkerConnectionsResponse { + repeated WorkerConnectionInfoProto worker = 1; +} + +message NodeResourceRequest { + optional string queue = 1; + required string userId = 2; + required ResourceType type = 3; + required int32 priority = 4; + required QueryIdProto queryId = 5; + required int32 numContainers = 6; + required NodeResourceProto capacity = 7; + required int32 runningTasks = 8; + repeated int32 candidateNodes = 9; +} + +message NodeResourceResponse { + required QueryIdProto queryId = 1; + repeated AllocationResourceProto resource = 
2; +} + +message ExecutionBlockContextRequest { + required ExecutionBlockIdProto executionBlockId = 1; + required WorkerConnectionInfoProto worker = 2; +} + +message ExecutionBlockContextResponse { + required ExecutionBlockIdProto executionBlockId = 1; + optional string queryOutputPath = 2; + + required KeyValueSetProto queryContext = 3; + required string planJson = 4; + required ShuffleType shuffleType = 5; +} + +message StopExecutionBlockRequest { + required ExecutionBlockIdProto executionBlockId = 1; + optional ExecutionBlockListProto cleanupList = 2; +} + +message BatchAllocationRequest { + required ExecutionBlockIdProto executionBlockId = 1; + repeated TaskAllocationProto taskRequest = 2; +} + +message BatchAllocationResponse { + repeated TaskAllocationProto cancellationTask = 1; +} + +message QueryExecutionRequest { + required QueryIdProto queryId = 1; + required SessionProto session = 2; + required KeyValueSetProto queryContext = 3; + required StringProto exprInJson = 4; + optional StringProto logicalPlanJson = 5; + required AllocationResourceProto allocation = 6; +} + + +//Task history +message FetcherHistoryProto { + required int64 startTime = 1; + optional int64 finishTime = 2; + required FetcherState state = 3; + required int64 fileLength = 4; + required int32 messageReceivedCount = 5; +} + +message TaskHistoryProto { + required TaskAttemptIdProto taskAttemptId = 1; + required TaskAttemptState state = 2; + required float progress = 3; + required int64 startTime = 4; + required int64 finishTime = 5; + required TableStatsProto inputStats = 6; + optional TableStatsProto outputStats = 7; + optional string outputPath = 8; + optional string workingPath = 9; + optional int32 finishedFetchCount = 10; + optional int32 totalFetchCount = 11; + repeated FetcherHistoryProto fetcherHistories = 12; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/proto/ResourceTrackerProtocol.proto 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index dffd8c9..e0ddac2 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -22,45 +22,10 @@ option java_outer_classname = "TajoResourceTrackerProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "QueryCoordinatorProtocol.proto"; -import "ContainerProtocol.proto"; -import "tajo_protos.proto"; -import "TajoIdProtos.proto"; +import "ResourceProtos.proto"; package hadoop.yarn; -// deprecated -message NodeHeartbeat { - required WorkerConnectionInfoProto connectionInfo = 1; - optional ServerStatusProto serverStatus = 2; - optional string statusMessage = 3; -} - -message NodeHeartbeatRequestProto { - required int32 workerId = 1; - optional NodeResourceProto totalResource = 2; - optional NodeResourceProto availableResource = 3; - optional WorkerConnectionInfoProto connectionInfo = 4; - optional NodeStatusProto status = 5; -} - -message NodeHeartbeatResponseProto { - required ResponseCommand command = 1 [default = NORMAL]; - repeated QueryIdProto queryId = 2; -} - -enum ResponseCommand { - NORMAL = 1; //ping - MEMBERSHIP = 2; // request membership to worker node - ABORT_QUERY = 3; //query master failure - SHUTDOWN = 4; // black list -} - -//TODO add node health information -message NodeStatusProto { -} - service TajoResourceTrackerProtocolService { - rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse); - rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto); + rpc nodeHeartbeat(NodeHeartbeatRequest) returns (NodeHeartbeatResponse); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/proto/TajoWorkerProtocol.proto 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index 7cc4171..8667702 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -22,357 +22,19 @@ option java_outer_classname = "TajoWorkerProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "yarn_protos.proto"; -import "tajo_protos.proto"; + import "TajoIdProtos.proto"; -import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; -import "Plan.proto"; -import "ContainerProtocol.proto"; +import "ResourceProtos.proto"; package hadoop.yarn; -message SessionProto { - required string session_id = 1; - required string username = 2; - required string current_database = 3; - required int64 last_access_time = 4; - required KeyValueSetProto variables = 5; -} - -message TaskStatusProto { - required TaskAttemptIdProto id = 1; - required string workerName = 2; - required float progress = 3; - required TaskAttemptState state = 4; - optional StatSetProto stats = 5; - optional TableStatsProto inputStats = 6; - optional TableStatsProto resultStats = 7; - repeated ShuffleFileOutput shuffleFileOutputs = 8; -} - -message TaskCompletionReport { - required TaskAttemptIdProto id = 1; - optional StatSetProto stats = 2; - optional TableStatsProto inputStats = 3; - optional TableStatsProto resultStats = 4; - repeated ShuffleFileOutput shuffleFileOutputs = 5; -} - -message TaskFatalErrorReport { - required TaskAttemptIdProto id = 1; - optional string errorMessage = 2; - optional string errorTrace = 3; -} - -message TaskRequestProto { - required TaskAttemptIdProto id = 1; - repeated FragmentProto fragments = 2; - required string outputTable = 3; - required bool clusteredOutput = 4; - required LogicalNodeTree plan = 5; - optional bool interQuery = 6 [default = false]; - repeated FetchProto 
fetches = 7; - optional bool shouldDie = 8; - optional KeyValueSetProto queryContext = 9; - optional DataChannelProto dataChannel = 10; - optional EnforcerProto enforcer = 11; -} - -message FetchProto { - required string host = 1; - required int32 port = 2; - required ShuffleType type = 3; - required ExecutionBlockIdProto executionBlockId = 4; - required int32 partitionId = 5; - required string name = 6; - optional string rangeParams = 7; - optional bool hasNext = 8 [default = false]; - - //repeated part - repeated int32 taskId = 9 [packed=true]; - repeated int32 attemptId = 10 [packed=true]; - - optional int64 offset = 11; - optional int64 length = 12; -} - -message FailureIntermediateProto { - required int64 pagePos = 1; - required int32 startRowNum = 2; - required int32 endRowNum = 3; -} - -message IntermediateEntryProto { - message PageProto { - required int64 pos = 1; - required int32 length = 2; - } - required ExecutionBlockIdProto ebId = 1; - required int32 taskId = 2; - required int32 attemptId = 3; - required int32 partId = 4; - required string host = 5; - required int64 volume = 6; - repeated PageProto pages = 7; - repeated FailureIntermediateProto failures = 8; -} - -message ExecutionBlockReport { - required ExecutionBlockIdProto ebId = 1; - required bool reportSuccess = 2; - optional string reportErrorMessage = 3; - required int32 succeededTasks = 4; - repeated IntermediateEntryProto intermediateEntries = 5; -} - -// deprecated -message TaskResponseProto { - required string id = 1; - required QueryState status = 2; -} - -message StatusReportProto { - required int64 timestamp = 1; - required string serverName = 2; - repeated TaskStatusProto status = 3; - repeated TaskAttemptIdProto pings = 4; -} - -message CommandRequestProto { - repeated Command command = 1; -} - -message CommandResponseProto { -} - -message Command { - required TaskAttemptIdProto id = 1; - required CommandType type = 2; -} - -enum CommandType { - PREPARE = 0; - LAUNCH = 1; - STOP = 2; 
- FINALIZE = 3; -} - -message ShuffleFileOutput { - required int32 partId = 1; - optional string fileName = 2; - optional int64 volume = 3; -} - -message QueryExecutionRequestProto { - required QueryIdProto queryId = 1; - required SessionProto session = 2; - required KeyValueSetProto queryContext = 3; - required StringProto exprInJson = 5; - optional StringProto logicalPlanJson = 6; -} - -// deprecated -message GetTaskRequestProto { - required int32 workerId = 1; - required TajoContainerIdProto containerId = 2; - required ExecutionBlockIdProto executionBlockId = 3; -} - -message DataChannelProto { - required ExecutionBlockIdProto srcId = 1; - required ExecutionBlockIdProto targetId = 2; - - required TransmitType transmitType = 3 [default = PULL_TRANSMIT]; - required ShuffleType shuffleType = 4; - - optional SchemaProto schema = 5; - - repeated ColumnProto shuffleKeys = 7; - optional int32 numOutputs = 9 [default = 1]; - - optional string storeType = 10; -} - -message RunExecutionBlockRequestProto { - required ExecutionBlockIdProto executionBlockId = 1; - required WorkerConnectionInfoProto queryMaster = 2; - required string nodeId = 3; - required string containerId = 4; - optional string queryOutputPath = 5; - - required KeyValueSetProto queryContext = 6; - required string planJson = 7; - required ShuffleType shuffleType = 8; -} - -message ExecutionBlockListProto { - repeated ExecutionBlockIdProto executionBlockId = 1; -} - -message TaskAllocationRequestProto { - required TaskRequestProto taskRequest = 1; - required NodeResourceProto resource = 2; -} - -message BatchAllocationRequestProto { - required ExecutionBlockIdProto executionBlockId = 1; - repeated TaskAllocationRequestProto taskRequest = 2; - optional RunExecutionBlockRequestProto executionBlockRequest = 3; //TODO should be refactored -} - -message BatchAllocationResponseProto { - repeated TaskAllocationRequestProto cancellationTask = 2; -} - service TajoWorkerProtocolService { rpc ping (TaskAttemptIdProto) 
returns (BoolProto); // from QueryMaster(Worker) - rpc startExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto); - rpc stopExecutionBlock(ExecutionBlockIdProto) returns (BoolProto); + rpc allocateTasks(BatchAllocationRequest) returns (BatchAllocationResponse); + rpc stopExecutionBlock(StopExecutionBlockRequest) returns (BoolProto); rpc killTaskAttempt(TaskAttemptIdProto) returns (BoolProto); - rpc cleanup(QueryIdProto) returns (BoolProto); - rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto); -} - -message EnforceProperty { - enum EnforceType { - SORTED_INPUT = 0; - OUTPUT_DISTINCT = 1; - GROUP_BY = 2; - JOIN = 3; - SORT = 4; - BROADCAST = 5; - COLUMN_PARTITION = 6; - DISTINCT_GROUP_BY = 7; - } - - // Identifies which field is filled in. - required EnforceType type = 1; - - // One of the following will be filled in. - optional SortedInputEnforce sortedInput = 2; - optional OutputDistinctEnforce outputDistinct = 3; - optional GroupbyEnforce groupby = 4; - optional JoinEnforce join = 5; - optional SortEnforce sort = 6; - optional BroadcastEnforce broadcast = 7; - optional ColumnPartitionEnforcer columnPartition = 8; - optional DistinctGroupbyEnforcer distinct = 9; -} - -message SortedInputEnforce { - required string tableName = 1; - repeated SortSpecProto sortSpecs = 2; -} - -message OutputDistinctEnforce { -} - -message JoinEnforce { - enum JoinAlgorithm { - NESTED_LOOP_JOIN = 0; - BLOCK_NESTED_LOOP_JOIN = 1; - IN_MEMORY_HASH_JOIN = 2; - HYBRID_HASH_JOIN = 3; - MERGE_JOIN = 4; - } - - required int32 nodeId = 1; - required JoinAlgorithm algorithm = 2; -} - -message GroupbyEnforce { - enum GroupbyAlgorithm { - HASH_AGGREGATION = 0; - SORT_AGGREGATION = 1; - } - - required int32 nodeId = 1; - required GroupbyAlgorithm algorithm = 2; - repeated SortSpecProto sortSpecs = 3; -} - -message SortEnforce { - enum SortAlgorithm { - IN_MEMORY_SORT = 0; - MERGE_SORT = 1; - } - - required int32 nodeId = 1; - required SortAlgorithm algorithm = 2; 
-} - -message BroadcastEnforce { - required string tableName = 1; -} - -message ColumnPartitionEnforcer { - enum ColumnPartitionAlgorithm { - HASH_PARTITION = 0; - SORT_PARTITION = 1; - } - - required int32 nodeId = 1; - required ColumnPartitionAlgorithm algorithm = 2; -} - -message DistinctGroupbyEnforcer { - enum DistinctAggregationAlgorithm { - HASH_AGGREGATION = 0; - SORT_AGGREGATION = 1; - } - - enum MultipleAggregationStage { - FIRST_STAGE = 0; - SECOND_STAGE = 1; - THRID_STAGE = 3; - } - - message SortSpecArray { - required int32 nodeId = 1; - repeated SortSpecProto sortSpecs = 2; - } - required int32 nodeId = 1; - required DistinctAggregationAlgorithm algorithm = 2; - repeated SortSpecArray sortSpecArrays = 3; - required bool isMultipleAggregation = 4 [default = false]; - optional MultipleAggregationStage multipleAggregationStage = 5; -} - -message EnforcerProto { - repeated EnforceProperty properties = 1; -} - -message FetcherHistoryProto { - required int64 startTime = 1; - optional int64 finishTime = 2; - required FetcherState state = 3; - required int64 fileLength = 4; - required int32 messageReceivedCount = 5; -} - -message TaskHistoryProto { - required TaskAttemptIdProto taskAttemptId = 1; - required TaskAttemptState state = 2; - required float progress = 3; - required int64 startTime = 4; - required int64 finishTime = 5; - required TableStatsProto inputStats = 6; - optional TableStatsProto outputStats = 7; - optional string outputPath = 8; - optional string workingPath = 9; - optional int32 finishedFetchCount = 10; - optional int32 totalFetchCount = 11; - repeated FetcherHistoryProto fetcherHistories = 12; -} - -message TaskRunnerHistoryProto { - required ExecutionBlockIdProto executionBlockId = 1; - required string state = 2; - required string containerId = 3; - optional int64 startTime = 4; - optional int64 finishTime = 5; - repeated TaskHistoryProto taskHistories = 6; + rpc stopQuery(QueryIdProto) returns (BoolProto); } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index 97ca698..7aad8d4 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -23,14 +23,12 @@ <%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %> <%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.service.TajoMasterInfo" %> -<%@ page import="org.apache.tajo.master.rm.Worker" %> -<%@ page import="org.apache.tajo.master.rm.WorkerResource" %> -<%@ page import="org.apache.tajo.master.rm.WorkerState" %> +<%@ page import="org.apache.tajo.master.rm.NodeStatus" %> +<%@ page import="org.apache.tajo.master.rm.NodeState" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.*" %> -<%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="java.net.InetSocketAddress" %> <% @@ -40,37 +38,37 @@ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); - Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); - List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet()); + Map<Integer, NodeStatus> nodes = master.getContext().getResourceManager().getNodes(); + List<Integer> wokerKeys = new ArrayList<Integer>(nodes.keySet()); Collections.sort(wokerKeys); int runningQueryMasterTasks = 0; - Set<Worker> liveWorkers = new TreeSet<Worker>(); - Set<Worker> deadWorkers = new TreeSet<Worker>(); - Set<Worker> 
decommissionWorkers = new TreeSet<Worker>(); + Set<NodeStatus> liveNodes = new TreeSet<NodeStatus>(); + Set<NodeStatus> deadNodes = new TreeSet<NodeStatus>(); + Set<NodeStatus> decommissionNodes = new TreeSet<NodeStatus>(); - Set<Worker> liveQueryMasters = new TreeSet<Worker>(); - Set<Worker> deadQueryMasters = new TreeSet<Worker>(); + Set<NodeStatus> liveQueryMasters = new TreeSet<NodeStatus>(); + Set<NodeStatus> deadQueryMasters = new TreeSet<NodeStatus>(); - for(Worker eachWorker: workers.values()) { - liveQueryMasters.add(eachWorker); - liveWorkers.add(eachWorker); - runningQueryMasterTasks += eachWorker.getResource().getNumQueryMasterTasks(); + for(NodeStatus eachNode: nodes.values()) { + liveQueryMasters.add(eachNode); + liveNodes.add(eachNode); + runningQueryMasterTasks += eachNode.getNumRunningQueryMaster(); } - for (Worker inactiveWorker : master.getContext().getResourceManager().getInactiveWorkers().values()) { - WorkerState state = inactiveWorker.getState(); + for (NodeStatus inactiveNode : master.getContext().getResourceManager().getInactiveNodes().values()) { + NodeState state = inactiveNode.getState(); - if (state == WorkerState.LOST) { - deadQueryMasters.add(inactiveWorker); - deadWorkers.add(inactiveWorker); - } else if (state == WorkerState.DECOMMISSIONED) { - decommissionWorkers.add(inactiveWorker); + if (state == NodeState.LOST) { + deadQueryMasters.add(inactiveNode); + deadNodes.add(inactiveNode); + } else if (state == NodeState.DECOMMISSIONED) { + decommissionNodes.add(inactiveNode); } } - String deadWorkersHtml = deadWorkers.isEmpty() ? "0": "<font color='red'>" + deadWorkers.size() + "</font>"; + String deadNodesHtml = deadNodes.isEmpty() ? "0": "<font color='red'>" + deadNodes.size() + "</font>"; String deadQueryMastersHtml = deadQueryMasters.isEmpty() ? 
"0": "<font color='red'>" + deadQueryMasters.size() + "</font>"; ServiceTracker haService = master.getContext().getHAService(); @@ -169,12 +167,11 @@ } else { %> <table width="100%" class="border_table" border="1"> - <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running Query</th><th>Heap(free/total/max)</th><th>Heartbeat</th><th>Status</th></tr> + <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running Query</th><th>Heartbeat</th><th>Status</th></tr> <% int no = 1; - for(Worker queryMaster: liveQueryMasters) { - WorkerResource resource = queryMaster.getResource(); + for(NodeStatus queryMaster: liveQueryMasters) { WorkerConnectionInfo connectionInfo = queryMaster.getConnectionInfo(); String queryMasterHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp"; @@ -183,8 +180,7 @@ <td width='30' align='right'><%=no++%></td> <td><a href='<%=queryMasterHttp%>'><%=connectionInfo.getHost() + ":" + connectionInfo.getQueryMasterPort()%></a></td> <td width='100' align='center'><%=connectionInfo.getClientPort()%></td> - <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td> - <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td> + <td width='200' align='right'><%=queryMaster.getNumRunningQueryMaster()%></td> <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td> <td width='100' align='center'><%=queryMaster.getState()%></td> </tr> @@ -207,7 +203,7 @@ <tr><th>No</th><th>QueryMaster</th> <% int no = 1; - for(Worker queryMaster: deadQueryMasters) { + for(NodeStatus queryMaster: deadQueryMasters) { %> <tr> <td width='30' align='right'><%=no++%></td> @@ -223,34 +219,32 @@ %> <hr/> - <h2>Worker</h2> - <div>Live:<%=liveWorkers.size()%>, Dead: <%=deadWorkersHtml%></div> + <h2>Node</h2> + <div>Live:<%=liveNodes.size()%>, Dead: 
<%=deadNodesHtml%></div> <hr/> - <h3>Live Workers</h3> + <h3>Live Nodes</h3> <% - if(liveWorkers.isEmpty()) { - out.write("No Live Workers\n"); + if(liveNodes.isEmpty()) { + out.write("No Live Nodes\n"); } else { %> <table width="100%" class="border_table" border="1"> - <tr><th>No</th><th>Worker</th><th>PullServer<br/>Port</th><th>Running Tasks</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th><th>Heap<br/>(free/total/max)</th><th>Heartbeat</th><th>Status</th></tr> + <tr><th>No</th><th>Node</th><th>PullServer<br/>Port</th><th>Running Tasks</th><th>Available</th><th>Total</th><th>Heartbeat</th><th>Status</th></tr> <% int no = 1; - for(Worker worker: liveWorkers) { - WorkerResource resource = worker.getResource(); - WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); - String workerHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp"; + for(NodeStatus node: liveNodes) { + WorkerConnectionInfo connectionInfo = node.getConnectionInfo(); + String nodeHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp"; %> <tr> <td width='30' align='right'><%=no++%></td> - <td><a href='<%=workerHttp%>'><%=connectionInfo.getHostAndPeerRpcPort()%></a></td> + <td><a href='<%=nodeHttp%>'><%=connectionInfo.getHostAndPeerRpcPort()%></a></td> <td width='80' align='center'><%=connectionInfo.getPullServerPort()%></td> - <td width='100' align='right'><%=resource.getNumRunningTasks()%></td> - <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td> - <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td> - <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td> - <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td> 
- <td width='100' align='center'><%=worker.getState()%></td> + <td width='100' align='right'><%=node.getNumRunningTasks()%></td> + <td width='150' align='center'><%=node.getAvailableResource()%></td> + <td width='150' align='center'><%=node.getTotalResourceCapability()%></td> + <td width='100' align='right'><%=JSPUtil.getElapsedTime(node.getLastHeartbeatTime(), System.currentTimeMillis())%></td> + <td width='100' align='center'><%=node.getState()%></td> </tr> <% } //end fo for @@ -263,25 +257,24 @@ <p/> <hr/> <p/> - <h3>Dead Workers</h3> + <h3>Dead Nodes</h3> <% - if(deadWorkers.isEmpty()) { + if(deadNodes.isEmpty()) { %> - No Dead Workers + No Dead Nodes <% } else { %> <table width="300" class="border_table" border="1"> - <tr><th>No</th><th>Worker</th></tr> + <tr><th>No</th><th>Node</th></tr> <% int no = 1; - for(Worker worker: deadWorkers) { - WorkerResource resource = worker.getResource(); + for(NodeStatus node: deadNodes) { %> <tr> <td width='30' align='right'><%=no++%></td> - <td><%=worker.getConnectionInfo().getHostAndPeerRpcPort()%></td> + <td><%=node.getConnectionInfo().getHostAndPeerRpcPort()%></td> </tr> <% } //end fo for http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index bd84283..96facc5 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -21,20 +21,19 @@ <%@ page import="org.apache.hadoop.fs.FileSystem" %> <%@ page import="org.apache.tajo.conf.TajoConf" %> -<%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.master.rm.Worker" %> -<%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page 
import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.service.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.master.rm.NodeStatus" %> +<%@ page import="org.apache.tajo.master.rm.NodeState" %> <%@ page import="org.apache.tajo.storage.TablespaceManager" %> <%@ page import="org.apache.tajo.storage.Tablespace" %> <%@ page import="org.apache.tajo.util.NetUtils" %> <%@ page import="org.apache.tajo.util.TUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> +<%@ page import="java.util.List" %> <%@ page import="java.net.InetSocketAddress" %> <%@ page import="java.util.Date" %> -<%@ page import="java.util.List" %> <%@ page import="java.util.Map" %> <% @@ -44,8 +43,8 @@ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); - Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); - Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers(); + Map<Integer, NodeStatus> workers = master.getContext().getResourceManager().getNodes(); + Map<Integer, NodeStatus> inactiveWorkers = master.getContext().getResourceManager().getInactiveNodes(); int numWorkers = 0; int numLiveWorkers = 0; @@ -58,24 +57,21 @@ int runningQueryMasterTask = 0; - QueryCoordinatorProtocol.ClusterResourceSummary clusterResourceSummary = - master.getContext().getResourceManager().getClusterResourceSummary(); - - for(Worker eachWorker: workers.values()) { + for(NodeStatus eachWorker: workers.values()) { numQueryMasters++; numLiveQueryMasters++; - runningQueryMasterTask += eachWorker.getResource().getNumQueryMasterTasks(); + runningQueryMasterTask += eachWorker.getNumRunningQueryMaster(); numWorkers++; numLiveWorkers++; } - for (Worker eachWorker : inactiveWorkers.values()) { - if (eachWorker.getState() == WorkerState.LOST) { + for (NodeStatus 
eachWorker : inactiveWorkers.values()) { + if (eachWorker.getState() == NodeState.LOST) { numQueryMasters++; numDeadQueryMasters++; numWorkers++; numDeadWorkers++; - } else if(eachWorker.getState() == WorkerState.DECOMMISSIONED) { + } else if(eachWorker.getState() == NodeState.DECOMMISSIONED) { numDecommissionWorkers++; } } @@ -130,8 +126,8 @@ <tr><td width='150'>System dir:</td><td><%=TajoConf.getSystemDir(master.getContext().getConf())%></td></tr> <tr><td width='150'>Warehouse dir:</td><td><%=TajoConf.getWarehouseDir(master.getContext().getConf())%></td></tr> <tr><td width='150'>Staging dir:</td><td><%=TajoConf.getDefaultRootStagingDir(master.getContext().getConf())%></td></tr> - <tr><td width='150'>Client Service:</td><td><%=NetUtils.normalizeInetSocketAddress(master.getTajoMasterClientService().getBindAddress())%></td></tr> - <tr><td width='150'>Catalog Service:</td><td><%=master.getCatalogServer().getCatalogServerName()%></td></tr> + <tr><td width='150'>Client Service:</td><td><%=master.getTajoMasterClientService().getBindAddress().getHostName() + ":" + master.getTajoMasterClientService().getBindAddress().getPort()%></td></tr> + <tr><td width='150'>Catalog Service:</td><td><%=master.getCatalogServer().getBindAddress().getHostName() + ":" + master.getCatalogServer().getBindAddress().getPort()%></td></tr> <tr><td width='150'>Heap(Free/Total/Max): </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB / <%=Runtime.getRuntime().totalMemory()/1024/1024%> MB / <%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td> <tr><td width='150'>Configuration:</td><td><a href='conf.jsp'>detail...</a></td></tr> <tr><td width='150'>Environment:</td><td><a href='env.jsp'>detail...</a></td></tr> @@ -149,7 +145,7 @@ <hr/> <h3>Cluster Summary</h3> <table width="100%" class="border_table" border="1"> - <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></tr> + 
<tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Available</th><th>Total</th></tr> <tr> <td><a href='cluster.jsp'>Query Master</a></td> <td align='right'><%=numQueryMasters%></td> @@ -165,8 +161,8 @@ <td align='right'><%=numLiveWorkers%></td> <td align='right'><%=numDeadWorkersHtml%></td> <td align='right'>-</td> - <td align='center'><%=clusterResourceSummary.getTotalMemoryMB() - clusterResourceSummary.getTotalAvailableMemoryMB()%>/<%=clusterResourceSummary.getTotalMemoryMB()%></td> - <td align='center'><%=clusterResourceSummary.getTotalDiskSlots() - clusterResourceSummary.getTotalAvailableDiskSlots()%>/<%=clusterResourceSummary.getTotalDiskSlots()%></td> + <td align='center'><%=master.getContext().getResourceManager().getScheduler().getClusterResource()%></td> + <td align='center'><%=master.getContext().getResourceManager().getScheduler().getMaximumResourceCapability()%></td> </tr> <% if (haService != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index ca376bb..0701e34 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -21,7 +21,7 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.master.QueryInProgress" %> -<%@ page import="org.apache.tajo.master.rm.Worker" %> +<%@ page import="org.apache.tajo.master.rm.NodeStatus" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.StringUtils" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> @@ -38,11 +38,13 @@ InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); String masterLabel = 
socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); - List<QueryInProgress> runningQueries = + List<QueryInProgress> submittedQueries = new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getSubmittedQueries()); + JSPUtil.sortQueryInProgress(submittedQueries, true); - runningQueries.addAll(master.getContext().getQueryJobManager().getRunningQueries()); - JSPUtil.sortQueryInProgress(runningQueries, true); + List<QueryInProgress> runningQueries = + new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getRunningQueries()); + JSPUtil.sortQueryInProgress(runningQueries, true); int currentPage = 1; if (request.getParameter("page") != null && !request.getParameter("page").isEmpty()) { @@ -57,9 +59,8 @@ } } - String keyword = request.getParameter("keyword"); - HistoryReader historyReader = master.getContext().getHistoryReader(); - List<QueryInfo> allFinishedQueries = historyReader.getQueries(keyword); + List<QueryInfo> allFinishedQueries = new ArrayList<QueryInfo>(master.getContext().getQueryJobManager().getFinishedQueries()); + Collections.sort(allFinishedQueries, java.util.Collections.reverseOrder()); int numOfFinishedQueries = allFinishedQueries.size(); int totalPage = numOfFinishedQueries % pageSize == 0 ? 
@@ -69,15 +70,15 @@ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); + Map<Integer, NodeStatus> workers = master.getContext().getResourceManager().getNodes(); Map<String, Integer> portMap = new HashMap<String, Integer>(); Collection<Integer> queryMasters = master.getContext().getResourceManager().getQueryMasters(); if (queryMasters == null || queryMasters.isEmpty()) { - queryMasters = master.getContext().getResourceManager().getWorkers().keySet(); + queryMasters = master.getContext().getResourceManager().getNodes().keySet(); } for(int eachQueryMasterKey: queryMasters) { - Worker queryMaster = workers.get(eachQueryMasterKey); + NodeStatus queryMaster = workers.get(eachQueryMasterKey); if(queryMaster != null) { portMap.put(queryMaster.getConnectionInfo().getHost(), queryMaster.getConnectionInfo().getHttpInfoPort()); } @@ -119,7 +120,39 @@ <%@ include file="header.jsp"%> <div class='contents'> <h2>Tajo Master: <%=masterLabel%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> - <hr/> + <p /> + <hr /> + <h3>Submitted Queries</h3> + <% + if(submittedQueries.isEmpty()) { + out.write("No submitted queries"); + } else { + %> + <table width="100%" border="1" class='border_table'> + <tr></tr><th>QueryId</th><th>Query Master</th><th>Submitted</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th><th>Kill Query</th></tr> + <% + for(QueryInProgress eachQuery: submittedQueries) { + long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime(); + %> + <tr> + <td><%=eachQuery.getQueryId()%></td> + <td><%=eachQuery.getQueryInfo().getQueryMasterHost()%></td> + <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td> + <td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td> + <td><%=StringUtils.formatTime(time)%></td> + <td><%=eachQuery.getQueryInfo().getQueryState()%></td> + 
<td><%=eachQuery.getQueryInfo().getSql()%></td> + <td><input type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td> + </tr> + <% + } + %> + </table> + <% + } + %> + <p/> + <hr/> <h3>Running Queries</h3> <% if(runningQueries.isEmpty()) { @@ -142,7 +175,7 @@ <td><%=StringUtils.formatTime(time)%></td> <td><%=eachQuery.getQueryInfo().getQueryState()%></td> <td><%=eachQuery.getQueryInfo().getSql()%></td> - <td><input id="btnSubmit" type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td> + <td><input type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td> </tr> <% } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/query_executor.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp index 1a58583..49a43d4 100644 --- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp @@ -21,7 +21,6 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="javax.xml.ws.Service" %> <%@ page import="java.net.InetSocketAddress" %> <% @@ -335,7 +334,7 @@ function getPage() { <hr/> <div id="queryResultTools"></div> <hr/> - <div style="display:none;"><form name="dataForm" id="dataForm" method="post" action="getCSV.jsp"><input type="hidden" id="csvData" name="csvData" value="" /></div> + <div style="display:none;"><form name="dataForm" id="dataForm" method="post" action="getCSV.jsp"><input type="hidden" id="csvData" name="csvData" value="" /></form></div> </div> </body> </html> 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/querydetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp b/tajo-core/src/main/resources/webapps/admin/querydetail.jsp index 099301e..99edca4 100644 --- a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querydetail.jsp @@ -40,6 +40,8 @@ queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String masterLabel = master.getContext().getTajoMasterService().getBindAddress().getHostName()+ ":" + + master.getContext().getTajoMasterService().getBindAddress().getPort(); %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -52,7 +54,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> + <h2>Tajo Master: <%= masterLabel%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> <hr/> <h3><%=queryId%></h3> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp index d08c0d9..a7b26ae 100644 --- a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp @@ -26,7 +26,7 @@ <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> <%@ page import="org.apache.tajo.util.history.StageHistory" %> -<%@ page import="org.apache.tajo.master.rm.Worker" %> +<%@ 
page import="org.apache.tajo.master.rm.NodeStatus" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.util.history.TaskHistory" %> <%@ page import="org.apache.tajo.util.*" %> @@ -74,12 +74,12 @@ status = "ALL"; } - Collection<Worker> allWorkers = master.getContext().getResourceManager().getWorkers().values(); + Collection<NodeStatus> allWorkers = master.getContext().getResourceManager().getNodes().values(); - Map<String, Worker> workerMap = new HashMap<String, Worker>(); + Map<String, NodeStatus> nodeMap = new HashMap<String, NodeStatus>(); if(allWorkers != null) { - for(Worker eachWorker: allWorkers) { - workerMap.put(eachWorker.getConnectionInfo().getHostAndPeerRpcPort(), eachWorker); + for(NodeStatus eachWorker: allWorkers) { + nodeMap.put(eachWorker.getConnectionInfo().getHostAndPeerRpcPort(), eachWorker); } } @@ -99,7 +99,7 @@ totalWriteRows = stage.getTotalWriteRows(); } - List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId); + List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId, NumberUtils.toLong(startTime, 0)); int numTasks = allTasks.size(); int numShuffles = 0; float totalProgress = 0.0f; @@ -132,6 +132,9 @@ "&status=" + status + "&sortOrder=" + nextSortOrder + "&sort="; NumberFormat nf = NumberFormat.getInstance(Locale.US); + + String masterLabel = master.getContext().getTajoMasterService().getBindAddress().getHostName()+ ":" + + master.getContext().getTajoMasterService().getBindAddress().getPort(); %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -144,7 +147,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> <hr/> <h3><a href='querydetail.jsp?queryId=<%=queryId%>&startTime=<%=startTime%>'><%=ebId.toString()%></a></h3> <hr/> @@ -204,7 
+207,7 @@ %> <div align="right"># Tasks: <%=numOfTasks%> / # Pages: <%=totalPage%></div> <table border="1" width="100%" class="border_table"> - <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr> + <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th>Retry</th><th><a href='<%=url%>host'>Host</a></th></tr> <% int rowNo = (currentPage - 1) * pageSize + 1; for (TaskHistory eachTask: tasks) { @@ -215,10 +218,10 @@ } String taskHost = eachTask.getHostAndPort() == null ? "-" : eachTask.getHostAndPort(); if (eachTask.getHostAndPort() != null) { - Worker worker = workerMap.get(eachTask.getHostAndPort()); - if (worker != null) { + NodeStatus nodeStatus = nodeMap.get(eachTask.getHostAndPort()); + if (nodeStatus != null) { String[] hostTokens = eachTask.getHostAndPort().split(":"); - taskHost = "<a href='http://" + hostTokens[0] + ":" + worker.getConnectionInfo().getHttpInfoPort() + + taskHost = "<a href='http://" + hostTokens[0] + ":" + nodeStatus.getConnectionInfo().getHttpInfoPort() + "/taskhistory.jsp?taskAttemptId=" + eachTask.getId() + "&startTime=" + eachTask.getLaunchTime() + "'>" + eachTask.getHostAndPort() + "</a>"; } @@ -226,12 +229,13 @@ %> <tr> - <td><%=rowNo%></td> + <td align='center'><%=rowNo%></td> <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td> - <td><%=eachTask.getState()%></td> - <td><%=JSPUtil.percentFormat(eachTask.getProgress())%>%</td> - <td><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td> + <td align='center'><%=eachTask.getState()%></td> + <td align='center'><%=JSPUtil.percentFormat(eachTask.getProgress())%>%</td> + <td align='center'><%=eachTask.getLaunchTime() == 0 ? 
"-" : df.format(eachTask.getLaunchTime())%></td> <td align='right'><%=eachTask.getLaunchTime() == 0 ? "-" : eachTask.getRunningTime() + " ms"%></td> + <td align='center'><%=eachTask.getRetryCount()%></td> <td><%=taskHost%></td> </tr> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/admin/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/task.jsp b/tajo-core/src/main/resources/webapps/admin/task.jsp index 1530572..d3014b1 100644 --- a/tajo-core/src/main/resources/webapps/admin/task.jsp +++ b/tajo-core/src/main/resources/webapps/admin/task.jsp @@ -19,13 +19,13 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="org.apache.tajo.util.JSPUtil" %> -<%@ page import="org.apache.tajo.util.TajoIdUtils" %> -<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="java.text.SimpleDateFormat" %> +<%@ page import="org.apache.commons.lang.math.NumberUtils" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.util.history.TaskHistory" %> +<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> +<%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> <% @@ -34,15 +34,16 @@ String queryId = request.getParameter("queryId"); String ebId = request.getParameter("ebid"); - + String startTime = request.getParameter("startTime"); String status = request.getParameter("status"); + if(status == null || status.isEmpty() || "null".equals(status)) { status = "ALL"; } String taskAttemptId = request.getParameter("taskAttemptId"); - List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId); + List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId, NumberUtils.toLong(startTime, 0)); 
TaskHistory task = null; for(TaskHistory eachTask: allTasks) { @@ -107,12 +108,15 @@ shuffleKey = task.getShuffleKey(); shuffleFileName = task.getShuffleFileName(); } + + String masterLabel = master.getContext().getTajoMasterService().getBindAddress().getHostName()+ ":" + + master.getContext().getTajoMasterService().getBindAddress().getPort(); %> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> <hr/> <h3><a href='<%=backUrl%>'><%=ebId%></a></h3> <hr/> http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp index bc3cb1e..fa00012 100644 --- a/tajo-core/src/main/resources/webapps/worker/index.jsp +++ b/tajo-core/src/main/resources/webapps/worker/index.jsp @@ -24,9 +24,7 @@ <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.TajoWorker" %> -<%@ page import="org.apache.tajo.worker.TaskRunner" %> <%@ page import="java.text.SimpleDateFormat" %> -<%@ page import="java.util.ArrayList" %> <%@ page import="java.util.List" %> <% @@ -52,6 +50,8 @@ <tr><td width='100'>MaxHeap: </td><td><%=Runtime.getRuntime().maxMemory()/1024/1024%> MB</td> <tr><td width='100'>TotalHeap: </td><td><%=Runtime.getRuntime().totalMemory()/1024/1024%> MB</td> <tr><td width='100'>FreeHeap: </td><td><%=Runtime.getRuntime().freeMemory()/1024/1024%> MB</td> + <tr><td width='100'>Available Resource: </td><td><%= tajoWorker.getWorkerContext().getNodeResourceManager().getAvailableResource() %></td> + <tr><td width='100'>Running Tasks: </td><td><%= 
tajoWorker.getWorkerContext().getTaskManager().getRunningTasks() %></td> <tr><td width="100">Configuration:</td><td><a href='conf.jsp'>detail...</a></td></tr> <tr><td width="100">Environment:</td><td><a href='env.jsp'>detail...</a></td></tr> <tr><td width="100">Threads:</td><td><a href='thread.jsp'>thread dump...</a></tr> @@ -62,8 +62,6 @@ List<QueryMasterTask> queryMasterTasks = JSPUtil.sortQueryMasterTask(tajoWorker.getWorkerContext() .getQueryMasterManagerService().getQueryMaster().getQueryMasterTasks(), true); - List<QueryMasterTask> finishedQueryMasterTasks = JSPUtil.sortQueryMasterTask(tajoWorker.getWorkerContext() - .getQueryMasterManagerService().getQueryMaster().getFinishedQueryMasterTasks(), true); %> <h3>Running Query</h3> <% @@ -90,58 +88,6 @@ } //end of if %> </table> - <p/> - <hr/> - <h3>Finished Query</h3> - <% - if(finishedQueryMasterTasks.isEmpty()) { - out.write("No finished query master"); - } else { - %> - <table width="100%" border="1" class="border_table"> - <tr><th>QueryId</th><th>Status</th><th>StartTime</th><th>FinishTime</th><th>Progress</th><th>RunTime</th></tr> - <% - for(QueryMasterTask eachQueryMasterTask: finishedQueryMasterTasks) { - Query query = eachQueryMasterTask.getQuery(); - long startTime = query != null ? query.getStartTime() : eachQueryMasterTask.getQuerySubmitTime(); - %> - <tr> - <td align='center'><a href='querydetail.jsp?queryId=<%=eachQueryMasterTask.getQueryId()%>'><%=eachQueryMasterTask.getQueryId()%></a></td> - <td align='center'><%=eachQueryMasterTask.getState()%></td> - <td align='center'><%=df.format(startTime)%></td> - <td align='center'><%=(query == null || query.getFinishTime() == 0) ? "-" : df.format(query.getFinishTime())%></td> - <td align='center'><%=(query == null) ? "-" : (int)(query.getProgress()*100.0f)%>%</td> - <td align='right'><%=(query == null) ? 
"-" : JSPUtil.getElapsedTime(query.getStartTime(), query.getFinishTime())%></td> - </tr> - <% - } //end of for - } //end of if - %> - </table> - <p/> - <hr/> -<% - List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners()); - JSPUtil.sortTaskRunner(taskRunners); -%> - <h3>Running Task Containers</h3> - <a href='taskcontainers.jsp'>[All Task Containers]</a> - <br/> - <table width="100%" border="1" class="border_table"> - <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr> - <% - for(TaskRunner eachTaskRunner: taskRunners) { - %> - <tr> - <td><a href="tasks.jsp?taskRunnerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td> - <td><%=df.format(eachTaskRunner.getStartTime())%></td> - <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td> - <td><%=eachTaskRunner.getServiceState()%></td> -<% - } //end of for -%> - </table> </div> </body> </html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/querydetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp index eb40b4f..29862e6 100644 --- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp @@ -19,14 +19,12 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="org.apache.commons.lang.math.NumberUtils" %> <%@ page import="org.apache.tajo.QueryId" %> <%@ page import="org.apache.tajo.SessionVars" %> <%@ page import="org.apache.tajo.querymaster.Query" %> <%@ page 
import="org.apache.tajo.querymaster.QueryMasterTask" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> -<%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> <%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> @@ -37,29 +35,21 @@ <% QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId")); - String startTime = request.getParameter("startTime"); TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() - .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true); - - boolean runningQuery = queryMasterTask != null; + .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId); QueryHistory queryHistory = null; - - Query query = null; + Query query; if (queryMasterTask != null) { query = queryMasterTask.getQuery(); if (query != null) { queryHistory = query.getQueryHistory(); } } else { - HistoryReader reader = tajoWorker.getWorkerContext().getHistoryReader(); - queryHistory = reader.getQueryHistory(queryId.toString(), NumberUtils.toLong(startTime, 0)); - } - - if (!runningQuery && queryHistory == null) { - out.write("<script type='text/javascript'>alert('no query history'); history.back(0); </script>"); + String tajoMasterHttp = request.getScheme() + "://" + JSPUtil.getTajoMasterHttpAddr(tajoWorker.getConfig()); + response.sendRedirect(tajoMasterHttp + request.getRequestURI() + "?" + request.getQueryString()); return; } @@ -67,6 +57,7 @@ queryHistory != null ? 
JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -82,7 +73,7 @@ <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2> <hr/> <% -if (runningQuery && query == null) { +if (query == null) { out.write("Query Status: " + queryMasterTask.getState()); String errorMessage = queryMasterTask.getErrorMessage(); if (errorMessage != null && !errorMessage.isEmpty()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/queryplan.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp index 878efe3..422edfc 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp @@ -37,7 +37,7 @@ TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() - .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true); + .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId); if(queryMasterTask == null) { out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>"); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index 3e3c2c2..2c32006 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ 
b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -24,7 +24,6 @@ <%@ page import="org.apache.tajo.TaskAttemptId" %> <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> <%@ page import="org.apache.tajo.plan.util.PlannerUtil" %> -<%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %> <%@ page import="org.apache.tajo.querymaster.*" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.TajoWorker" %> @@ -33,6 +32,8 @@ <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.util.*" %> <%@ page import="java.util.*" %> +<%@ page import="org.apache.tajo.TajoProtos" %> +<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %> <% String paramQueryId = request.getParameter("queryId"); @@ -60,20 +61,21 @@ } TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - List<QueryCoordinatorProtocol.WorkerResourceProto> allWorkers = tajoWorker.getWorkerContext() + List<TajoProtos.WorkerConnectionInfoProto> allWorkers = tajoWorker.getWorkerContext() .getQueryMasterManagerService().getQueryMaster().getAllWorker(); - Map<Integer, QueryCoordinatorProtocol.WorkerResourceProto> workerMap = new HashMap<Integer, QueryCoordinatorProtocol.WorkerResourceProto>(); + Map<Integer, TajoProtos.WorkerConnectionInfoProto> workerMap = new HashMap<Integer, TajoProtos.WorkerConnectionInfoProto>(); if(allWorkers != null) { - for(QueryCoordinatorProtocol.WorkerResourceProto eachWorker: allWorkers) { - workerMap.put(eachWorker.getConnectionInfo().getId(), eachWorker); + for(TajoProtos.WorkerConnectionInfoProto eachWorker: allWorkers) { + workerMap.put(eachWorker.getId(), eachWorker); } } QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() - .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true); + 
.getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId); if(queryMasterTask == null) { - out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>"); + String tajoMasterHttp = request.getScheme() + "://" + JSPUtil.getTajoMasterHttpAddr(tajoWorker.getConfig()); + response.sendRedirect(tajoMasterHttp + request.getRequestURI() + "?" + request.getQueryString()); return; } @@ -220,7 +222,7 @@ %> <div align="right"># Tasks: <%=numOfTasks%> / # Pages: <%=totalPage%></div> <table border="1" width="100%" class="border_table"> - <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th><a href='<%=url%>host'>Host</a></th></tr> + <tr><th>No</th><th><a href='<%=url%>id'>Id</a></th><th>Status</th><th>Progress</th><th><a href='<%=url%>startTime'>Started</a></th><th><a href='<%=url%>runTime'>Running Time</a></th><th>Retry</th><th><a href='<%=url%>host'>Host</a></th></tr> <% for(Task eachTask : tasks) { int taskSeq = eachTask.getId().getId(); @@ -228,26 +230,22 @@ "&page=" + currentPage + "&pageSize=" + pageSize + "&taskSeq=" + taskSeq + "&sort=" + sort + "&sortOrder=" + sortOrder; - String taskHost = eachTask.getSucceededHost() == null ? 
"-" : eachTask.getSucceededHost(); - if(eachTask.getSucceededHost() != null) { - QueryCoordinatorProtocol.WorkerResourceProto worker = - workerMap.get(eachTask.getLastAttempt().getWorkerConnectionInfo().getId()); - if(worker != null) { - TaskAttempt lastAttempt = eachTask.getLastAttempt(); - if(lastAttempt != null) { - TaskAttemptId lastAttemptId = lastAttempt.getId(); - taskHost = "<a href='http://" + eachTask.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?taskAttemptId=" + lastAttemptId + "'>" + eachTask.getSucceededHost() + "</a>"; - } - } + TaskAttempt lastAttempt = eachTask.getLastAttempt(); + String taskHost = lastAttempt == null ? "-" : lastAttempt.getWorkerConnectionInfo().getHost(); + if(lastAttempt != null) { + WorkerConnectionInfo conn = lastAttempt.getWorkerConnectionInfo(); + TaskAttemptId lastAttemptId = lastAttempt.getId(); + taskHost = "<a href='http://" + conn.getHost() + ":" + conn.getHttpInfoPort() + "/taskdetail.jsp?taskAttemptId=" + lastAttemptId + "'>" + conn.getHost() + "</a>"; } %> <tr> - <td><%=rowNo%></td> + <td align='center'><%=rowNo%></td> <td><a href="<%=taskDetailUrl%>"><%=eachTask.getId()%></a></td> - <td><%=eachTask.getLastAttemptStatus()%></td> - <td><%=JSPUtil.percentFormat(eachTask.getLastAttempt().getProgress())%>%</td> - <td><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td> + <td align='center'><%=eachTask.getLastAttemptStatus()%></td> + <td align='center'><%=JSPUtil.percentFormat(eachTask.getLastAttempt().getProgress())%>%</td> + <td align='center'><%=eachTask.getLaunchTime() == 0 ? "-" : df.format(eachTask.getLaunchTime())%></td> <td align='right'><%=eachTask.getLaunchTime() == 0 ? 
"-" : eachTask.getRunningTime() + " ms"%></td> + <td align='center'><%=eachTask.getRetryCount()%></td> <td><%=taskHost%></td> </tr> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp index 17e884a..f2f903b 100644 --- a/tajo-core/src/main/resources/webapps/worker/task.jsp +++ b/tajo-core/src/main/resources/webapps/worker/task.jsp @@ -21,16 +21,16 @@ <%@ page import="org.apache.tajo.ExecutionBlockId" %> <%@ page import="org.apache.tajo.QueryId" %> +<%@ page import="org.apache.tajo.ResourceProtos.ShuffleFileOutput" %> <%@ page import="org.apache.tajo.TaskId" %> <%@ page import="org.apache.tajo.catalog.proto.CatalogProtos" %> <%@ page import="org.apache.tajo.catalog.statistics.TableStats" %> -<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> <%@ page import="org.apache.tajo.querymaster.Query" %> <%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.querymaster.Task" %> <%@ page import="org.apache.tajo.querymaster.Stage" %> +<%@ page import="org.apache.tajo.querymaster.Task" %> <%@ page import="org.apache.tajo.storage.DataLocation" %> -<%@ page import="org.apache.tajo.storage.fragment.FileFragment" %> +<%@ page import="org.apache.tajo.storage.fragment.Fragment" %> <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> @@ -41,7 +41,6 @@ <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.Map" %> <%@ page import="java.util.Set" %> -<%@ page import="org.apache.tajo.storage.fragment.Fragment" %> <% String paramQueryId = request.getParameter("queryId"); @@ -57,10 +56,11 @@ int taskSeq = 
Integer.parseInt(request.getParameter("taskSeq")); TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() - .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId, true); + .getQueryMasterManagerService().getQueryMaster().getQueryMasterTask(queryId); if(queryMasterTask == null) { - out.write("<script type='text/javascript'>alert('no query'); history.back(0); </script>"); + String tajoMasterHttp = request.getScheme() + "://" + JSPUtil.getTajoMasterHttpAddr(tajoWorker.getConfig()); + response.sendRedirect(tajoMasterHttp + request.getRequestURI() + "?" + request.getQueryString()); return; } @@ -131,7 +131,7 @@ String shuffleKey = "-"; String shuffleFileName = "-"; if(numShuffles > 0) { - TajoWorkerProtocol.ShuffleFileOutput shuffleFileOutputs = task.getShuffleFileOutputs().get(0); + ShuffleFileOutput shuffleFileOutputs = task.getShuffleFileOutputs().get(0); shuffleKey = "" + shuffleFileOutputs.getPartId(); shuffleFileName = shuffleFileOutputs.getFileName(); } @@ -161,7 +161,7 @@ <tr><td align="right">Launch Time</td><td><%=task.getLaunchTime() == 0 ? "-" : df.format(task.getLaunchTime())%></td></tr> <tr><td align="right">Finish Time</td><td><%=task.getFinishTime() == 0 ? "-" : df.format(task.getFinishTime())%></td></tr> <tr><td align="right">Running Time</td><td><%=task.getLaunchTime() == 0 ? "-" : task.getRunningTime() + " ms"%></td></tr> - <tr><td align="right">Host</td><td><%=task.getSucceededHost() == null ? "-" : task.getSucceededHost()%></td></tr> + <tr><td align="right">Host</td><td><%=task.getSucceededWorker() == null ? 
"-" : task.getSucceededWorker().getHost()%></td></tr> <tr><td align="right">Shuffles</td><td># Shuffle Outputs: <%=numShuffles%>, Shuffle Key: <%=shuffleKey%>, Shuffle file: <%=shuffleFileName%></td></tr> <tr><td align="right">Data Locations</td><td><%=dataLocationInfos%></td></tr> <tr><td align="right">Fragment</td><td><%=fragmentInfo%></td></tr> http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp b/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp deleted file mode 100644 index bb5e90d..0000000 --- a/tajo-core/src/main/resources/webapps/worker/taskcontainers.jsp +++ /dev/null @@ -1,93 +0,0 @@ -<% - /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -%> -<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> - -<%@ page import="org.apache.tajo.util.JSPUtil" %> -<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page import="org.apache.tajo.worker.TajoWorker" %> -<%@ page import="org.apache.tajo.worker.TaskRunner" %> -<%@ page import="java.text.SimpleDateFormat" %> -<%@ page import="java.util.ArrayList" %> -<%@ page import="java.util.List" %> -<%@ page import="org.apache.tajo.worker.TaskRunnerHistory" %> -<%@ page import="org.apache.tajo.worker.TaskRunnerHistory" %> - -<% - TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - - List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners()); - List<TaskRunnerHistory> histories = new ArrayList<TaskRunnerHistory>(tajoWorker.getWorkerContext().getTaskRunnerManager().getExecutionBlockHistories()); - - JSPUtil.sortTaskRunner(taskRunners); - JSPUtil.sortTaskRunnerHistory(histories); - - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); -%> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<head> - <link rel="stylesheet" type = "text/css" href = "/static/style.css" /> - <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> - <title>tajo worker</title> -</head> -<body> -<%@ include file="header.jsp"%> -<div class='contents'> - <h2>Tajo Worker: <a href='index.jsp'><%=tajoWorker.getWorkerContext().getWorkerName()%></a></h2> - <hr/> - <h3>Running Task Containers</h3> - <table width="100%" border="1" class="border_table"> - <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr> -<% - for(TaskRunner eachTaskRunner: taskRunners) { -%> - <tr> - <td><a href="tasks.jsp?taskRunnerId=<%=eachTaskRunner.getId()%>"><%=eachTaskRunner.getId()%></a></td> - 
<td><%=df.format(eachTaskRunner.getStartTime())%></td> - <td><%=eachTaskRunner.getFinishTime() == 0 ? "-" : df.format(eachTaskRunner.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(eachTaskRunner.getStartTime(), eachTaskRunner.getFinishTime())%></td> - <td><%=eachTaskRunner.getServiceState()%></td> -<% - } -%> - </table> - <p/> - <hr/> - <h3>Finished Task Containers</h3> - <table width="100%" border="1" class="border_table"> - <tr><th>ContainerId</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr> -<% - for(TaskRunnerHistory history: histories) { - String taskRunnerId = TaskRunner.getId(history.getExecutionBlockId(), history.getContainerId()); -%> - <tr> - <td><a href="tasks.jsp?taskRunnerId=<%=taskRunnerId%>"><%=taskRunnerId%></a></td> - <td><%=df.format(history.getStartTime())%></td> - <td><%=history.getFinishTime() == 0 ? "-" : df.format(history.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(history.getStartTime(), history.getFinishTime())%></td> - <td><%=history.getState()%></td> -<% - } -%> - </table> -</div> -</body> -</html> \ No newline at end of file
