http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java new file mode 100644 index 0000000..856c572 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEvent.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +public class SchedulerEvent extends AbstractEvent<SchedulerEventType> { + public SchedulerEvent(SchedulerEventType type) { + super(type); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java new file mode 100644 index 0000000..93fa032 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/event/SchedulerEventType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.scheduler.event; + +public enum SchedulerEventType { + + // consumer: Scheduler + RESOURCE_RESERVE, + RESOURCE_UPDATE +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java index 229a80a..200d689 100644 --- a/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/metrics/WorkerResourceMetricsGaugeSet.java @@ -22,8 +22,8 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerState; +import org.apache.tajo.master.rm.NodeStatus; +import org.apache.tajo.master.rm.NodeState; import java.util.HashMap; import java.util.Map; @@ -40,31 +40,31 @@ public class WorkerResourceMetricsGaugeSet implements MetricSet { metricsMap.put("totalWorkers", new Gauge<Integer>() { @Override public Integer getValue() { - return tajoMasterContext.getResourceManager().getWorkers().size(); + return tajoMasterContext.getResourceManager().getNodes().size(); } }); metricsMap.put("liveWorkers", new Gauge<Integer>() { @Override public Integer getValue() { - return getNumWorkers(WorkerState.RUNNING); + return getNumWorkers(NodeState.RUNNING); } }); metricsMap.put("deadWorkers", new Gauge<Integer>() { @Override public Integer getValue() { - return getNumWorkers(WorkerState.LOST); + return getNumWorkers(NodeState.LOST); } }); return metricsMap; } - protected int getNumWorkers(WorkerState status) { + protected int getNumWorkers(NodeState status) { int numWorkers = 0; - for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) { - if(eachWorker.getState() == status) { + for(NodeStatus eachNodeStatus : tajoMasterContext.getResourceManager().getNodes().values()) { + if(eachNodeStatus.getState() == status) { numWorkers++; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java index e45f274..8636eaa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/AbstractTaskScheduler.java @@ -18,17 +18,21 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.Sets; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.master.event.TaskRequestEvent; import org.apache.tajo.master.event.TaskSchedulerEvent; +import java.util.Set; + public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> { protected int hostLocalAssigned; protected int rackLocalAssigned; protected int totalAssigned; + protected int cancellation; + protected Set<String> leafTaskHosts = Sets.newHashSet(); /** * Construct the service. @@ -51,6 +55,15 @@ public abstract class AbstractTaskScheduler extends AbstractService implements E return totalAssigned; } - public abstract void handleTaskRequestEvent(TaskRequestEvent event); + public int getCancellation() { + return cancellation; + } + + public abstract void releaseTaskAttempt(TaskAttempt taskAttempt); public abstract int remainingScheduledObjectNum(); + + + public Set<String> getLeafTaskHosts(){ + return leafTaskHosts; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 939de60..32e4219 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -20,56 +20,69 @@ package org.apache.tajo.querymaster; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.ContainerProxy; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.plan.serder.LogicalNodeSerializer; -import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.*; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; +import java.net.InetSocketAddress; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import static org.apache.tajo.ResourceProtos.*; public class DefaultTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); + private static final String REQUEST_MAX_NUM = "tajo.qm.task-scheduler.request.max-num"; + private final TaskSchedulerContext context; private Stage stage; + private TajoConf tajoConf; private Thread schedulingThread; - private AtomicBoolean stopEventHandling = new AtomicBoolean(false); + private volatile boolean isStopped; private ScheduledRequests scheduledRequests; - private TaskRequests taskRequests; + private int minTaskMemory; private int nextTaskId = 0; private int scheduledObjectNum = 0; + private boolean isLeaf; + private int schedulerDelay; + private int maximumRequestContainer; + + //candidate workers for locality of high priority + private Set<Integer> candidateWorkers = Sets.newHashSet(); public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { super(DefaultTaskScheduler.class.getName()); @@ -79,30 +92,47 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { @Override public void init(Configuration conf) { - + tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); scheduledRequests = new ScheduledRequests(); - taskRequests = new TaskRequests(); - + minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); + schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY); super.init(conf); } @Override public void start() { LOG.info("Start TaskScheduler"); + maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size() * 2); + isLeaf = stage.getMasterPlan().isLeaf(stage.getBlock()); + + if (isLeaf) { + candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); + } else { + //find assigned hosts for Non-Leaf locality in children executionBlock + List<ExecutionBlock> executionBlockList = stage.getMasterPlan().getChilds(stage.getBlock()); + for (ExecutionBlock executionBlock : executionBlockList) { + Stage childStage = stage.getContext().getStage(executionBlock.getId()); + candidateWorkers.addAll(childStage.getAssignedWorkerMap().keySet()); + } + } this.schedulingThread = new Thread() { public void run() { - while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) { + while (!isStopped && !Thread.currentThread().isInterrupted()) { + try { - synchronized (schedulingThread){ - schedulingThread.wait(100); - } schedule(); } catch (InterruptedException e) { - break; + if (isStopped) { + break; + } else { + LOG.fatal(e.getMessage(), e); + stage.abort(StageState.ERROR); + } } catch (Throwable e) { LOG.fatal(e.getMessage(), e); + stage.abort(StageState.ERROR); break; } } @@ -114,41 +144,17 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { super.start(); } - private static final TaskAttemptId NULL_ATTEMPT_ID; - public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; - static { - ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); - - TajoWorkerProtocol.TaskRequestProto.Builder builder = - TajoWorkerProtocol.TaskRequestProto.newBuilder(); - builder.setId(NULL_ATTEMPT_ID.getProto()); - builder.setShouldDie(true); - builder.setOutputTable(""); - builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); - builder.setClusteredOutput(false); - stopTaskRunnerReq = builder.build(); - } - @Override public void stop() { - if(stopEventHandling.getAndSet(true)){ - return; - } + isStopped = true; if (schedulingThread != null) { synchronized (schedulingThread) { - schedulingThread.notifyAll(); + schedulingThread.interrupt(); } } - - // Return all of request callbacks instantly. - if(taskRequests != null){ - for (TaskRequestEvent req : taskRequests.taskRequestQueue) { - req.getCallback().run(stopTaskRunnerReq); - } - } - + candidateWorkers.clear(); + scheduledRequests.clear(); LOG.info("Task Scheduler stopped"); super.stop(); } @@ -156,34 +162,38 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private Fragment[] fragmentsForNonLeafTask; private Fragment[] broadcastFragmentsForNonLeafTask; - LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>(); - public void schedule() { - - if (taskRequests.size() > 0) { - if (scheduledRequests.leafTaskNum() > 0) { - LOG.debug("Try to schedule tasks with taskRequestEvents: " + - taskRequests.size() + ", LeafTask Schedule Request: " + - scheduledRequests.leafTaskNum()); - taskRequests.getTaskRequests(taskRequestEvents, - scheduledRequests.leafTaskNum()); - LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents "); - if (taskRequestEvents.size() > 0) { - scheduledRequests.assignToLeafTasks(taskRequestEvents); - taskRequestEvents.clear(); + public void schedule() throws Exception{ + try { + if (remainingScheduledObjectNum() == 0) { + // all task is done, wait for stopping message + synchronized (schedulingThread) { + schedulingThread.wait(500); } - } - } + } else { + LinkedList<TaskRequestEvent> taskRequests = createTaskRequest(); - if (taskRequests.size() > 0) { - if (scheduledRequests.nonLeafTaskNum() > 0) { - LOG.debug("Try to schedule tasks with taskRequestEvents: " + - taskRequests.size() + ", NonLeafTask Schedule Request: " + - scheduledRequests.nonLeafTaskNum()); - taskRequests.getTaskRequests(taskRequestEvents, - scheduledRequests.nonLeafTaskNum()); - scheduledRequests.assignToNonLeafTasks(taskRequestEvents); - taskRequestEvents.clear(); + if (taskRequests.size() == 0) { + synchronized (schedulingThread) { + schedulingThread.wait(schedulerDelay); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Get " + taskRequests.size() + " taskRequestEvents "); + } + + if (isLeaf) { + if (scheduledRequests.leafTaskNum() > 0) { + scheduledRequests.assignToLeafTasks(taskRequests); + } + } else { + if (scheduledRequests.nonLeafTaskNum() > 0) { + scheduledRequests.assignToNonLeafTasks(taskRequests); + } + } + } } + } catch (TimeoutException e) { + LOG.error(e.getMessage()); } } @@ -251,19 +261,55 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - @Override - public void handleTaskRequestEvent(TaskRequestEvent event) { - - taskRequests.handle(event); - int hosts = scheduledRequests.leafTaskHostMapping.size(); + private Set<Integer> getWorkerIds(Collection<String> hosts){ + Set<Integer> workerIds = Sets.newHashSet(); + if(hosts.isEmpty()) return workerIds; - // if available cluster resource are large then tasks, the scheduler thread are working immediately. - if(remainingScheduledObjectNum() > 0 && - (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){ - synchronized (schedulingThread){ - schedulingThread.notifyAll(); + for (WorkerConnectionInfo worker : stage.getContext().getWorkerMap().values()) { + if(hosts.contains(worker.getHost())){ + workerIds.add(worker.getId()); } } + return workerIds; + } + + + protected LinkedList<TaskRequestEvent> createTaskRequest() throws Exception { + LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>(); + + //If scheduled tasks is long-term task, cluster resource can be the worst load balance. + //This part is to throttle the maximum required container per request + int requestContainerNum = Math.min(remainingScheduledObjectNum(), maximumRequestContainer); + if (LOG.isDebugEnabled()) { + LOG.debug("Try to schedule task resources: " + requestContainerNum); + } + + ServiceTracker serviceTracker = + context.getMasterContext().getQueryMasterContext().getWorkerContext().getServiceTracker(); + NettyClientBase tmClient = RpcClientManager.getInstance(). + getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); + + CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); + NodeResourceRequest.Builder request = NodeResourceRequest.newBuilder(); + request.setCapacity(NodeResources.createResource(minTaskMemory, isLeaf ? 1 : 0).getProto()) + .setNumContainers(requestContainerNum) + .setPriority(stage.getPriority()) + .setQueryId(context.getMasterContext().getQueryId().getProto()) + .setType(isLeaf ? ResourceType.LEAF : ResourceType.INTERMEDIATE) + .setUserId(context.getMasterContext().getQueryContext().getUser()) + .setRunningTasks(stage.getTotalScheduledObjectsCount() - stage.getCompletedTaskCount()) + .addAllCandidateNodes(candidateWorkers) + .setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue + + masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack); + NodeResourceResponse response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + for (AllocationResourceProto resource : response.getResourceList()) { + taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())); + } + + return taskRequestEvents; } @Override @@ -271,43 +317,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { return scheduledObjectNum; } - private class TaskRequests implements EventHandler<TaskRequestEvent> { - private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue = - new LinkedBlockingQueue<TaskRequestEvent>(); - - @Override - public void handle(TaskRequestEvent event) { - if(LOG.isDebugEnabled()){ - LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId()); - } + public void releaseTaskAttempt(TaskAttempt taskAttempt) { + if (taskAttempt.isLeafTask() && taskAttempt.getWorkerConnectionInfo() != null) { - if(stopEventHandling.get()) { - event.getCallback().run(stopTaskRunnerReq); - return; + HostVolumeMapping mapping = + scheduledRequests.leafTaskHostMapping.get(taskAttempt.getWorkerConnectionInfo().getHost()); + if (mapping != null && mapping.lastAssignedVolumeId.containsKey(taskAttempt.getId())) { + mapping.decreaseConcurrency(mapping.lastAssignedVolumeId.remove(taskAttempt.getId())); } - int qSize = taskRequestQueue.size(); - if (qSize != 0 && qSize % 1000 == 0) { - LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize); - } - int remCapacity = taskRequestQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue " - + "of DefaultTaskScheduler: " + remCapacity); - } - - taskRequestQueue.add(event); - } - - public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests, - int num) { - taskRequestQueue.drainTo(taskRequests, num); - } - - public int size() { - return taskRequestQueue.size(); } } - /** * One worker can have multiple running task runners. <code>HostVolumeMapping</code> * describes various information for one worker, including : @@ -342,8 +361,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume = Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>()); /** A value is last assigned volume id for each task runner */ - private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId, - Integer>(); + private HashMap<TaskAttemptId, Integer> lastAssignedVolumeId = Maps.newHashMap(); /** * A key is disk volume id, and a value is the load of this volume. * This load is measured by counting how many number of tasks are running. @@ -383,24 +401,18 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { * 2. unknown block or Non-splittable task in host * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null */ - public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) { - int volumeId; + public synchronized TaskAttemptId getLocalTask() { + int volumeId = getLowestVolumeId(); TaskAttemptId taskAttemptId = null; - if (!lastAssignedVolumeId.containsKey(containerId)) { - volumeId = getLowestVolumeId(); - increaseConcurrency(containerId, volumeId); - } else { - volumeId = lastAssignedVolumeId.get(containerId); - } - + increaseConcurrency(volumeId); if (unassignedTaskForEachVolume.size() > 0) { int retry = unassignedTaskForEachVolume.size(); do { //clean and get a remaining local task taskAttemptId = getAndRemove(volumeId); if(!unassignedTaskForEachVolume.containsKey(volumeId)) { - decreaseConcurrency(containerId); + decreaseConcurrency(volumeId); if (volumeId > REMOTE) { diskVolumeLoads.remove(volumeId); } @@ -409,7 +421,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if (taskAttemptId == null) { //reassign next volume volumeId = getLowestVolumeId(); - increaseConcurrency(containerId, volumeId); + increaseConcurrency(volumeId); retry--; } else { break; @@ -418,6 +430,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } else { this.remainTasksNum.set(0); } + + lastAssignedVolumeId.put(taskAttemptId, volumeId); return taskAttemptId; } @@ -490,11 +504,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { /** * Increase the count of running tasks and disk loads for a certain task runner. * - * @param containerId The task runner identifier * @param volumeId Volume identifier * @return the volume load (i.e., how many running tasks use this volume) */ - private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) { + private synchronized int increaseConcurrency(int volumeId) { int concurrency = 1; if (diskVolumeLoads.containsKey(volumeId)) { @@ -512,16 +525,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { + ", Remote Concurrency : " + concurrency); } diskVolumeLoads.put(volumeId, concurrency); - lastAssignedVolumeId.put(containerId, volumeId); return concurrency; } /** * Decrease the count of running tasks of a certain task runner */ - private synchronized void decreaseConcurrency(TajoContainerId containerId){ - Integer volumeId = lastAssignedVolumeId.get(containerId); - if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){ + private synchronized void decreaseConcurrency(int volumeId){ + if(diskVolumeLoads.containsKey(volumeId)){ Integer concurrency = diskVolumeLoads.get(volumeId); if(concurrency > 0){ diskVolumeLoads.put(volumeId, concurrency - 1); @@ -531,7 +542,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } } - lastAssignedVolumeId.remove(containerId); } /** @@ -557,19 +567,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - public boolean isAssigned(TajoContainerId containerId){ - return lastAssignedVolumeId.containsKey(containerId); - } - - public boolean isRemote(TajoContainerId containerId){ - Integer volumeId = lastAssignedVolumeId.get(containerId); - if(volumeId == null || volumeId > REMOTE){ - return false; - } else { - return true; - } - } - public int getRemoteConcurrency(){ return getVolumeConcurrency(REMOTE); } @@ -585,7 +582,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } public String getHost() { - return host; } @@ -594,6 +590,25 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } + public void cancel(TaskAttempt taskAttempt) { + + if(taskAttempt.isLeafTask()) { + List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); + + for (DataLocation location : locations) { + HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); + volumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); + } + + scheduledRequests.leafTasks.add(taskAttempt.getId()); + } else { + scheduledRequests.nonLeafTasks.add(taskAttempt.getId()); + } + + context.getMasterContext().getEventHandler().handle( + new TaskAttemptEvent(taskAttempt.getId(), TaskAttemptEventType.TA_ASSIGN_CANCEL)); + } + private class ScheduledRequests { // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner @@ -603,12 +618,20 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap(); private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap(); - private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) { + protected void clear() { + leafTasks.clear(); + nonLeafTasks.clear(); + leafTaskHostMapping.clear(); + leafTasksRackMapping.clear(); + } + + private void addLeafTask(TaskAttemptToSchedulerEvent event) { TaskAttempt taskAttempt = event.getTaskAttempt(); List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); for (DataLocation location : locations) { String host = location.getHost(); + leafTaskHosts.add(host); HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); if (hostVolumeMapping == null) { @@ -650,14 +673,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { return nonLeafTasks.size(); } - public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>(); - - private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){ + private TaskAttemptId allocateLocalTask(String host){ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) { - TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId); + TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(); if(attemptId == null) break; //find remaining local task @@ -736,8 +757,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { rackLocalAssigned++; totalAssigned++; - LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s", - hostLocalAssigned, rackLocalAssigned, totalAssigned, + LOG.info(String.format("Assigned Local/Rack/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%, Rack host: %s", + hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned, ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); } @@ -747,6 +768,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { Collections.shuffle(taskRequests); LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>(); + String queryMasterHostAndPort = context.getMasterContext().getQueryMasterContext().getWorkerContext(). + getConnectionInfo().getHostAndQMPort(); TaskRequestEvent taskRequest; while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) { @@ -757,58 +780,61 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { // checking if this container is still alive. // If not, ignore the task request and stop the task runner - ContainerProxy container = context.getMasterContext().getResourceAllocator() - .getContainer(taskRequest.getContainerId()); - if(container == null) { - taskRequest.getCallback().run(stopTaskRunnerReq); - continue; - } + WorkerConnectionInfo connectionInfo = context.getMasterContext().getWorkerMap().get(taskRequest.getWorkerId()); + if(connectionInfo == null) continue; // getting the hostname of requested node - WorkerConnectionInfo connectionInfo = - context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId()); String host = connectionInfo.getHost(); // if there are no worker matched to the hostname a task request - if(!leafTaskHostMapping.containsKey(host)){ + if (!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()) { String normalizedHost = NetUtils.normalizeHost(host); - if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){ + if (!leafTaskHostMapping.containsKey(normalizedHost)) { // this case means one of either cases: // * there are no blocks which reside in this node. // * all blocks which reside in this node are consumed, and this task runner requests a remote task. // In this case, we transfer the task request to the remote task request list, and skip the followings. remoteTaskRequests.add(taskRequest); continue; + } else { + host = normalizedHost; } } - TajoContainerId containerId = taskRequest.getContainerId(); - LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + - "containerId=" + containerId); + if (LOG.isDebugEnabled()) { + LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + + "worker=" + connectionInfo.getHostAndPeerRpcPort()); + } ////////////////////////////////////////////////////////////////////// // disk or host-local allocation ////////////////////////////////////////////////////////////////////// - TaskAttemptId attemptId = allocateLocalTask(host, containerId); + TaskAttemptId attemptId = allocateLocalTask(host); if (attemptId == null) { // if a local task cannot be found HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); - if(hostVolumeMapping != null) { - if(!hostVolumeMapping.isRemote(containerId)){ - // assign to remote volume - hostVolumeMapping.decreaseConcurrency(containerId); - hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE); - } - // this part is remote concurrency management of a tail tasks - int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1); + if(!taskRequests.isEmpty()) { //if other requests remains, move to remote list for better locality + remoteTaskRequests.add(taskRequest); + candidateWorkers.remove(connectionInfo.getId()); + continue; - if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){ - //release container - hostVolumeMapping.decreaseConcurrency(containerId); - taskRequest.getCallback().run(stopTaskRunnerReq); - continue; + } else { + if(hostVolumeMapping != null) { + int nodes = context.getMasterContext().getWorkerMap().size(); + //this part is to control the assignment of tail and remote task balancing per node + int tailLimit = 1; + if (remainingScheduledObjectNum() > 0) { + tailLimit = Math.max(remainingScheduledObjectNum() / nodes, 1); + } + + if (hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { //remote task throttling per node + continue; + } else { + // assign to remote volume + hostVolumeMapping.increaseConcurrency(HostVolumeMapping.REMOTE); + } } } @@ -816,6 +842,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { // rack-local allocation ////////////////////////////////////////////////////////////////////// attemptId = allocateRackTask(host); + if (attemptId != null && hostVolumeMapping != null) { + hostVolumeMapping.lastAssignedVolumeId.put(attemptId, HostVolumeMapping.REMOTE); + } ////////////////////////////////////////////////////////////////////// // random node allocation @@ -826,8 +855,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { leafTasks.remove(attemptId); rackLocalAssigned++; totalAssigned++; - LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,", - hostLocalAssigned, rackLocalAssigned, totalAssigned, + LOG.info(String.format("Assigned Local/Remote/Cancel/Total: (%d/%d/%d/%d), Locality: %.2f%%,", + hostLocalAssigned, rackLocalAssigned, cancellation, totalAssigned, ((double) hostLocalAssigned / (double) totalAssigned) * 100)); } } @@ -842,17 +871,51 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { false, LogicalNodeSerializer.serialize(task.getLogicalPlan()), context.getMasterContext().getQueryContext(), - stage.getDataChannel(), stage.getBlock().getEnforcer()); + stage.getDataChannel(), stage.getBlock().getEnforcer(), + queryMasterHostAndPort); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } - context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), connectionInfo)); - assignedRequest.add(attemptId); + //TODO send batch request + BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder(); + requestProto.addTaskRequest(TaskAllocationProto.newBuilder() + .setResource(taskRequest.getResponseProto().getResource()) + .setTaskRequest(taskAssign.getProto()).build()); + + requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto()); + context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, connectionInfo)); + InetSocketAddress addr = stage.getAssignedWorkerMap().get(connectionInfo.getId()); + if (addr == null) addr = new InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()); + + AsyncRpcClient tajoWorkerRpc = null; + CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>(); + try { + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); + + BatchAllocationResponse responseProto = + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + if (responseProto.getCancellationTaskCount() > 0) { + for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { + cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId()))); + cancellation++; + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); + } + continue; + } + } catch (Exception e) { + LOG.error(e); + } scheduledObjectNum--; - taskRequest.getCallback().run(taskAssign.getProto()); + } else { throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); } @@ -874,6 +937,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { Collections.shuffle(taskRequests); + String queryMasterHostAndPort = context.getMasterContext().getQueryMasterContext().getWorkerContext(). + getConnectionInfo().getHostAndQMPort(); TaskRequestEvent taskRequest; while (!taskRequests.isEmpty()) { @@ -891,6 +956,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { Task task; task = stage.getTask(attemptId.getTaskId()); + TaskRequest taskAssign = new TaskRequestImpl( attemptId, Lists.newArrayList(task.getAllFragments()), @@ -899,7 +965,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { LogicalNodeSerializer.serialize(task.getLogicalPlan()), context.getMasterContext().getQueryContext(), stage.getDataChannel(), - stage.getBlock().getEnforcer()); + stage.getBlock().getEnforcer(), + queryMasterHostAndPort); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } @@ -912,13 +980,49 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator(). - getWorkerConnectionInfo(taskRequest.getWorkerId()); - context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - taskRequest.getContainerId(), connectionInfo)); - taskRequest.getCallback().run(taskAssign.getProto()); - totalAssigned++; - scheduledObjectNum--; + WorkerConnectionInfo connectionInfo = + context.getMasterContext().getWorkerMap().get(taskRequest.getWorkerId()); + + //TODO send batch request + BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder(); + requestProto.addTaskRequest(TaskAllocationProto.newBuilder() + .setResource(taskRequest.getResponseProto().getResource()) + .setTaskRequest(taskAssign.getProto()).build()); + + requestProto.setExecutionBlockId(attemptId.getTaskId().getExecutionBlockId().getProto()); + context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, connectionInfo)); + + CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>(); + + InetSocketAddress addr = stage.getAssignedWorkerMap().get(connectionInfo.getId()); + if (addr == null) addr = new InetSocketAddress(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()); + + AsyncRpcClient tajoWorkerRpc; + try { + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); + + BatchAllocationResponse + responseProto = callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + if(responseProto.getCancellationTaskCount() > 0) { + for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { + cancel(task.getAttempt(new TaskAttemptId(proto.getTaskRequest().getId()))); + cancellation++; + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); + } + continue; + } + + totalAssigned++; + scheduledObjectNum--; + } catch (Exception e) { + LOG.error(e); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 9d5838d..6fc4ea9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -174,7 +174,16 @@ public class Query implements EventHandler<QueryEvent> { QueryEventType.KILL, QUERY_COMPLETED_TRANSITION) - // Transitions from FAILED state + // Transitions from KILLED state + // ignore-able transitions + .addTransition(QueryState.QUERY_KILLED, QueryState.QUERY_KILLED, + EnumSet.of(QueryEventType.START, QueryEventType.QUERY_COMPLETED, + QueryEventType.KILL, QueryEventType.INTERNAL_ERROR)) + .addTransition(QueryState.QUERY_KILLED, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from FAILED state .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED, QueryEventType.DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) @@ -305,7 +314,6 @@ public class Query implements EventHandler<QueryEvent> { queryHistory.setQueryId(getId().toString()); queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName()); queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort()); - queryHistory.setLogicalPlan(plan.toString()); queryHistory.setLogicalPlan(plan.getLogicalPlan().toString()); queryHistory.setDistributedPlan(plan.toString()); @@ -704,8 +712,17 @@ public class Query implements EventHandler<QueryEvent> { !executeNextBlock(query, castEvent.getExecutionBlockId())) { return; } - // if a query is completed due to finished, kill, failure, or error - query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); + + //wait for stages is completed + if (query.completedStagesCount >= query.stages.size()) { + // if a query is completed due to finished, kill, failure, or error + query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); + } + LOG.info(String.format("Complete Stage[%s], State: %s, %d/%d. ", + castEvent.getExecutionBlockId().toString(), + castEvent.getState().toString(), + query.completedStagesCount, + query.stages.size())); } catch (Throwable t) { LOG.error(t.getMessage(), t); query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 6c5bd22..e07b43f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -18,8 +18,8 @@ package org.apache.tajo.querymaster; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.collections.map.LRUMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -29,19 +29,24 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tajo.*; +import org.apache.tajo.QueryId; + +import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; -import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; +import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest; +import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse; +import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; @@ -52,7 +57,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; public class QueryMaster extends CompositeService implements EventHandler { private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); @@ -69,11 +73,12 @@ public class QueryMaster extends CompositeService implements EventHandler { private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap(); - private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap(); + private final LRUMap + finishedQueryMasterTasksCache = new LRUMap(HistoryReader.DEFAULT_PAGE_SIZE); private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread; - private AtomicBoolean queryMasterStop = new AtomicBoolean(false); + private volatile boolean isStopped; private QueryMasterContext queryMasterContext; @@ -89,44 +94,37 @@ public class QueryMaster extends CompositeService implements EventHandler { private ExecutorService eventExecutor; + private ExecutorService singleEventExecutor; + public QueryMaster(TajoWorker.WorkerContext workerContext) { super(QueryMaster.class.getName()); this.workerContext = workerContext; } - public void init(Configuration conf) { - LOG.info("QueryMaster init"); - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("conf should be a TajoConf type"); - } - try { - this.systemConf = (TajoConf)conf; - this.manager = RpcClientManager.getInstance(); + @Override + public void serviceInit(Configuration conf) throws Exception { - querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); - queryMasterContext = new QueryMasterContext(systemConf); + this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + this.manager = RpcClientManager.getInstance(); - clock = new SystemClock(); + querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); + queryMasterContext = new QueryMasterContext(systemConf); - this.dispatcher = new AsyncDispatcher(); - addIfService(dispatcher); + clock = new SystemClock(); - globalPlanner = new GlobalPlanner(systemConf, workerContext); + this.dispatcher = new AsyncDispatcher(); + addIfService(dispatcher); - dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); - dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler()); + globalPlanner = new GlobalPlanner(systemConf, workerContext); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - throw new RuntimeException(t); - } - super.init(conf); + dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); + dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler()); + super.serviceInit(conf); + LOG.info("QueryMaster inited"); } @Override - public void start() { - LOG.info("QueryMaster start"); - + public void serviceStart() throws Exception { queryHeartbeatThread = new QueryHeartbeatThread(); queryHeartbeatThread.start(); @@ -136,15 +134,15 @@ public class QueryMaster extends CompositeService implements EventHandler { finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread(); finishedQueryMasterTaskCleanThread.start(); - eventExecutor = Executors.newSingleThreadExecutor(); - super.start(); + eventExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + singleEventExecutor = Executors.newSingleThreadExecutor(); + super.serviceStart(); + LOG.info("QueryMaster started"); } @Override - public void stop() { - if(queryMasterStop.getAndSet(true)){ - return; - } + public void serviceStop() throws Exception { + isStopped = true; if(queryHeartbeatThread != null) { queryHeartbeatThread.interrupt(); @@ -162,60 +160,15 @@ public class QueryMaster extends CompositeService implements EventHandler { eventExecutor.shutdown(); } - super.stop(); - - LOG.info("QueryMaster stopped"); - } - - protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) { - StringBuilder cleanupMessage = new StringBuilder(); - String prefix = ""; - for (TajoIdProtos.ExecutionBlockIdProto eachEbId: executionBlockIds) { - cleanupMessage.append(prefix).append(new ExecutionBlockId(eachEbId).toString()); - prefix = ","; + if(singleEventExecutor != null){ + singleEventExecutor.shutdown(); } - LOG.info("cleanup executionBlocks: " + cleanupMessage); - NettyClientBase rpc = null; - List<WorkerResourceProto> workers = getAllWorker(); - TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); - builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); - TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); - for (WorkerResourceProto worker : workers) { - try { - TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), - TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); - - tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get()); - } catch (Exception e) { - LOG.warn("Ignoring exception. " + e.getMessage(), e); - continue; - } - } - } - - private void cleanup(QueryId queryId) { - LOG.info("cleanup query resources : " + queryId); - NettyClientBase rpc = null; - List<WorkerResourceProto> workers = getAllWorker(); - - for (WorkerResourceProto worker : workers) { - try { - TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); - rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), - TajoWorkerProtocol.class, true); - TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); - - tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get()); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } + super.serviceStop(); + LOG.info("QueryMaster stopped"); } - public List<WorkerResourceProto> getAllWorker() { + public List<TajoProtos.WorkerConnectionInfoProto> getAllWorker() { NettyClientBase rpc = null; try { @@ -228,16 +181,17 @@ public class QueryMaster extends CompositeService implements EventHandler { rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterService = rpc.getStub(); - CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>(); - masterService.getAllWorkerResource(callBack.getController(), + CallFuture<WorkerConnectionsResponse> callBack = new CallFuture<WorkerConnectionsResponse>(); + masterService.getAllWorkers(callBack.getController(), PrimitiveProtos.NullProto.getDefaultInstance(), callBack); - WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS); - return workerResourcesRequest.getWorkerResourcesList(); + WorkerConnectionsResponse connectionsProto = + callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + return connectionsProto.getWorkerList(); } catch (Exception e) { LOG.error(e.getMessage(), e); } - return new ArrayList<WorkerResourceProto>(); + return new ArrayList<TajoProtos.WorkerConnectionInfoProto>(); } @Override @@ -253,13 +207,14 @@ public class QueryMaster extends CompositeService implements EventHandler { return queryMasterTasks.get(queryId); } + @Deprecated public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) { QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId); if(queryMasterTask != null) { return queryMasterTask; } else { if(includeFinished) { - return finishedQueryMasterTasks.get(queryId); + return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId); } else { return null; } @@ -274,8 +229,9 @@ public class QueryMaster extends CompositeService implements EventHandler { return queryMasterTasks.values(); } + @Deprecated public Collection<QueryMasterTask> getFinishedQueryMasterTasks() { - return finishedQueryMasterTasks.values(); + return finishedQueryMasterTasksCache.values(); } public class QueryMasterContext { @@ -293,6 +249,10 @@ public class QueryMaster extends CompositeService implements EventHandler { return eventExecutor; } + public ExecutorService getSingleEventExecutor(){ + return singleEventExecutor; + } + public AsyncDispatcher getDispatcher() { return dispatcher; } @@ -324,12 +284,12 @@ public class QueryMaster extends CompositeService implements EventHandler { return; } - finishedQueryMasterTasks.put(queryId, queryMasterTask); + finishedQueryMasterTasksCache.put(queryId, queryMasterTask); - TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask); + TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(queryMasterTask); CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>(); - NettyClientBase tmClient = null; + NettyClientBase tmClient; try { tmClient = manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); @@ -346,9 +306,6 @@ public class QueryMaster extends CompositeService implements EventHandler { try { queryMasterTask.stop(); - if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) { - cleanup(queryId); - } } catch (Exception e) { LOG.error(e.getMessage(), e); } @@ -367,8 +324,8 @@ public class QueryMaster extends CompositeService implements EventHandler { } } - private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) { - TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder(); + private TajoHeartbeatRequest buildTajoHeartBeat(QueryMasterTask queryMasterTask) { + TajoHeartbeatRequest.Builder builder = TajoHeartbeatRequest.newBuilder(); builder.setConnectionInfo(workerContext.getConnectionInfo().getProto()); builder.setQueryId(queryMasterTask.getQueryId().getProto()); @@ -387,7 +344,7 @@ public class QueryMaster extends CompositeService implements EventHandler { public void handle(QueryStartEvent event) { LOG.info("Start QueryStartEventHandler:" + event.getQueryId()); QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext, - event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr()); + event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getAllocation()); synchronized(queryMasterTasks) { queryMasterTasks.put(event.getQueryId(), queryMasterTask); @@ -402,7 +359,6 @@ public class QueryMaster extends CompositeService implements EventHandler { if (queryMasterTask.isInitError()) { queryMasterContext.stopQuery(queryMasterTask.getQueryId()); - return; } } } @@ -422,31 +378,29 @@ public class QueryMaster extends CompositeService implements EventHandler { @Override public void run() { LOG.info("Start QueryMaster heartbeat thread"); - while(!queryMasterStop.get()) { + while(!isStopped) { List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(); - synchronized(queryMasterTasks) { - tempTasks.addAll(queryMasterTasks.values()); - } - synchronized(queryMasterTasks) { - for(QueryMasterTask eachTask: tempTasks) { - NettyClientBase tmClient; - try { + tempTasks.addAll(queryMasterTasks.values()); + + for(QueryMasterTask eachTask: tempTasks) { + NettyClientBase tmClient; + try { - ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker(); - tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), - QueryCoordinatorProtocol.class, true); - QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); + ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker(); + tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), + QueryCoordinatorProtocol.class, true); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask); - masterClientService.heartbeat(null, queryHeartbeat, NullCallback.get()); - } catch (Throwable t) { - t.printStackTrace(); - } + TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(eachTask); + masterClientService.heartbeat(null, queryHeartbeat, NullCallback.get()); + } catch (Throwable t) { + t.printStackTrace(); } } - synchronized(queryMasterStop) { + + synchronized(this) { try { - queryMasterStop.wait(2000); + this.wait(2000); } catch (InterruptedException e) { break; } @@ -459,16 +413,16 @@ public class QueryMaster extends CompositeService implements EventHandler { class ClientSessionTimeoutCheckThread extends Thread { public void run() { LOG.info("ClientSessionTimeoutCheckThread started"); - while(!queryMasterStop.get()) { + while(!isStopped) { try { - Thread.sleep(1000); + synchronized (this) { + this.wait(1000); + } } catch (InterruptedException e) { break; } List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(); - synchronized(queryMasterTasks) { - tempTasks.addAll(queryMasterTasks.values()); - } + tempTasks.addAll(queryMasterTasks.values()); for(QueryMasterTask eachTask: tempTasks) { if(!eachTask.isStopped()) { @@ -490,11 +444,13 @@ public class QueryMaster extends CompositeService implements EventHandler { class FinishedQueryMasterTaskCleanThread extends Thread { public void run() { - int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_HISTORY_EXPIRE_PERIOD); + int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_CACHE_EXPIRE_PERIOD); LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime); - while(!queryMasterStop.get()) { + while(!isStopped) { try { - Thread.sleep(60 * 1000); // minimum interval minutes + synchronized (this) { + this.wait(60 * 1000); // minimum interval minutes + } } catch (InterruptedException e) { break; } @@ -508,26 +464,25 @@ public class QueryMaster extends CompositeService implements EventHandler { } private void cleanExpiredFinishedQueryMasterTask(long expireTime) { - synchronized(finishedQueryMasterTasks) { - List<QueryId> expiredQueryIds = new ArrayList<QueryId>(); - for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) { - + List<QueryId> expiredQueryIds = new ArrayList<QueryId>(); + for(Object key: new ArrayList<Object>(finishedQueryMasterTasksCache.keySet())) { + QueryId queryId = (QueryId) key; /* If a query are abnormal termination, the finished time will be zero. */ - long finishedTime = entry.getValue().getStartTime(); - Query query = entry.getValue().getQuery(); - if (query != null && query.getFinishTime() > 0) { - finishedTime = query.getFinishTime(); - } - - if(finishedTime < expireTime) { - expiredQueryIds.add(entry.getKey()); - } + QueryMasterTask queryMasterTask = (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId); + long finishedTime = queryMasterTask.getStartTime(); + Query query = queryMasterTask.getQuery(); + if (query != null && query.getFinishTime() > 0) { + finishedTime = query.getFinishTime(); } - for(QueryId eachId: expiredQueryIds) { - finishedQueryMasterTasks.remove(eachId); + if(finishedTime < expireTime) { + expiredQueryIds.add(queryId); } } + + for(QueryId eachId: expiredQueryIds) { + finishedQueryMasterTasksCache.remove(eachId); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java index 59933a7..62216aa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java @@ -18,7 +18,6 @@ package org.apache.tajo.querymaster; -import com.google.common.base.Preconditions; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import org.apache.commons.logging.Log; @@ -27,19 +26,26 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; -import org.apache.tajo.session.Session; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.resource.NodeResource; import org.apache.tajo.rpc.AsyncRpcServer; +import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TajoWorker; +import org.apache.tajo.worker.event.QMResourceAllocateEvent; import java.net.InetSocketAddress; +import static org.apache.tajo.ResourceProtos.*; + public class QueryMasterManagerService extends CompositeService implements QueryMasterProtocol.QueryMasterProtocolService.Interface { private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName()); @@ -64,51 +70,41 @@ public class QueryMasterManagerService extends CompositeService } @Override - public void init(Configuration conf) { - Preconditions.checkArgument(conf instanceof TajoConf); - TajoConf tajoConf = (TajoConf) conf; - try { - // Setup RPC server - InetSocketAddress initIsa = - new InetSocketAddress("0.0.0.0", port); - if (initIsa.getAddress() == null) { - throw new IllegalArgumentException("Failed resolve of " + initIsa); - } - - int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM); - this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum); - this.rpcServer.start(); + public void serviceInit(Configuration conf) throws Exception { + + TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + // Setup RPC server + InetSocketAddress initIsa = + new InetSocketAddress("0.0.0.0", port); + if (initIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initIsa); + } - this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress()); - this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort(); + int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM); + this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum); + this.rpcServer.start(); - this.port = bindAddr.getPort(); + this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress()); + this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort(); - queryMaster = new QueryMaster(workerContext); - addService(queryMaster); + this.port = bindAddr.getPort(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } + queryMaster = new QueryMaster(workerContext); + addService(queryMaster); // Get the master address LOG.info("QueryMasterManagerService is bind to " + addr); - ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr); + tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr); - super.init(conf); + super.serviceInit(conf); } @Override - public void start() { - super.start(); - } - - @Override - public void stop() { + public void serviceStop() throws Exception { if(rpcServer != null) { rpcServer.shutdown(); } LOG.info("QueryMasterManagerService stopped"); - super.stop(); + super.serviceStop(); } public InetSocketAddress getBindAddr() { @@ -116,52 +112,31 @@ public class QueryMasterManagerService extends CompositeService } @Override - public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request, - RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) { - try { - ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId()); - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId()); - - if(queryMasterTask == null || queryMasterTask.isStopped()) { - done.run(DefaultTaskScheduler.stopTaskRunnerReq); - } else { - TajoContainerId cid = - queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId()); - LOG.debug("getTask:" + cid + ", ebId:" + ebId); - queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done)); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - controller.setFailed(e.getMessage()); - } - } - - @Override - public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request, + public void statusUpdate(RpcController controller, TaskStatusProto request, RpcCallback<PrimitiveProtos.NullProto> done) { QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId()); TaskAttemptId attemptId = new TaskAttemptId(request.getId()); QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); - if (queryMasterTask == null) { - queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); - } - Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); - Task task = sq.getTask(attemptId.getTaskId()); - TaskAttempt attempt = task.getAttempt(attemptId.getId()); - if(LOG.isDebugEnabled()){ - LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); - } + if (queryMasterTask != null) { + Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = sq.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId.getId()); - if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { - LOG.warn(attemptId + " Killed"); - attempt.handle( - new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); - } else { - queryMasterTask.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); - } + if(LOG.isDebugEnabled()){ + LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); + } + + if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { + LOG.warn(attemptId + " Killed"); + attempt.handle( + new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); + } else { + queryMasterTask.getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); + } + } done.run(TajoWorker.NULL_PROTO); } @@ -173,7 +148,7 @@ public class QueryMasterManagerService extends CompositeService } @Override - public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report, + public void fatalError(RpcController controller, TaskFatalErrorReport report, RpcCallback<PrimitiveProtos.NullProto> done) { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); @@ -186,7 +161,7 @@ public class QueryMasterManagerService extends CompositeService } @Override - public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report, + public void done(RpcController controller, TaskCompletionReport report, RpcCallback<PrimitiveProtos.NullProto> done) { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); @@ -198,7 +173,7 @@ public class QueryMasterManagerService extends CompositeService @Override public void doneExecutionBlock( - RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request, + RpcController controller, ExecutionBlockReport request, RpcCallback<PrimitiveProtos.NullProto> done) { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); if (queryMasterTask != null) { @@ -209,6 +184,38 @@ public class QueryMasterManagerService extends CompositeService } @Override + public void getExecutionBlockContext(RpcController controller, + ExecutionBlockContextRequest request, + RpcCallback<ExecutionBlockContextResponse> done) { + + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( + new QueryId(request.getExecutionBlockId().getQueryId())); + if (queryMasterTask != null) { + + Stage stage = queryMasterTask.getQuery().getStage(new ExecutionBlockId(request.getExecutionBlockId())); + + // first request with starting ExecutionBlock + PlanProto.ShuffleType shuffleType = stage.getDataChannel().getShuffleType(); + + ExecutionBlockContextResponse.Builder ebRequestProto = ExecutionBlockContextResponse.newBuilder(); + ebRequestProto.setExecutionBlockId(request.getExecutionBlockId()) + .setQueryContext(stage.getContext().getQueryContext().getProto()) + .setQueryOutputPath(stage.getContext().getStagingDir().toString()) + .setPlanJson(CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class)) + .setShuffleType(shuffleType); + + //Set assigned worker to stage + if (!stage.getAssignedWorkerMap().containsKey(request.getWorker().getId())) { + stage.getAssignedWorkerMap().put(request.getWorker().getId(), + NetUtils.createSocketAddr(request.getWorker().getHost(), request.getWorker().getPeerRpcPort())); + } + done.run(ebRequestProto.build()); + } else { + controller.setFailed("Can't find query. request: " + request); + } + } + + @Override public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request, RpcCallback<PrimitiveProtos.NullProto> done) { QueryId queryId = new QueryId(request); @@ -221,7 +228,7 @@ public class QueryMasterManagerService extends CompositeService @Override public void executeQuery(RpcController controller, - TajoWorkerProtocol.QueryExecutionRequestProto request, + QueryExecutionRequest request, RpcCallback<PrimitiveProtos.NullProto> done) { workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc(); @@ -231,7 +238,22 @@ public class QueryMasterManagerService extends CompositeService new Session(request.getSession()), new QueryContext(workerContext.getQueryMaster().getContext().getConf(), request.getQueryContext()), request.getExprInJson().getValue(), - request.getLogicalPlanJson().getValue())); + request.getLogicalPlanJson().getValue(), new NodeResource(request.getAllocation().getResource()))); done.run(TajoWorker.NULL_PROTO); } + + @Override + public void allocateQueryMaster(RpcController controller, + AllocationResourceProto request, + RpcCallback<PrimitiveProtos.BoolProto> done) { + CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>(); + workerContext.getNodeResourceManager().handle(new QMResourceAllocateEvent(request, callFuture)); + + try { + done.run(callFuture.get()); + } catch (Exception e) { + controller.setFailed(e.getMessage()); + done.run(TajoWorker.FALSE_PROTO); + } + } }
