http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 472ce1b..48f4f66 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; @@ -46,7 +47,6 @@ public class TajoWorkerManagerService extends CompositeService private AsyncRpcServer rpcServer; private InetSocketAddress bindAddr; - private String addr; private int port; private TajoWorker.WorkerContext workerContext; @@ -74,14 +74,12 @@ public class TajoWorkerManagerService extends CompositeService this.rpcServer.start(); this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress()); - this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort(); - this.port = bindAddr.getPort(); } catch (Exception e) { LOG.error(e.getMessage(), e); } // Get the master address - LOG.info("TajoWorkerManagerService is bind to " + addr); + LOG.info("TajoWorkerManagerService is bind to " + bindAddr); tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr)); super.init(tajoConf); } @@ -104,10 +102,6 @@ public class TajoWorkerManagerService extends CompositeService return bindAddr; } - public String getHostAndPort() { - return bindAddr.getHostName() + ":" + bindAddr.getPort(); - } - @Override public void ping(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto attemptId, @@ -122,24 +116,11 @@ public class TajoWorkerManagerService extends CompositeService workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc(); try { - - String[] params = new String[7]; - params[0] = "standby"; //mode(never used) - params[1] = request.getExecutionBlockId().toString(); - // NodeId has a form of hostname:port. - params[2] = request.getNodeId(); - params[3] = request.getContainerId(); - - // QueryMaster's address - params[4] = request.getQueryMasterHost(); - params[5] = String.valueOf(request.getQueryMasterPort()); - params[6] = request.getQueryOutputPath(); - - ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId()); workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent( - params - , executionBlockId, - new QueryContext(workerContext.getConf(), request.getQueryContext()), + new WorkerConnectionInfo(request.getQueryMaster()) + , new ExecutionBlockId(request.getExecutionBlockId()) + , request.getContainerId() + , new QueryContext(workerContext.getConf(), request.getQueryContext()), request.getPlanJson() )); done.run(TajoWorker.TRUE_PROTO);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index c9c83d1..66e0f87 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -307,7 +307,7 @@ public class Task { public TaskStatusProto getReport() { TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); - builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString()); + builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); builder.setId(context.getTaskId().getProto()) .setProgress(context.getProgress()) .setState(context.getState()); @@ -323,6 +323,7 @@ public class Task { public boolean isRunning(){ return context.getState() == TaskAttemptState.TA_RUNNING; } + public boolean isProgressChanged() { return context.isProgressChanged(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index ea8ed82..e4771a6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tajo.ExecutionBlockId; @@ -53,7 +52,6 @@ public class TaskRunner extends AbstractService { private volatile boolean stopped = false; private Path baseDirPath; - private NodeId nodeId; private ContainerId containerId; // for Fetcher @@ -69,7 +67,7 @@ public class TaskRunner extends AbstractService { private TaskRunnerHistory history; - public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) { + public TaskRunner(ExecutionBlockContext executionBlockContext, String containerId) { super(TaskRunner.class.getName()); ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); @@ -78,16 +76,7 @@ public class TaskRunner extends AbstractService { this.fetchLauncher = Executors.newFixedThreadPool( systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory); try { - // QueryBlockId from String - // NodeId has a form of hostname:port. - this.nodeId = ConverterUtils.toNodeId(args[2]); - this.containerId = ConverterUtils.toContainerId(args[3]); - - - // QueryMaster's address - //String host = args[4]; - //int port = Integer.parseInt(args[5]); - + this.containerId = ConverterUtils.toContainerId(containerId); this.executionBlockContext = executionBlockContext; this.history = executionBlockContext.createTaskRunnerHistory(this); this.history.setState(getServiceState()); @@ -101,10 +90,6 @@ public class TaskRunner extends AbstractService { return getId(getContext().getExecutionBlockId(), containerId); } - public NodeId getNodeId(){ - return nodeId; - } - public ContainerId getContainerId(){ return containerId; } @@ -212,6 +197,7 @@ public class TaskRunner extends AbstractService { GetTaskRequestProto request = GetTaskRequestProto.newBuilder() .setExecutionBlockId(getExecutionBlockId().getProto()) .setContainerId(((ContainerIdPBImpl) containerId).getProto()) + .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId()) .build(); qmClientService.getTask(null, request, callFuture); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index c3713d1..faadf58 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -37,7 +37,6 @@ import org.jboss.netty.util.HashedWheelTimer; import org.jboss.netty.util.Timer; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -167,14 +166,10 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< if (event instanceof TaskRunnerStartEvent) { TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event; ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId()); - String[] params = startEvent.getParams(); + if(context == null){ try { - // QueryMaster's address - String host = params[4]; - int port = Integer.parseInt(params[5]); - - context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port)); + context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster()); } catch (Throwable e) { LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); @@ -182,7 +177,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< executionBlockContextMap.put(event.getExecutionBlockId(), context); } - TaskRunner taskRunner = new TaskRunner(context, params); + TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId()); LOG.info("Start TaskRunner:" + taskRunner.getId()); taskRunnerMap.put(taskRunner.getId(), taskRunner); taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory()); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 47f2261..6a90f74 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -19,6 +19,7 @@ package org.apache.tajo.worker; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,7 +38,6 @@ import org.apache.tajo.storage.v2.DiskUtil; import org.apache.tajo.util.HAServiceUtil; import java.io.File; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -68,30 +68,38 @@ public class WorkerHeartbeatService extends AbstractService { } @Override - public void serviceInit(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance."); this.systemConf = (TajoConf) conf; connectionPool = RpcConnectionPool.getPool(systemConf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { thread = new WorkerHeartbeatThread(); thread.start(); - super.init(conf); + super.serviceStart(); } @Override - public void serviceStop() { - thread.stopped.set(true); + public void serviceStop() throws Exception { + if(thread.stopped.getAndSet(true)){ + return; + } + synchronized (thread) { thread.notifyAll(); } - super.stop(); + + super.serviceStop(); } class WorkerHeartbeatThread extends Thread { private volatile AtomicBoolean stopped = new AtomicBoolean(false); TajoMasterProtocol.ServerStatusProto.System systemInfo; - List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = - new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>(); + List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList(); float workerDiskSlots; int workerMemoryMB; List<DiskDeviceInfo> diskDeviceInfos; @@ -140,26 +148,6 @@ public class WorkerHeartbeatService extends AbstractService { public void run() { LOG.info("Worker Resource Heartbeat Thread start."); int sendDiskInfoCount = 0; - int pullServerPort = 0; - - String hostName = null; - int peerRpcPort = 0; - int queryMasterPort = 0; - int clientPort = 0; - - if(context.getTajoWorkerManagerService() != null) { - hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName(); - peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort(); - } - if(context.getQueryMasterManagerService() != null) { - hostName = context.getQueryMasterManagerService().getBindAddr().getHostName(); - queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort(); - } - if(context.getTajoWorkerClientService() != null) { - clientPort = context.getTajoWorkerClientService().getBindAddr().getPort(); - } - - pullServerPort = context.getPullServerPort(); while(!stopped.get()) { if(sendDiskInfoCount == 0 && diskDeviceInfos != null) { @@ -185,12 +173,7 @@ public class WorkerHeartbeatService extends AbstractService { .build(); NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder() - .setTajoWorkerHost(hostName) - .setTajoQueryMasterPort(queryMasterPort) - .setPeerRpcPort(peerRpcPort) - .setTajoWorkerClientPort(clientPort) - .setTajoWorkerHttpPort(context.getHttpPort()) - .setTajoWorkerPullServerPort(pullServerPort) + .setConnectionInfo(context.getConnectionInfo().getProto()) .setServerStatus(serverStatus) .build(); @@ -241,8 +224,10 @@ public class WorkerHeartbeatService extends AbstractService { } try { - synchronized (WorkerHeartbeatThread.this){ - wait(10 * 1000); + if(!stopped.get()){ + synchronized (thread){ + thread.wait(10 * 1000); + } } } catch (InterruptedException e) { break; http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java index 8c9fa51..ff63754 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java @@ -20,25 +20,33 @@ package org.apache.tajo.worker.event; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; public class TaskRunnerStartEvent extends TaskRunnerEvent { private final QueryContext queryContext; - private final String[] params; + private final WorkerConnectionInfo queryMaster; + private final String containerId; private final String plan; - public TaskRunnerStartEvent(String[] params, + public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster, ExecutionBlockId executionBlockId, + String containerId, QueryContext context, String plan) { super(EventType.START, executionBlockId); - this.params = params; + this.queryMaster = queryMaster; + this.containerId = containerId; this.queryContext = context; this.plan = plan; } - public String[] getParams(){ - return this.params; + public WorkerConnectionInfo getQueryMaster() { + return queryMaster; + } + + public String getContainerId() { + return containerId; } public QueryContext getQueryContext() { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/ResourceTrackerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index d46d09a..b117cac 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -23,16 +23,12 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; import "TajoMasterProtocol.proto"; +import "tajo_protos.proto"; message NodeHeartbeat { - required string tajoWorkerHost = 1; - required int32 peerRpcPort = 2; - required int32 tajoQueryMasterPort = 3; - optional ServerStatusProto serverStatus = 4; - optional int32 tajoWorkerClientPort = 5; - optional string statusMessage = 6; - optional int32 tajoWorkerPullServerPort = 7; - optional int32 tajoWorkerHttpPort = 8; + required WorkerConnectionInfoProto connectionInfo = 1; + optional ServerStatusProto serverStatus = 2; + optional string statusMessage = 3; } service TajoResourceTrackerProtocolService { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index 8fccbaf..7283543 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -60,14 +60,12 @@ message ServerStatusProto { } message TajoHeartbeat { - required string tajoWorkerHost = 1; - required int32 tajoQueryMasterPort = 2; - optional int32 tajoWorkerClientPort = 3; - optional QueryIdProto queryId = 4; - optional QueryState state = 5; - optional string statusMessage = 6; - optional float queryProgress = 7; - optional int64 queryFinishTime = 8; + required WorkerConnectionInfoProto connectionInfo = 1; + optional QueryIdProto queryId = 2; + optional QueryState state = 3; + optional string statusMessage = 4; + optional float queryProgress = 5; + optional int64 queryFinishTime = 6; } message TajoHeartbeatResponse { @@ -110,12 +108,9 @@ message WorkerResourceAllocationRequest { } message WorkerResourceProto { - required string host = 1; - required int32 peerRpcPort = 2; - required int32 queryMasterPort = 3; - required int32 infoPort = 4; - required int32 memoryMB = 5 ; - required float diskSlots = 6; + required WorkerConnectionInfoProto connectionInfo = 1; + required int32 memoryMB = 2 ; + required float diskSlots = 3; } message WorkerResourcesRequest { @@ -129,15 +124,10 @@ message WorkerResourceReleaseRequest { message WorkerAllocatedResource { required hadoop.yarn.ContainerIdProto containerId = 1; - required string nodeId = 2; - required string workerHost = 3; - required int32 peerRpcPort = 4; - required int32 queryMasterPort = 5; - required int32 clientPort = 6; - required int32 workerPullServerPort = 7; - - required int32 allocatedMemoryMB = 8; - required float allocatedDiskSlots = 9; + required WorkerConnectionInfoProto connectionInfo = 2; + + required int32 allocatedMemoryMB = 3; + required float allocatedDiskSlots = 4; } message WorkerResourceAllocationResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoWorkerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index dff2733..bde2459 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -168,8 +168,9 @@ message QueryExecutionRequestProto { } message GetTaskRequestProto { - required hadoop.yarn.ContainerIdProto containerId = 1; - required ExecutionBlockIdProto executionBlockId = 2; + required int32 workerId = 1; + required hadoop.yarn.ContainerIdProto containerId = 2; + required ExecutionBlockIdProto executionBlockId = 3; } enum ShuffleType { @@ -202,14 +203,13 @@ message DataChannelProto { message RunExecutionBlockRequestProto { required ExecutionBlockIdProto executionBlockId = 1; - required string queryMasterHost = 2; - required int32 queryMasterPort = 3; - required string nodeId = 4; - required string containerId = 5; - optional string queryOutputPath = 6; - - required KeyValueSetProto queryContext = 7; - required string planJson = 8; + required WorkerConnectionInfoProto queryMaster = 2; + required string nodeId = 3; + required string containerId = 4; + optional string queryOutputPath = 5; + + required KeyValueSetProto queryContext = 6; + required string planJson = 7; } message ExecutionBlockListProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index 0317759..6fe21a2 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -20,20 +20,21 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %> <%@ page import="org.apache.tajo.master.ha.HAService" %> <%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerResource" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> +<%@ page import="org.apache.tajo.util.TUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.*" %> -<%@ page import="org.apache.tajo.util.TUtil" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers(); - List<String> wokerKeys = new ArrayList<String>(workers.keySet()); + Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); + List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet()); Collections.sort(wokerKeys); int runningQueryMasterTasks = 0; @@ -175,17 +176,19 @@ <% int no = 1; for(Worker queryMaster: liveQueryMasters) { - WorkerResource resource = queryMaster.getResource(); - String queryMasterHttp = "http://" + queryMaster.getHostName() + ":" + queryMaster.getHttpPort() + "/index.jsp"; + WorkerResource resource = queryMaster.getResource(); + WorkerConnectionInfo connectionInfo = queryMaster.getConnectionInfo(); + String queryMasterHttp = "http://" + connectionInfo.getHost() + + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp"; %> <tr> - <td width='30' align='right'><%=no++%></td> - <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></a></td> - <td width='100' align='center'><%=queryMaster.getClientPort()%></td> - <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td> - <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td> - <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td> - <td width='100' align='center'><%=queryMaster.getState()%></td> + <td width='30' align='right'><%=no++%></td> + <td><a href='<%=queryMasterHttp%>'><%=connectionInfo.getHost() + ":" + connectionInfo.getQueryMasterPort()%></a></td> + <td width='100' align='center'><%=connectionInfo.getClientPort()%></td> + <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td> + <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td> + <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td> + <td width='100' align='center'><%=queryMaster.getState()%></td> </tr> <% } //end fo for @@ -210,7 +213,7 @@ %> <tr> <td width='30' align='right'><%=no++%></td> - <td><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></td> + <td><%=queryMaster.getConnectionInfo().getHost() + ":" + queryMaster.getConnectionInfo().getQueryMasterPort()%></td> </tr> <% } //end fo for @@ -236,19 +239,20 @@ <% int no = 1; for(Worker worker: liveWorkers) { - WorkerResource resource = worker.getResource(); - String workerHttp = "http://" + worker.getHostName() + ":" + worker.getHttpPort() + "/index.jsp"; + WorkerResource resource = worker.getResource(); + WorkerConnectionInfo connectionInfo = worker.getConnectionInfo(); + String workerHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp"; %> <tr> - <td width='30' align='right'><%=no++%></td> - <td><a href='<%=workerHttp%>'><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></a></td> - <td width='80' align='center'><%=worker.getPullServerPort()%></td> - <td width='100' align='right'><%=resource.getNumRunningTasks()%></td> - <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td> - <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td> - <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td> - <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td> - <td width='100' align='center'><%=worker.getState()%></td> + <td width='30' align='right'><%=no++%></td> + <td><a href='<%=workerHttp%>'><%=connectionInfo.getHostAndPeerRpcPort()%></a></td> + <td width='80' align='center'><%=connectionInfo.getPullServerPort()%></td> + <td width='100' align='right'><%=resource.getNumRunningTasks()%></td> + <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td> + <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td> + <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td> + <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td> + <td width='100' align='center'><%=worker.getState()%></td> </tr> <% } //end fo for @@ -279,7 +283,7 @@ %> <tr> <td width='30' align='right'><%=no++%></td> - <td><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></td> + <td><%=worker.getConnectionInfo().getHostAndPeerRpcPort()%></td> </tr> <% } //end fo for http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 7ab1482..ce4d7dc 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -38,8 +38,8 @@ <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers(); - Map<String, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers(); + Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); + Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers(); int numWorkers = 0; int numLiveWorkers = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index e7b402f..9ddc90c 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -43,17 +43,17 @@ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers(); + Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); Map<String, Integer> portMap = new HashMap<String, Integer>(); - Collection<String> queryMasters = master.getContext().getResourceManager().getQueryMasters(); + Collection<Integer> queryMasters = master.getContext().getResourceManager().getQueryMasters(); if (queryMasters == null || queryMasters.isEmpty()) { queryMasters = master.getContext().getResourceManager().getWorkers().keySet(); } - for(String eachQueryMasterKey: queryMasters) { + for(int eachQueryMasterKey: queryMasters) { Worker queryMaster = workers.get(eachQueryMasterKey); if(queryMaster != null) { - portMap.put(queryMaster.getHostName(), queryMaster.getHttpPort()); + portMap.put(queryMaster.getConnectionInfo().getHost(), queryMaster.getConnectionInfo().getHttpInfoPort()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index 1a325da..6e74b99 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -67,10 +67,10 @@ List<TajoMasterProtocol.WorkerResourceProto> allWorkers = tajoWorker.getWorkerContext() .getQueryMasterManagerService().getQueryMaster().getAllWorker(); - Map<String, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<String, TajoMasterProtocol.WorkerResourceProto>(); + Map<Integer, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<Integer, TajoMasterProtocol.WorkerResourceProto>(); if(allWorkers != null) { for(TajoMasterProtocol.WorkerResourceProto eachWorker: allWorkers) { - workerMap.put(eachWorker.getHost(), eachWorker); + workerMap.put(eachWorker.getConnectionInfo().getId(), eachWorker); } } QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() @@ -201,12 +201,13 @@ String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost(); if(eachQueryUnit.getSucceededHost() != null) { - TajoMasterProtocol.WorkerResourceProto worker = workerMap.get(eachQueryUnit.getSucceededHost()); + TajoMasterProtocol.WorkerResourceProto worker = + workerMap.get(eachQueryUnit.getLastAttempt().getWorkerConnectionInfo().getId()); if(worker != null) { QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt(); if(lastAttempt != null) { QueryUnitAttemptId lastAttemptId = lastAttempt.getId(); - queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>"; + queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>"; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp index e20ab03..d84664f 100644 --- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp @@ -22,7 +22,9 @@ <%@ page import="org.apache.commons.lang.StringUtils" %> <%@ page import="org.apache.tajo.QueryUnitAttemptId" %> <%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %> +<%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> +<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.*" %> <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/tasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp index b5fb9d7..ae05047 100644 --- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp @@ -65,9 +65,9 @@ <tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr> <% if (taskRunner != null) { - TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext(); + ExecutionBlockContext context = taskRunner.getContext(); - for (Map.Entry<QueryUnitAttemptId, Task> entry : taskRunnerContext.getTasks().entrySet()) { + for (Map.Entry<QueryUnitAttemptId, Task> entry : context.getTasks().entrySet()) { QueryUnitAttemptId queryUnitId = entry.getKey(); TaskHistory eachTask = entry.getValue().createTaskHistory(); %> http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java new file mode 100644 index 0000000..03be125 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.cluster; + +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class TestWorkerConnectionInfo { + + @Test + public void testWorkerId() { + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + WorkerConnectionInfo worker2 = new WorkerConnectionInfo("host2", 28091, 28092, 21000, 28093, 28080); + + assertNotEquals(worker.getId(), worker2.getId()); + assertEquals(worker.getId(), new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080).getId()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index 09d674a..0423894 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -24,6 +24,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol.*; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.junit.Test; @@ -94,11 +95,10 @@ public class TestTajoResourceManager { .setRunningTaskNum(0) .build(); + WorkerConnectionInfo connectionInfo = + new WorkerConnectionInfo("host" + (i + 1), 28091, 28092, 21000 + i, 28093, 28080); NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder() - .setTajoWorkerHost("host" + (i + 1)) - .setTajoQueryMasterPort(21000) - .setTajoWorkerHttpPort(28080 + i) - .setPeerRpcPort(12345) + .setConnectionInfo(connectionInfo.getProto()) .setServerStatus(serverStatus) .build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 720f0ca..3fa67ae 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -215,11 +215,10 @@ public class TajoPullServerService extends AbstractService { selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum); localFS = new LocalFileSystem(); - super.init(conf); - this.getConfig().setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname + conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal); - + super.init(conf); LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength); } catch (Throwable t) { LOG.error(t); @@ -228,8 +227,7 @@ public class TajoPullServerService extends AbstractService { // TODO change AbstractService to throw InterruptedException @Override - public synchronized void start() { - Configuration conf = getConfig(); + public synchronized void serviceInit(Configuration conf) throws Exception { ServerBootstrap bootstrap = new ServerBootstrap(selector); try { @@ -248,11 +246,11 @@ public class TajoPullServerService extends AbstractService { conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); pipelineFact.PullServer.setPort(port); LOG.info(getName() + " listening on port " + port); - super.start(); sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); + if (STANDALONE) { File pullServerPortFile = getPullServerPortFile(); if (pullServerPortFile.exists()) { @@ -272,6 +270,7 @@ public class TajoPullServerService extends AbstractService { IOUtils.closeStream(out); } } + super.serviceInit(conf); LOG.info("TajoPullServerService started: port=" + port); } @@ -487,9 +486,7 @@ public class TajoPullServerService extends AbstractService { } ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); - synchronized(processingStatusMap) { - processingStatusMap.put(request.getUri().toString(), processingStatus); - } + processingStatusMap.put(request.getUri().toString(), processingStatus); // Parsing the URL into key-values final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).getParameters();
