TAJO-1016: Refactor worker rpc information. (jinho) Closes #125
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/28282b56 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/28282b56 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/28282b56 Branch: refs/heads/block_iteration Commit: 28282b561ad0a75f9603936bf04f2aa5c99b6b58 Parents: 469820d Author: jhkim <[email protected]> Authored: Sat Sep 20 15:53:06 2014 +0900 Committer: jhkim <[email protected]> Committed: Sat Sep 20 15:53:06 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/client/TajoAdmin.java | 36 ++-- tajo-client/src/main/proto/ClientProtos.proto | 37 ++-- tajo-common/src/main/proto/tajo_protos.proto | 10 ++ .../tajo/master/DefaultTaskScheduler.java | 18 +- .../apache/tajo/master/LazyTaskScheduler.java | 4 +- .../apache/tajo/master/TajoContainerProxy.java | 5 +- .../tajo/master/TajoMasterClientService.java | 12 +- .../apache/tajo/master/TajoMasterService.java | 9 +- .../master/cluster/WorkerConnectionInfo.java | 178 +++++++++++++++++++ .../master/event/TaskAttemptAssignedEvent.java | 17 +- .../tajo/master/event/TaskRequestEvent.java | 13 +- .../master/querymaster/QueryInProgress.java | 6 +- .../master/querymaster/QueryJobManager.java | 11 +- .../tajo/master/querymaster/QueryMaster.java | 18 +- .../querymaster/QueryMasterManagerService.java | 2 +- .../tajo/master/querymaster/QueryUnit.java | 12 +- .../master/querymaster/QueryUnitAttempt.java | 32 +--- .../apache/tajo/master/rm/TajoRMContext.java | 14 +- .../tajo/master/rm/TajoResourceTracker.java | 24 +-- .../master/rm/TajoWorkerResourceManager.java | 33 ++-- .../java/org/apache/tajo/master/rm/Worker.java | 73 ++------ .../org/apache/tajo/master/rm/WorkerEvent.java | 6 +- .../tajo/master/rm/WorkerLivelinessMonitor.java | 4 +- .../tajo/master/rm/WorkerReconnectEvent.java | 2 +- .../tajo/master/rm/WorkerResourceManager.java | 6 +- .../tajo/master/rm/WorkerStatusEvent.java | 2 +- .../tajo/worker/AbstractResourceAllocator.java | 15 ++ .../tajo/worker/ExecutionBlockContext.java | 12 +- .../tajo/worker/TajoResourceAllocator.java | 17 +- .../java/org/apache/tajo/worker/TajoWorker.java | 178 ++++++++++--------- .../tajo/worker/TajoWorkerClientService.java | 6 +- .../tajo/worker/TajoWorkerManagerService.java | 31 +--- .../main/java/org/apache/tajo/worker/Task.java | 3 +- .../java/org/apache/tajo/worker/TaskRunner.java | 20 +-- .../apache/tajo/worker/TaskRunnerManager.java | 11 +- .../tajo/worker/WorkerHeartbeatService.java | 57 +++--- .../tajo/worker/event/TaskRunnerStartEvent.java | 18 +- .../main/proto/ResourceTrackerProtocol.proto | 12 +- .../src/main/proto/TajoMasterProtocol.proto | 36 ++-- .../src/main/proto/TajoWorkerProtocol.proto | 20 +-- .../main/resources/webapps/admin/cluster.jsp | 54 +++--- .../src/main/resources/webapps/admin/index.jsp | 4 +- .../src/main/resources/webapps/admin/query.jsp | 8 +- .../resources/webapps/worker/querytasks.jsp | 9 +- .../resources/webapps/worker/taskdetail.jsp | 2 + .../src/main/resources/webapps/worker/tasks.jsp | 4 +- .../tajo/cluster/TestWorkerConnectionInfo.java | 36 ++++ .../tajo/master/rm/TestTajoResourceManager.java | 8 +- .../tajo/pullserver/TajoPullServerService.java | 15 +- 50 files changed, 632 insertions(+), 530 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a5f31f9..5d92881 100644 --- a/CHANGES +++ b/CHANGES @@ -444,6 +444,8 @@ Release 0.9.0 - unreleased SUB TASKS + TAJO-1016: Refactor worker rpc information. (jinho) + TAJO-1015: Add executionblock event in worker. (jinho) TAJO-783: Remove yarn-related code from tajo-core. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java index 95dfc68..1acdb4d 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java @@ -276,14 +276,15 @@ public class TajoAdmin { line5, line10, line10); writer.write(line); for (WorkerResourceInfo queryMaster : liveQueryMasters) { - String queryMasterHost = String.format("%s:%d", - queryMaster.getAllocatedHost(), - queryMaster.getQueryMasterPort()); - String heap = String.format("%d MB", queryMaster.getMaxHeap()/1024/1024); - line = String.format(fmtQueryMasterLine, queryMasterHost, - queryMaster.getClientPort(), - queryMaster.getNumQueryMasterTasks(), - heap, queryMaster.getWorkerStatus()); + TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo(); + String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort()); + String heap = String.format("%d MB", queryMaster.getMaxHeap() / 1024 / 1024); + line = String.format(fmtQueryMasterLine, + queryMasterHost, + connInfo.getClientPort(), + queryMaster.getNumQueryMasterTasks(), + heap, + queryMaster.getWorkerStatus()); writer.write(line); } @@ -301,12 +302,12 @@ public class TajoAdmin { writer.write(line); for (WorkerResourceInfo queryMaster : deadQueryMasters) { - String queryMasterHost = String.format("%s:%d", - queryMaster.getAllocatedHost(), - queryMaster.getQueryMasterPort()); - line = String.format(fmtQueryMasterLine, queryMasterHost, - queryMaster.getClientPort(), - queryMaster.getWorkerStatus()); + TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo(); + String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort()); + line = String.format(fmtQueryMasterLine, + queryMasterHost, + connInfo.getClientPort(), + queryMaster.getWorkerStatus()); writer.write(line); } @@ -358,9 +359,8 @@ public class TajoAdmin { writer.write(line); for (WorkerResourceInfo worker : workers) { - String workerHost = String.format("%s:%d", - worker.getAllocatedHost(), - worker.getPeerRpcPort()); + TajoProtos.WorkerConnectionInfoProto connInfo = worker.getConnectionInfo(); + String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort()); String mem = String.format("%d/%d", worker.getUsedMemoryMB(), worker.getMemoryMB()); String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(), @@ -369,7 +369,7 @@ public class TajoAdmin { worker.getMaxHeap()/1024/1024); line = String.format(fmtWorkerLine, workerHost, - worker.getPullServerPort(), + connInfo.getPullServerPort(), worker.getNumRunningTasks(), mem, disk, heap, worker.getWorkerStatus()); writer.write(line); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index c66b228..0359685 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -157,27 +157,22 @@ message GetClusterInfoRequest { } message WorkerResourceInfo { - required string allocatedHost = 1; - required int32 peerRpcPort = 2; - required int32 queryMasterPort = 3; - required int32 clientPort = 4; - required int32 pullServerPort = 5; - required int32 httpPort = 6; - required float diskSlots = 7; - required int32 cpuCoreSlots = 8; - required int32 memoryMB = 9; - required float usedDiskSlots = 10; - required int32 usedMemoryMB = 11; - required int32 usedCpuCoreSlots = 12; - required int64 maxHeap = 13; - required int64 freeHeap = 14; - required int64 totalHeap = 15; - required int32 numRunningTasks = 16; - required string workerStatus = 17; - required int64 lastHeartbeat = 18; - required bool queryMasterMode = 19; - required bool taskRunnerMode = 20; - required int32 numQueryMasterTasks = 21; + required WorkerConnectionInfoProto connectionInfo = 1; + required float diskSlots = 2; + required int32 cpuCoreSlots = 3; + required int32 memoryMB = 4; + required float usedDiskSlots = 5; + required int32 usedMemoryMB = 6; + required int32 usedCpuCoreSlots = 7; + required int64 maxHeap = 8; + required int64 freeHeap = 9; + required int64 totalHeap = 10; + required int32 numRunningTasks = 11; + required string workerStatus = 12; + required int64 lastHeartbeat = 13; + required bool queryMasterMode = 14; + required bool taskRunnerMode = 15; + required int32 numQueryMasterTasks = 16; } message GetClusterInfoResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-common/src/main/proto/tajo_protos.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto index edd27fc..b6cd9ef 100644 --- a/tajo-common/src/main/proto/tajo_protos.proto +++ b/tajo-common/src/main/proto/tajo_protos.proto @@ -52,4 +52,14 @@ enum FetcherState { FETCH_FETCHING = 1; FETCH_FINISHED = 2; FETCH_FAILED = 3; +} + +message WorkerConnectionInfoProto { + required int32 id = 1; + required string host = 2; + required int32 peerRpcPort = 3; + required int32 pullServerPort = 4; + optional int32 queryMasterPort = 5; + required int32 clientPort = 6; + required int32 httpInfoPort = 7; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 7684df2..2cb8878 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -33,6 +33,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryUnitRequest; import org.apache.tajo.engine.query.QueryUnitRequestImpl; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; @@ -744,13 +745,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } // getting the hostname of requested node - String host = container.getTaskHostName(); + WorkerConnectionInfo connectionInfo = + context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId()); + String host = connectionInfo.getHost(); // if there are no worker matched to the hostname a task request if(!leafTaskHostMapping.containsKey(host)){ - host = NetUtils.normalizeHost(host); + String normalizedHost = NetUtils.normalizeHost(host); - if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){ + if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){ // this case means one of either cases: // * there are no blocks which reside in this node. // * all blocks which reside in this node are consumed, and this task runner requests a remote task. @@ -826,8 +829,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), - host, container.getTaskPort())); + taskRequest.getContainerId(), connectionInfo)); assignedRequest.add(attemptId); scheduledObjectNum--; @@ -891,10 +893,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer( - taskRequest.getContainerId()); + WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator(). + getWorkerConnectionInfo(taskRequest.getWorkerId()); context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort())); + taskRequest.getContainerId(), connectionInfo)); taskRequest.getCallback().run(taskAssign.getProto()); totalAssigned++; scheduledObjectNum--; http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index 6552998..f7953e0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -469,8 +469,6 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) { QueryUnitAttemptId attemptId = taskAttempt.getId(); - ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator(). - getContainer(attemptContext.getContainerId()); QueryUnitRequest taskAssign = new QueryUnitRequestImpl( attemptId, new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()), @@ -495,7 +493,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { } context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort())); + attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo())); totalAssigned++; attemptContext.getCallback().run(taskAssign.getProto()); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index c317ba5..c236c20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -59,7 +59,7 @@ public class TajoContainerProxy extends ContainerProxy { context.getResourceAllocator().addContainer(containerID, this); this.hostName = container.getNodeId().getHost(); - this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort(); + this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort(); this.state = ContainerState.RUNNING; if (LOG.isDebugEnabled()) { @@ -102,8 +102,7 @@ public class TajoContainerProxy extends ContainerProxy { TajoWorkerProtocol.RunExecutionBlockRequestProto request = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder() .setExecutionBlockId(executionBlockId.getProto()) - .setQueryMasterHost(myAddr.getHostName()) - .setQueryMasterPort(myAddr.getPort()) + .setQueryMaster(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getProto()) .setNodeId(container.getNodeId().toString()) .setContainerId(container.getId().toString()) .setQueryOutputPath(context.getStagingDir().toString()) http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 7d80a88..e69393a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -503,9 +503,9 @@ public class TajoMasterClientService extends AbstractService { context.getSessionManager().touch(request.getSessionId().getId()); GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder(); - Map<String, Worker> workers = context.getResourceManager().getWorkers(); + Map<Integer, Worker> workers = context.getResourceManager().getWorkers(); - List<String> wokerKeys = new ArrayList<String>(workers.keySet()); + List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet()); Collections.sort(wokerKeys); WorkerResourceInfo.Builder workerBuilder @@ -513,7 +513,8 @@ public class TajoMasterClientService extends AbstractService { for(Worker worker: workers.values()) { WorkerResource workerResource = worker.getResource(); - workerBuilder.setAllocatedHost(worker.getHostName()); + + workerBuilder.setConnectionInfo(worker.getConnectionInfo().getProto()); workerBuilder.setDiskSlots(workerResource.getDiskSlots()); workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots()); workerBuilder.setMemoryMB(workerResource.getMemoryMB()); @@ -524,11 +525,6 @@ public class TajoMasterClientService extends AbstractService { workerBuilder.setWorkerStatus(worker.getState().toString()); workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode()); workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode()); - workerBuilder.setPeerRpcPort(worker.getPeerRpcPort()); - workerBuilder.setQueryMasterPort(worker.getQueryMasterPort()); - workerBuilder.setClientPort(worker.getClientPort()); - workerBuilder.setPullServerPort(worker.getPullServerPort()); - workerBuilder.setHttpPort(worker.getHttpPort()); workerBuilder.setMaxHeap(workerResource.getMaxHeap()); workerBuilder.setFreeHeap(workerResource.getFreeHeap()); workerBuilder.setTotalHeap(workerResource.getTotalHeap()); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index 5e9f729..ddf24d3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -28,6 +28,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.querymaster.QueryJobManager; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; @@ -97,7 +98,7 @@ public class TajoMasterService extends AbstractService { RpcController controller, TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) { if(LOG.isDebugEnabled()) { - LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort()); + LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo())); } TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null; @@ -156,13 +157,9 @@ public class TajoMasterService extends AbstractService { TajoMasterProtocol.WorkerResourceProto.Builder workerResource = TajoMasterProtocol.WorkerResourceProto.newBuilder(); - workerResource.setHost(worker.getHostName()); - workerResource.setPeerRpcPort(worker.getPeerRpcPort()); - workerResource.setInfoPort(worker.getHttpPort()); - workerResource.setQueryMasterPort(worker.getQueryMasterPort()); + workerResource.setConnectionInfo(worker.getConnectionInfo().getProto()); workerResource.setMemoryMB(resource.getMemoryMB()); workerResource.setDiskSlots(resource.getDiskSlots()); - workerResource.setQueryMasterPort(worker.getQueryMasterPort()); builder.addWorkerResources(workerResource); } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java new file mode 100644 index 0000000..78d4978 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.cluster; + +import org.apache.tajo.common.ProtoObject; + +import static org.apache.tajo.TajoProtos.WorkerConnectionInfoProto; + +public class WorkerConnectionInfo implements ProtoObject<WorkerConnectionInfoProto>, Comparable<WorkerConnectionInfo> { + + /** + * unique worker id + */ + private int id; + /** + * Hostname + */ + private String host; + /** + * Peer rpc port + */ + private int peerRpcPort; + /** + * pull server port + */ + private int pullServerPort; + /** + * QueryMaster rpc port + */ + private int queryMasterPort; + /** + * the port of client rpc which provides an client API + */ + private int clientPort; + /** + * http info port + */ + private int httpInfoPort; + + public WorkerConnectionInfo() { + } + + public WorkerConnectionInfo(WorkerConnectionInfoProto proto) { + this(); + this.id = proto.getId(); + this.host = proto.getHost(); + this.peerRpcPort = proto.getPeerRpcPort(); + this.pullServerPort = proto.getPullServerPort(); + this.clientPort = proto.getClientPort(); + this.httpInfoPort = proto.getHttpInfoPort(); + this.queryMasterPort = proto.getQueryMasterPort(); + } + + public WorkerConnectionInfo(String host, int peerRpcPort, int pullServerPort, int clientPort, + int queryMasterPort, int httpInfoPort) { + this(); + this.host = host; + this.peerRpcPort = peerRpcPort; + this.pullServerPort = pullServerPort; + this.clientPort = clientPort; + this.queryMasterPort = queryMasterPort; + this.httpInfoPort = httpInfoPort; + this.id = hashCode(); + } + + public String getHost() { + return host; + } + + public int getPeerRpcPort() { + return peerRpcPort; + } + + public int getPullServerPort() { + return pullServerPort; + } + + public int getQueryMasterPort() { + return queryMasterPort; + } + + public int getClientPort() { + return clientPort; + } + + public int getHttpInfoPort() { + return httpInfoPort; + } + + public int getId() { + return id; + } + + public String getHostAndPeerRpcPort() { + return this.getHost() + ":" + this.getPeerRpcPort(); + } + + @Override + public WorkerConnectionInfoProto getProto() { + WorkerConnectionInfoProto.Builder builder = WorkerConnectionInfoProto.newBuilder(); + builder.setId(id) + .setHost(host) + .setPeerRpcPort(peerRpcPort) + .setPullServerPort(pullServerPort) + .setClientPort(clientPort) + .setHttpInfoPort(httpInfoPort) + .setQueryMasterPort(queryMasterPort); + return builder.build(); + } + + @Override + public int hashCode() { + final int prime = 493217; + int result = 8501; + result = prime * result + this.getHost().hashCode(); + result = prime * result + this.getPeerRpcPort(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + WorkerConnectionInfo other = (WorkerConnectionInfo) obj; + if (!this.getHost().equals(other.getHost())) + return false; + if (this.getPeerRpcPort() != other.getPeerRpcPort()) + return false; + return true; + } + + @Override + public int compareTo(WorkerConnectionInfo other) { + int hostCompare = this.getHost().compareTo(other.getHost()); + if (hostCompare == 0) { + if (this.getPeerRpcPort() > other.getPeerRpcPort()) { + return 1; + } else if (this.getPeerRpcPort() < other.getPeerRpcPort()) { + return -1; + } + return 0; + } + return hostCompare; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("id:").append(id).append(", ") + .append("host:").append(host).append(", ") + .append("PeerRpcPort:").append(peerRpcPort).append(", ") + .append("PullServerPort:").append(pullServerPort).append(", ") + .append("ClientPort:").append(clientPort).append(", ") + .append("QueryMasterPort:").append(queryMasterPort).append(", ") + .append("HttpInfoPort:").append(httpInfoPort); + return builder.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java index 4934633..e0928c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java @@ -20,29 +20,24 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; public class TaskAttemptAssignedEvent extends TaskAttemptEvent { private final ContainerId cId; - private final String hostName; - private final int pullServerPort; + private final WorkerConnectionInfo workerConnectionInfo; public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId, - String hostname, int pullServerPort) { + WorkerConnectionInfo connectionInfo) { super(id, TaskAttemptEventType.TA_ASSIGNED); this.cId = cId; - this.hostName = hostname; - this.pullServerPort = pullServerPort; + this.workerConnectionInfo = connectionInfo; } public ContainerId getContainerId() { return cId; } - public String getHostName() { - return hostName; - } - - public int getPullServerPort() { - return pullServerPort; + public WorkerConnectionInfo getWorkerConnectionInfo(){ + return workerConnectionInfo; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java index 9be7cab..2197c33 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java @@ -31,24 +31,31 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> { TASK_REQ } - private final ContainerId workerId; + private final int workerId; + private final ContainerId containerId; private final ExecutionBlockId executionBlockId; private final RpcCallback<QueryUnitRequestProto> callback; - public TaskRequestEvent(ContainerId workerId, + public TaskRequestEvent(int workerId, + ContainerId containerId, ExecutionBlockId executionBlockId, RpcCallback<QueryUnitRequestProto> callback) { super(TaskRequestEventType.TASK_REQ); this.workerId = workerId; + this.containerId = containerId; this.executionBlockId = executionBlockId; this.callback = callback; } - public ContainerId getContainerId() { + public int getWorkerId() { return this.workerId; } + public ContainerId getContainerId() { + return this.containerId; + } + public ExecutionBlockId getExecutionBlockId() { return executionBlockId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index 261200e..877a20a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -172,9 +172,9 @@ public class QueryInProgress extends CompositeService { return false; } - queryInfo.setQueryMaster(resource.getWorkerHost()); - queryInfo.setQueryMasterPort(resource.getQueryMasterPort()); - queryInfo.setQueryMasterclientPort(resource.getClientPort()); + queryInfo.setQueryMaster(resource.getConnectionInfo().getHost()); + queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort()); + queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index acaefc9..e4f47cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -31,6 +31,7 @@ import org.apache.tajo.engine.planner.logical.LogicalRootNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.session.Session; import org.apache.tajo.scheduler.SimpleFifoScheduler; @@ -241,11 +242,11 @@ public class QueryJobManager extends CompositeService { private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId())); - if(queryHeartbeat.getTajoWorkerHost() != null) { - queryInfo.setQueryMaster(queryHeartbeat.getTajoWorkerHost()); - queryInfo.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort()); - queryInfo.setQueryMasterclientPort(queryHeartbeat.getTajoWorkerClientPort()); - } + WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo()); + + queryInfo.setQueryMaster(connectionInfo.getHost()); + queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); + queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); queryInfo.setLastMessage(queryHeartbeat.getStatusMessage()); queryInfo.setQueryState(queryHeartbeat.getState()); queryInfo.setProgress(queryHeartbeat.getQueryProgress()); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index b54675c..b8c39e0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -192,9 +192,8 @@ public class QueryMaster extends CompositeService implements EventHandler { for (TajoMasterProtocol.WorkerResourceProto worker : workers) { try { - if (worker.getPeerRpcPort() == 0) continue; - - rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()), + TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); + rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); @@ -214,9 +213,8 @@ public class QueryMaster extends CompositeService implements EventHandler { for (TajoMasterProtocol.WorkerResourceProto worker : workers) { try { - if (worker.getPeerRpcPort() == 0) continue; - - rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()), + TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); + rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); @@ -299,9 +297,7 @@ public class QueryMaster extends CompositeService implements EventHandler { TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder() - .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName()) - .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort()) - .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort()) + .setConnectionInfo(workerContext.getConnectionInfo().getProto()) .setState(state) .setQueryId(queryId.getProto()); @@ -460,9 +456,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) { TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder(); - builder.setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName()); - builder.setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort()); - builder.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort()); + builder.setConnectionInfo(workerContext.getConnectionInfo().getProto()); builder.setState(queryMasterTask.getState()); builder.setQueryId(queryMasterTask.getQueryId().getProto()); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index 862dfef..f953995 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -133,7 +133,7 @@ public class QueryMasterManagerService extends CompositeService ContainerId cid = queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId()); LOG.debug("getTask:" + cid + ", ebId:" + ebId); - queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done)); + queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done)); } } catch (Exception e) { LOG.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index f41fd0e..03c6d30 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -522,8 +522,8 @@ public class QueryUnit implements EventHandler<TaskEvent> { QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); task.successfulAttempt = attemptEvent.getTaskAttemptId(); - task.succeededHost = attempt.getHost(); - task.succeededPullServerPort = attempt.getPullServerPort(); + task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); + task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort(); task.finishTask(); task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED)); @@ -537,7 +537,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); task.launchTime = System.currentTimeMillis(); - task.succeededHost = attempt.getHost(); + task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); } } @@ -632,9 +632,12 @@ public class QueryUnit implements EventHandler<TaskEvent> { public static class PullHost implements Cloneable { String host; int port; + int hashCode; + public PullHost(String pullServerAddr, int pullServerPort){ this.host = pullServerAddr; this.port = pullServerPort; + this.hashCode = Objects.hashCode(host, port); } public String getHost() { return host; @@ -650,7 +653,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { @Override public int hashCode() { - return Objects.hashCode(host, port); + return hashCode; } @Override @@ -668,6 +671,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { PullHost newPullHost = (PullHost) super.clone(); newPullHost.host = host; newPullHost.port = port; + newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port); return newPullHost; } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java index a4fa12f..db6f130 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java @@ -29,6 +29,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; @@ -55,8 +56,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { final EventHandler eventHandler; private ContainerId containerId; - private String hostName; - private int port; + private WorkerConnectionInfo workerConnectionInfo; private int expire; private final Lock readLock; @@ -210,30 +210,14 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { return this.queryUnit; } - public String getHost() { - return this.hostName; - } - - public int getPort() { - return this.port; + public WorkerConnectionInfo getWorkerConnectionInfo() { + return this.workerConnectionInfo; } public void setContainerId(ContainerId containerId) { this.containerId = containerId; } - public void setHost(String host) { - this.hostName = host; - } - - public void setPullServerPort(int port) { - this.port = port; - } - - public int getPullServerPort() { - return port; - } - public synchronized void setExpireTime(int expire) { this.expire = expire; } @@ -277,7 +261,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { if (report.getShuffleFileOutputsCount() > 0) { this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList()); - PullHost host = new PullHost(getHost(), getPullServerPort()); + PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort()); for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(), getId().getId(), p.getPartId(), host, p.getVolume()); @@ -325,8 +309,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { TaskAttemptEvent event) { TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; taskAttempt.containerId = castEvent.getContainerId(); - taskAttempt.setHost(castEvent.getHostName()); - taskAttempt.setPullServerPort(castEvent.getPullServerPort()); + taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); taskAttempt.eventHandler.handle( new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_LAUNCHED)); @@ -415,7 +398,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); - LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage()); + LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() + + " >> " + errorEvent.errorMessage()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java index 2229f04..5d07ff2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java @@ -37,16 +37,16 @@ public class TajoRMContext { final Dispatcher rmDispatcher; /** map between workerIds and running workers */ - private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<String, Worker>(); + private final ConcurrentMap<Integer, Worker> workers = Maps.newConcurrentMap(); /** map between workerIds and inactive workers */ - private final ConcurrentMap<String, Worker> inactiveWorkers = new ConcurrentHashMap<String, Worker>(); + private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap(); /** map between queryIds and query master ContainerId */ private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap(); - private final Set<String> liveQueryMasterWorkerResources = - Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + private final Set<Integer> liveQueryMasterWorkerResources = + Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>()); private final Set<QueryId> stoppedQueryIds = Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>()); @@ -62,14 +62,14 @@ public class TajoRMContext { /** * @return The Map for active workers */ - public ConcurrentMap<String, Worker> getWorkers() { + public ConcurrentMap<Integer, Worker> getWorkers() { return workers; } /** * @return The Map for inactive workers */ - public ConcurrentMap<String, Worker> getInactiveWorkers() { + public ConcurrentMap<Integer, Worker> getInactiveWorkers() { return inactiveWorkers; } @@ -81,7 +81,7 @@ public class TajoRMContext { return qmContainerMap; } - public Set<String> getQueryMasterWorker() { + public Set<Integer> getQueryMasterWorker() { return liveQueryMasterWorkerResources; } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 4bd7adb..831ce43 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -28,6 +28,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; @@ -111,9 +112,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource /** The response builder */ private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE); - private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) { + private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) { return new WorkerStatusEvent( - workerKey, + workerId, heartbeat.getServerStatus().getRunningTaskNum(), heartbeat.getServerStatus().getJvmHeap().getMaxHeap(), heartbeat.getServerStatus().getJvmHeap().getFreeHeap(), @@ -128,7 +129,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource try { // get a workerId from the heartbeat - String workerId = createWorkerId(heartbeat); + int workerId = heartbeat.getConnectionInfo().getId(); if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running @@ -145,7 +146,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource // create new worker instance Worker newWorker = createWorkerResource(heartbeat); - String newWorkerId = newWorker.getWorkerId(); + int newWorkerId = newWorker.getWorkerId(); // add the new worker to the list of active workers rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker); @@ -178,10 +179,6 @@ public class TajoResourceTracker extends AbstractService implements TajoResource } } - private static final String createWorkerId(NodeHeartbeat heartbeat) { - return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort(); - } - private Worker createWorkerResource(NodeHeartbeat request) { boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue(); boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue(); @@ -204,14 +201,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource workerResource.setCpuCoreSlots(4); } - Worker worker = new Worker(rmContext, workerResource); - worker.setHostName(request.getTajoWorkerHost()); - worker.setHttpPort(request.getTajoWorkerHttpPort()); - worker.setPeerRpcPort(request.getPeerRpcPort()); - worker.setQueryMasterPort(request.getTajoQueryMasterPort()); - worker.setClientPort(request.getTajoWorkerClientPort()); - worker.setPullServerPort(request.getTajoWorkerPullServerPort()); - return worker; + return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo())); } public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() { @@ -224,7 +214,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource int totalAvailableMemoryMB = 0; synchronized(rmContext) { - for(String eachWorker: rmContext.getWorkers().keySet()) { + for(int eachWorker: rmContext.getWorkers().keySet()) { Worker worker = rmContext.getWorkers().get(eachWorker); WorkerResource resource = worker.getResource(); if(worker != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 3915225..0e3ccad 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -134,7 +134,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke @Override public void handle(WorkerEvent event) { - String workerId = event.getWorkerId(); + int workerId = event.getWorkerId(); Worker node = this.rmContext.getWorkers().get(workerId); if (node != null) { try { @@ -147,16 +147,16 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } @Override - public Map<String, Worker> getWorkers() { + public Map<Integer, Worker> getWorkers() { return ImmutableMap.copyOf(rmContext.getWorkers()); } @Override - public Map<String, Worker> getInactiveWorkers() { + public Map<Integer, Worker> getInactiveWorkers() { return ImmutableMap.copyOf(rmContext.getInactiveWorkers()); } - public Collection<String> getQueryMasters() { + public Collection<Integer> getQueryMasters() { return Collections.unmodifiableSet(rmContext.getQueryMasterWorker()); } @@ -303,8 +303,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke new ArrayList<WorkerAllocatedResource>(); for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) { - NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(), - allocatedResource.worker.getPeerRpcPort()); + NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(), + allocatedResource.worker.getConnectionInfo().getPeerRpcPort()); TajoWorkerContainerId containerId = new TajoWorkerContainerId(); @@ -315,12 +315,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke ContainerIdProto containerIdProto = containerId.getProto(); allocatedResources.add(WorkerAllocatedResource.newBuilder() .setContainerId(containerIdProto) - .setNodeId(nodeId.toString()) - .setWorkerHost(allocatedResource.worker.getHostName()) - .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort()) - .setClientPort(allocatedResource.worker.getClientPort()) - .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort()) - .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort()) + .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto()) .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB) .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots) .build()); @@ -339,7 +334,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke if(LOG.isDebugEnabled()) { LOG.debug("========================================="); LOG.debug("Available Workers"); - for(String liveWorker: rmContext.getWorkers().keySet()) { + for(int liveWorker: rmContext.getWorkers().keySet()) { LOG.debug(rmContext.getWorkers().get(liveWorker).toString()); } LOG.debug("========================================="); @@ -367,7 +362,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) { synchronized(rmContext) { - List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet()); + List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet()); Collections.shuffle(randomWorkers); int numContainers = resourceRequest.request.getNumContainers(); @@ -377,7 +372,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke resourceRequest.request.getMinDiskSlotPerContainer()); int liveWorkerSize = randomWorkers.size(); - Set<String> insufficientWorkers = new HashSet<String>(); + Set<Integer> insufficientWorkers = new HashSet<Integer>(); boolean stop = false; boolean checkMax = true; while(!stop) { @@ -394,7 +389,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB; - for(String eachWorker: randomWorkers) { + for(int eachWorker: randomWorkers) { if(allocatedResources >= numContainers) { stop = true; break; @@ -436,7 +431,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } } else { synchronized(rmContext) { - List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet()); + List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet()); Collections.shuffle(randomWorkers); int numContainers = resourceRequest.request.getNumContainers(); @@ -446,7 +441,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke resourceRequest.request.getMinMemoryMBPerContainer()); int liveWorkerSize = randomWorkers.size(); - Set<String> insufficientWorkers = new HashSet<String>(); + Set<Integer> insufficientWorkers = new HashSet<Integer>(); boolean stop = false; boolean checkMax = true; while(!stop) { @@ -463,7 +458,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke } float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots; - for(String eachWorker: randomWorkers) { + for(int eachWorker: randomWorkers) { if(allocatedResources >= numContainers) { stop = true; break; http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java index de6ee9e..edded4d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.state.*; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import java.util.EnumSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,24 +40,15 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */ private final TajoRMContext rmContext; - /** Hostname */ - private String hostName; - /** QueryMaster rpc port */ - private int qmRpcPort; - /** Peer rpc port */ - private int peerRpcPort; - /** http info port */ - private int httpInfoPort; - /** the port of QueryMaster client rpc which provides an client API */ - private int qmClientPort; - /** pull server port */ - private int pullServerPort; /** last heartbeat time */ private long lastHeartbeatTime; /** Resource capability */ private WorkerResource resource; + /** Worker connection information */ + private WorkerConnectionInfo connectionInfo; + private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition(); private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition(); @@ -99,9 +91,10 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine = stateMachineFactory.make(this, WorkerState.NEW); - public Worker(TajoRMContext rmContext, WorkerResource resource) { + public Worker(TajoRMContext rmContext, WorkerResource resource, WorkerConnectionInfo connectionInfo) { this.rmContext = rmContext; + this.connectionInfo = connectionInfo; this.lastHeartbeatTime = System.currentTimeMillis(); this.resource = resource; @@ -110,56 +103,12 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { this.writeLock = lock.writeLock(); } - public String getWorkerId() { - return hostName + ":" + qmRpcPort + ":" + peerRpcPort; - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String allocatedHost) { - this.hostName = allocatedHost; - } - - public int getPeerRpcPort() { - return peerRpcPort; - } - - public void setPeerRpcPort(int peerRpcPort) { - this.peerRpcPort = peerRpcPort; - } - - public int getQueryMasterPort() { - return qmRpcPort; - } - - public void setQueryMasterPort(int queryMasterPort) { - this.qmRpcPort = queryMasterPort; - } - - public int getClientPort() { - return qmClientPort; - } - - public void setClientPort(int clientPort) { - this.qmClientPort = clientPort; - } - - public int getPullServerPort() { - return pullServerPort; - } - - public void setPullServerPort(int pullServerPort) { - this.pullServerPort = pullServerPort; - } - - public int getHttpPort() { - return httpInfoPort; + public int getWorkerId() { + return connectionInfo.getId(); } - public void setHttpPort(int port) { - this.httpInfoPort = port; + public WorkerConnectionInfo getConnectionInfo() { + return connectionInfo; } public void setLastHeartbeatTime(long lastheartbeatReportTime) { @@ -209,7 +158,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> { if(o == null) { return 1; } - return getWorkerId().compareTo(o.getWorkerId()); + return connectionInfo.compareTo(o.connectionInfo); } public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java index 389c3be..c208990 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java @@ -24,14 +24,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent; * WorkerEvent describes all kinds of events which sent to {@link Worker}. */ public class WorkerEvent extends AbstractEvent<WorkerEventType> { - private final String workerId; + private final int workerId; - public WorkerEvent(String workerId, WorkerEventType workerEventType) { + public WorkerEvent(int workerId, WorkerEventType workerEventType) { super(workerEventType); this.workerId = workerId; } - public String getWorkerId() { + public int getWorkerId() { return workerId; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java index e3524d6..2751886 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java @@ -30,7 +30,7 @@ import org.apache.tajo.conf.TajoConf; * It periodically checks the latest heartbeat time of {@link Worker}. * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}. */ -public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> { +public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<Integer> { private EventHandler dispatcher; @@ -50,7 +50,7 @@ public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> { } @Override - protected void expire(String id) { + protected void expire(Integer id) { dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java index 46f286d..3828b6a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java @@ -24,7 +24,7 @@ package org.apache.tajo.master.rm; */ public class WorkerReconnectEvent extends WorkerEvent { private final Worker worker; - public WorkerReconnectEvent(String workerId, Worker worker) { + public WorkerReconnectEvent(int workerId, Worker worker) { super(workerId, WorkerEventType.RECONNECTED); this.worker = worker; } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index 54fe11c..8e8ac51 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -86,13 +86,13 @@ public interface WorkerResourceManager extends Service { * * @return a Map instance containing active workers */ - public Map<String, Worker> getWorkers(); + public Map<Integer, Worker> getWorkers(); /** * * @return a Map instance containing inactive workers */ - public Map<String, Worker> getInactiveWorkers(); + public Map<Integer, Worker> getInactiveWorkers(); public void stop(); @@ -106,5 +106,5 @@ public interface WorkerResourceManager extends Service { * * @return WorkerIds on which QueryMasters are running */ - Collection<String> getQueryMasters(); + Collection<Integer> getQueryMasters(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java index 8c3d7c1..f1ab401 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java @@ -28,7 +28,7 @@ public class WorkerStatusEvent extends WorkerEvent { private final long freeHeap; private final long totalHeap; - public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) { + public WorkerStatusEvent(int workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) { super(workerId, WorkerEventType.STATE_UPDATE); this.runningTaskNum = runningTaskNum; this.maxHeap = maxHeap; http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java index 55aa8c4..ca71c53 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java @@ -22,10 +22,25 @@ import com.google.common.collect.Maps; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tajo.master.ContainerProxy; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import java.util.Map; +import java.util.concurrent.ConcurrentMap; public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator { + /** + * A key is worker id, and a value is a worker connection information. + */ + protected ConcurrentMap<Integer, WorkerConnectionInfo> workerInfoMap = Maps.newConcurrentMap(); + + public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) { + return workerInfoMap.get(workerId); + } + + public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) { + workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo); + } + private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap(); public AbstractResourceAllocator() { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index d4b9861..1ec8a88 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -33,12 +33,14 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; import org.apache.tajo.worker.event.TaskRunnerStartEvent; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -79,6 +81,7 @@ public class ExecutionBlockContext { private TajoQueryEngine queryEngine; private RpcConnectionPool connPool; private InetSocketAddress qmMasterAddr; + private WorkerConnectionInfo queryMaster; private TajoConf systemConf; // for the doAs block private UserGroupInformation taskOwner; @@ -92,12 +95,12 @@ public class ExecutionBlockContext { private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); - public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster) + public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster) throws Throwable { this.manager = manager; this.executionBlockId = event.getExecutionBlockId(); this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); - this.qmMasterAddr = queryMaster; + this.queryMaster = queryMaster; this.systemConf = manager.getTajoConf(); this.reporter = new Reporter(); this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); @@ -118,6 +121,7 @@ public class ExecutionBlockContext { LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR)); LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR)); + this.qmMasterAddr = NetUtils.createSocketAddr(queryMaster.getHost(), queryMaster.getQueryMasterPort()); LOG.info("QueryMaster Address:" + qmMasterAddr); UserGroupInformation.setConfiguration(systemConf); @@ -329,8 +333,8 @@ public class ExecutionBlockContext { intermediateBuilder.clear(); intermediateBuilder.setEbId(ebId.getProto()) - .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" + - getWorkerContext().getPullServerPort()) + .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" + + getWorkerContext().getConnectionInfo().getPullServerPort()) .setTaskId(-1) .setAttemptId(-1) .setPartId(eachShuffle.getPartId()) http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 2cc8f0c..2220089 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -34,6 +34,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.*; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.SubQueryContainerAllocationEvent; @@ -129,6 +130,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { LOG.warn(e.getMessage()); } } + + workerInfoMap.clear(); super.stop(); } @@ -325,8 +328,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { List<Container> containers = new ArrayList<Container>(); for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) { TajoWorkerContainer container = new TajoWorkerContainer(); - NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getWorkerHost(), - eachAllocatedResource.getPeerRpcPort()); + NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(), + eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); TajoWorkerContainerId containerId = new TajoWorkerContainerId(); @@ -343,14 +346,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB()); workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots()); - Worker worker = new Worker(null, workerResource); - worker.setHostName(nodeId.getHost()); - worker.setPeerRpcPort(nodeId.getPort()); - worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort()); - worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort()); - + Worker worker = new Worker(null, workerResource, + new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo())); container.setWorkerResource(worker); - + addWorkerConnectionInfo(worker.getConnectionInfo()); containers.add(container); } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index a8d661b..280fc2b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -32,8 +32,10 @@ import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.ha.TajoMasterInfo; import org.apache.tajo.master.querymaster.QueryMaster; import org.apache.tajo.master.querymaster.QueryMasterManagerService; @@ -94,12 +96,13 @@ public class TajoWorker extends CompositeService { private TajoPullServerService pullService; - private int pullServerPort; - + @Deprecated private boolean yarnContainerMode; + @Deprecated private boolean queryMasterMode; + @Deprecated private boolean taskRunnerMode; private WorkerHeartbeatService workerHeartbeatThread; @@ -110,7 +113,7 @@ public class TajoWorker extends CompositeService { private TajoMasterProtocol.ClusterResourceSummary clusterResource; - private int httpPort; + private WorkerConnectionInfo connectionInfo; private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); @@ -199,71 +202,52 @@ public class TajoWorker extends CompositeService { this.dispatcher = new AsyncDispatcher(); addIfService(dispatcher); + tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort); + addIfService(tajoWorkerManagerService); + // querymaster worker tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort); - addService(tajoWorkerClientService); + addIfService(tajoWorkerClientService); queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort); - addService(queryMasterManagerService); + addIfService(queryMasterManagerService); // taskrunner worker taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher); addService(taskRunnerManager); - tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort); - addService(tajoWorkerManagerService); - - if(!yarnContainerMode) { - if(taskRunnerMode && !TajoPullServerService.isStandalone()) { - pullService = new TajoPullServerService(); - addService(pullService); - } + workerHeartbeatThread = new WorkerHeartbeatService(workerContext); + addIfService(workerHeartbeatThread); - if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { - try { - httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort(); - if(queryMasterMode && !taskRunnerMode) { - //If QueryMaster and TaskRunner run on single host, http port conflicts - httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort(); - } - webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort , - true, null, systemConf, null); - webServer.start(); - httpPort = webServer.getPort(); - LOG.info("Worker info server started:" + httpPort); - - deletionService = new DeletionService(getMountPath().size(), 0); - if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){ - getWorkerContext().cleanupTemporalDirectories(); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } + int httpPort = 0; + if(taskRunnerMode && !TajoPullServerService.isStandalone()) { + pullService = new TajoPullServerService(); + addIfService(pullService); } - LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode + - ", qmRpcPort=" + qmManagerPort + - ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort + - ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort); + if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + httpPort = initWebServer(); + } super.serviceInit(conf); - tajoMasterInfo = new TajoMasterInfo(); - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + int pullServerPort; + if(pullService != null){ + pullServerPort = pullService.getPort(); } else { - tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars - .TAJO_MASTER_UMBILICAL_RPC_ADDRESS))); - tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars - .RESOURCE_TRACKER_RPC_ADDRESS))); + pullServerPort = getStandAlonePullServerPort(); } - connectToCatalog(); - workerHeartbeatThread = new WorkerHeartbeatService(workerContext); - workerHeartbeatThread.init(conf); - addIfService(workerHeartbeatThread); + this.connectionInfo = new WorkerConnectionInfo( + tajoWorkerManagerService.getBindAddr().getHostName(), + tajoWorkerManagerService.getBindAddr().getPort(), + pullServerPort, + tajoWorkerClientService.getBindAddr().getPort(), + queryMasterManagerService.getBindAddr().getPort(), + httpPort); + + LOG.info("Tajo Worker is initialized. \r\nQueryMaster=" + queryMasterMode + " TaskRunner=" + taskRunnerMode + + " connection :" + connectionInfo.toString()); try { hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf); @@ -300,14 +284,57 @@ public class TajoWorker extends CompositeService { }); } + private int initWebServer() { + int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort(); + try { + if (queryMasterMode && !taskRunnerMode) { + //If QueryMaster and TaskRunner run on single host, http port conflicts + httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort(); + } + webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort, + true, null, systemConf, null); + webServer.start(); + httpPort = webServer.getPort(); + LOG.info("Worker info server started:" + httpPort); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + } + return httpPort; + } + + private void initCleanupService() throws IOException { + deletionService = new DeletionService(getMountPath().size(), 0); + if (systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)) { + getWorkerContext().cleanupTemporalDirectories(); + } + } + public WorkerContext getWorkerContext() { return workerContext; } @Override public void serviceStart() throws Exception { + + tajoMasterInfo = new TajoMasterInfo(); + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + } else { + tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars + .TAJO_MASTER_UMBILICAL_RPC_ADDRESS))); + tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars + .RESOURCE_TRACKER_RPC_ADDRESS))); + } + connectToCatalog(); + + if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + initCleanupService(); + } + initWorkerMetrics(); super.serviceStart(); + LOG.info("Tajo Worker is started"); } @Override @@ -319,7 +346,7 @@ public class TajoWorker extends CompositeService { if(webServer != null) { try { webServer.stop(); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } } @@ -336,7 +363,7 @@ public class TajoWorker extends CompositeService { if(webServer != null && webServer.isAlive()) { try { webServer.stop(); - } catch (Exception e) { + } catch (Throwable e) { } } @@ -381,15 +408,19 @@ public class TajoWorker extends CompositeService { return catalogClient; } - public int getHttpPort() { - return httpPort; + public TajoPullServerService getPullService() { + return pullService; + } + + public WorkerConnectionInfo getConnectionInfo() { + return connectionInfo; } public String getWorkerName() { if (queryMasterMode) { return getQueryMasterManagerService().getHostAndPort(); } else { - return getTajoWorkerManagerService().getHostAndPort(); + return connectionInfo.getHostAndPeerRpcPort(); } } @@ -444,6 +475,7 @@ public class TajoWorker extends CompositeService { } } + @Deprecated public boolean isYarnContainerMode() { return yarnContainerMode; } @@ -503,45 +535,20 @@ public class TajoWorker extends CompositeService { public HashShuffleAppenderManager getHashShuffleAppenderManager() { return hashShuffleAppenderManager; } - - public int getPullServerPort() { - if (pullService != null) { - long startTime = System.currentTimeMillis(); - while (true) { - int pullServerPort = pullService.getPort(); - if (pullServerPort > 0) { - return pullServerPort; - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - if (System.currentTimeMillis() - startTime > 30 * 1000) { - LOG.fatal("TajoWorker stopped cause can't get PullServer port."); - System.exit(-1); - } - } - } else { - if (pullServerPort != 0) { - return pullServerPort; - } else { - loadPullServerPort(); - return pullServerPort; - } - } - } } - private void loadPullServerPort() { - // get pull server port + private int getStandAlonePullServerPort() { long startTime = System.currentTimeMillis(); + int pullServerPort; + + //wait for pull server bring up while (true) { pullServerPort = TajoPullServerService.readPullServerPort(); if (pullServerPort > 0) { break; } try { - Thread.sleep(1000); + Thread.sleep(500); } catch (InterruptedException e) { } if (System.currentTimeMillis() - startTime > 30 * 1000) { @@ -549,6 +556,7 @@ public class TajoWorker extends CompositeService { System.exit(-1); } } + return pullServerPort; } public void stopWorkerForce() { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index d25013c..fb4f861 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -54,7 +54,7 @@ public class TajoWorkerClientService extends AbstractService { private BlockingRpcServer rpcServer; private InetSocketAddress bindAddr; - private String addr; + private int port; private TajoConf conf; private TajoWorker.WorkerContext workerContext; @@ -88,14 +88,12 @@ public class TajoWorkerClientService extends AbstractService { this.rpcServer.start(); this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress()); - this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort(); - this.port = bindAddr.getPort(); } catch (Exception e) { LOG.error(e.getMessage(), e); } // Get the master address - LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr); + LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + bindAddr); super.init(conf); }
