http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java new file mode 100644 index 0000000..351856f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -0,0 +1,926 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.querymaster; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.ContainerProxy; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.event.*; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; +import org.apache.tajo.plan.serder.LogicalNodeSerializer; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.storage.DataLocation; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.worker.FetchImpl; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +public class DefaultTaskScheduler extends AbstractTaskScheduler { + private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); + + private final TaskSchedulerContext context; + 
private Stage stage; + + private Thread schedulingThread; + private AtomicBoolean stopEventHandling = new AtomicBoolean(false); + + private ScheduledRequests scheduledRequests; + private TaskRequests taskRequests; + + private int nextTaskId = 0; + private int scheduledObjectNum = 0; + + public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { + super(DefaultTaskScheduler.class.getName()); + this.context = context; + this.stage = stage; + } + + @Override + public void init(Configuration conf) { + + scheduledRequests = new ScheduledRequests(); + taskRequests = new TaskRequests(); + + super.init(conf); + } + + @Override + public void start() { + LOG.info("Start TaskScheduler"); + + this.schedulingThread = new Thread() { + public void run() { + + while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) { + try { + synchronized (schedulingThread){ + schedulingThread.wait(100); + } + schedule(); + } catch (InterruptedException e) { + break; + } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + break; + } + } + LOG.info("TaskScheduler schedulingThread stopped"); + } + }; + + this.schedulingThread.start(); + super.start(); + } + + private static final TaskAttemptId NULL_ATTEMPT_ID; + public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; + static { + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); + + TajoWorkerProtocol.TaskRequestProto.Builder builder = + TajoWorkerProtocol.TaskRequestProto.newBuilder(); + builder.setId(NULL_ATTEMPT_ID.getProto()); + builder.setShouldDie(true); + builder.setOutputTable(""); + builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); + builder.setClusteredOutput(false); + stopTaskRunnerReq = builder.build(); + } + + @Override + public void stop() { + if(stopEventHandling.getAndSet(true)){ + return; + } + + if (schedulingThread != null) { 
+ synchronized (schedulingThread) { + schedulingThread.notifyAll(); + } + } + + // Return all of request callbacks instantly. + if(taskRequests != null){ + for (TaskRequestEvent req : taskRequests.taskRequestQueue) { + req.getCallback().run(stopTaskRunnerReq); + } + } + + LOG.info("Task Scheduler stopped"); + super.stop(); + } + + private Fragment[] fragmentsForNonLeafTask; + private Fragment[] broadcastFragmentsForNonLeafTask; + + LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>(); + public void schedule() { + + if (taskRequests.size() > 0) { + if (scheduledRequests.leafTaskNum() > 0) { + LOG.debug("Try to schedule tasks with taskRequestEvents: " + + taskRequests.size() + ", LeafTask Schedule Request: " + + scheduledRequests.leafTaskNum()); + taskRequests.getTaskRequests(taskRequestEvents, + scheduledRequests.leafTaskNum()); + LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents "); + if (taskRequestEvents.size() > 0) { + scheduledRequests.assignToLeafTasks(taskRequestEvents); + taskRequestEvents.clear(); + } + } + } + + if (taskRequests.size() > 0) { + if (scheduledRequests.nonLeafTaskNum() > 0) { + LOG.debug("Try to schedule tasks with taskRequestEvents: " + + taskRequests.size() + ", NonLeafTask Schedule Request: " + + scheduledRequests.nonLeafTaskNum()); + taskRequests.getTaskRequests(taskRequestEvents, + scheduledRequests.nonLeafTaskNum()); + scheduledRequests.assignToNonLeafTasks(taskRequestEvents); + taskRequestEvents.clear(); + } + } + } + + @Override + public void handle(TaskSchedulerEvent event) { + if (event.getType() == EventType.T_SCHEDULE) { + if (event instanceof FragmentScheduleEvent) { + FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; + if (context.isLeafQuery()) { + TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(); + Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); + task.addFragment(castEvent.getLeftFragment(), true); + 
scheduledObjectNum++; + if (castEvent.hasRightFragments()) { + task.addFragments(castEvent.getRightFragments()); + } + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + } else { + fragmentsForNonLeafTask = new FileFragment[2]; + fragmentsForNonLeafTask[0] = castEvent.getLeftFragment(); + if (castEvent.hasRightFragments()) { + FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{}); + fragmentsForNonLeafTask[1] = rightFragments[0]; + if (rightFragments.length > 1) { + broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1]; + System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length); + } else { + broadcastFragmentsForNonLeafTask = null; + } + } + } + } else if (event instanceof FetchScheduleEvent) { + FetchScheduleEvent castEvent = (FetchScheduleEvent) event; + Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); + TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); + Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); + scheduledObjectNum++; + for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { + task.addFetches(eachFetch.getKey(), eachFetch.getValue()); + task.addFragment(fragmentsForNonLeafTask[0], true); + if (fragmentsForNonLeafTask[1] != null) { + task.addFragment(fragmentsForNonLeafTask[1], true); + } + } + if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) { + task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask)); + } + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + } else if (event instanceof TaskAttemptToSchedulerEvent) { + TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; + if (context.isLeafQuery()) { + scheduledRequests.addLeafTask(castEvent); + } else { + 
scheduledRequests.addNonLeafTask(castEvent); + } + } + } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { + // when a stage is killed, unassigned query unit attmpts are canceled from the scheduler. + // This event is triggered by TaskAttempt. + TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event; + scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId()); + LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName()); + ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle( + new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED)); + } + } + + @Override + public void handleTaskRequestEvent(TaskRequestEvent event) { + + taskRequests.handle(event); + int hosts = scheduledRequests.leafTaskHostMapping.size(); + + // if available cluster resource are large then tasks, the scheduler thread are working immediately. + if(remainingScheduledObjectNum() > 0 && + (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){ + synchronized (schedulingThread){ + schedulingThread.notifyAll(); + } + } + } + + @Override + public int remainingScheduledObjectNum() { + return scheduledObjectNum; + } + + private class TaskRequests implements EventHandler<TaskRequestEvent> { + private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue = + new LinkedBlockingQueue<TaskRequestEvent>(); + + @Override + public void handle(TaskRequestEvent event) { + if(LOG.isDebugEnabled()){ + LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId()); + } + + if(stopEventHandling.get()) { + event.getCallback().run(stopTaskRunnerReq); + return; + } + int qSize = taskRequestQueue.size(); + if (qSize != 0 && qSize % 1000 == 0) { + LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize); + } + int remCapacity = taskRequestQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low 
remaining capacity in the event-queue " + + "of DefaultTaskScheduler: " + remCapacity); + } + + taskRequestQueue.add(event); + } + + public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests, + int num) { + taskRequestQueue.drainTo(taskRequests, num); + } + + public int size() { + return taskRequestQueue.size(); + } + } + + /** + * One worker can have multiple running task runners. <code>HostVolumeMapping</code> + * describes various information for one worker, including : + * <ul> + * <li>host name</li> + * <li>rack name</li> + * <li>unassigned tasks for each disk volume</li> + * <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li> + * <li>the number of running tasks for each volume</li> + * </ul>, each task runner and the concurrency number of running tasks for volumes. + * + * Here, we identifier a task runner by {@link ContainerId}, and we use volume ids to identify + * all disks in this node. Actually, each volume is only used to distinguish disks, and we don't + * know a certain volume id indicates a certain disk. If you want to know volume id, please read the below section. + * + * <h3>Volume id</h3> + * Volume id is an integer. Each volume id identifies each disk volume. + * + * This volume id can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}. * + * HDFS cannot give any volume id due to unknown reason and disabled config 'dfs.client.file-block-locations.enabled'. + * In this case, the volume id will be -1 or other native integer. + * + * <h3>See Also</h3> + * <ul> + * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li> + * </ul> + */ + public class HostVolumeMapping { + private final String host; + private final String rack; + /** A key is disk volume, and a value is a list of tasks to be scheduled. 
*/ + private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume = + Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>()); + /** A value is last assigned volume id for each task runner */ + private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId, + Integer>(); + /** + * A key is disk volume id, and a value is the load of this volume. + * This load is measured by counting how many number of tasks are running. + * + * These disk volumes are kept in an order of ascending order of the volume id. + * In other words, the head volume ids are likely to -1, meaning no given volume id. + */ + private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>(); + /** The total number of remain tasks in this host */ + private AtomicInteger remainTasksNum = new AtomicInteger(0); + public static final int REMOTE = -2; + + + public HostVolumeMapping(String host, String rack){ + this.host = host; + this.rack = rack; + } + + public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){ + synchronized (unassignedTaskForEachVolume){ + LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); + if (list == null) { + list = new LinkedHashSet<TaskAttempt>(); + unassignedTaskForEachVolume.put(volumeId, list); + } + list.add(attemptId); + } + + remainTasksNum.incrementAndGet(); + + if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0); + } + + /** + * Priorities + * 1. a task list in a volume of host + * 2. unknown block or Non-splittable task in host + * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. 
so it will be null + */ + public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) { + int volumeId; + TaskAttemptId taskAttemptId = null; + + if (!lastAssignedVolumeId.containsKey(containerId)) { + volumeId = getLowestVolumeId(); + increaseConcurrency(containerId, volumeId); + } else { + volumeId = lastAssignedVolumeId.get(containerId); + } + + if (unassignedTaskForEachVolume.size() > 0) { + int retry = unassignedTaskForEachVolume.size(); + do { + //clean and get a remaining local task + taskAttemptId = getAndRemove(volumeId); + if(!unassignedTaskForEachVolume.containsKey(volumeId)) { + decreaseConcurrency(containerId); + if (volumeId > REMOTE) { + diskVolumeLoads.remove(volumeId); + } + } + + if (taskAttemptId == null) { + //reassign next volume + volumeId = getLowestVolumeId(); + increaseConcurrency(containerId, volumeId); + retry--; + } else { + break; + } + } while (retry > 0); + } else { + this.remainTasksNum.set(0); + } + return taskAttemptId; + } + + public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) { + TaskAttemptId taskAttemptId = null; + + if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) { + int retry = unassignedTaskForEachVolume.size(); + do { + //clean and get a remaining task + int volumeId = getLowestVolumeId(); + taskAttemptId = getAndRemove(volumeId); + if (taskAttemptId == null) { + if (volumeId > REMOTE) { + diskVolumeLoads.remove(volumeId); + } + retry--; + } else { + break; + } + } while (retry > 0); + } + return taskAttemptId; + } + + private synchronized TaskAttemptId getAndRemove(int volumeId){ + TaskAttemptId taskAttemptId = null; + if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId; + + LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); + if(list != null && list.size() > 0){ + TaskAttempt taskAttempt; + synchronized (unassignedTaskForEachVolume) { + Iterator<TaskAttempt> iterator = list.iterator(); + taskAttempt = 
iterator.next(); + iterator.remove(); + } + + this.remainTasksNum.getAndDecrement(); + taskAttemptId = taskAttempt.getId(); + for (DataLocation location : taskAttempt.getTask().getDataLocations()) { + if (!this.getHost().equals(location.getHost())) { + HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); + if (volumeMapping != null) { + volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt); + } + } + } + } + + if(list == null || list.isEmpty()) { + unassignedTaskForEachVolume.remove(volumeId); + } + return taskAttemptId; + } + + private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){ + if(!unassignedTaskForEachVolume.containsKey(volumeId)) return; + + LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId); + + if(tasks != null && tasks.size() > 0){ + tasks.remove(taskAttempt); + remainTasksNum.getAndDecrement(); + } else { + unassignedTaskForEachVolume.remove(volumeId); + } + } + + /** + * Increase the count of running tasks and disk loads for a certain task runner. 
+ * + * @param containerId The task runner identifier + * @param volumeId Volume identifier + * @return the volume load (i.e., how many running tasks use this volume) + */ + private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) { + + int concurrency = 1; + if (diskVolumeLoads.containsKey(volumeId)) { + concurrency = diskVolumeLoads.get(volumeId) + 1; + } + + if (volumeId > -1) { + LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); + } else if (volumeId == -1) { + // this case is disabled namenode block meta or compressed text file or amazon s3 + LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); + } else if (volumeId == REMOTE) { + // this case has processed all block on host and it will be assigned to remote + LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() + + ", Remote Concurrency : " + concurrency); + } + diskVolumeLoads.put(volumeId, concurrency); + lastAssignedVolumeId.put(containerId, volumeId); + return concurrency; + } + + /** + * Decrease the count of running tasks of a certain task runner + */ + private synchronized void decreaseConcurrency(TajoContainerId containerId){ + Integer volumeId = lastAssignedVolumeId.get(containerId); + if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){ + Integer concurrency = diskVolumeLoads.get(volumeId); + if(concurrency > 0){ + diskVolumeLoads.put(volumeId, concurrency - 1); + } else { + if (volumeId > REMOTE) { + diskVolumeLoads.remove(volumeId); + } + } + } + lastAssignedVolumeId.remove(containerId); + } + + /** + * volume of a host : 0 ~ n + * compressed task, amazon s3, unKnown volume : -1 + * remote task : -2 + */ + public int getLowestVolumeId(){ + Map.Entry<Integer, Integer> volumeEntry = null; + + for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) { + if(volumeEntry == null) volumeEntry = entry; + + if 
(volumeEntry.getValue() >= entry.getValue()) { + volumeEntry = entry; + } + } + + if(volumeEntry != null){ + return volumeEntry.getKey(); + } else { + return REMOTE; + } + } + + public boolean isAssigned(TajoContainerId containerId){ + return lastAssignedVolumeId.containsKey(containerId); + } + + public boolean isRemote(TajoContainerId containerId){ + Integer volumeId = lastAssignedVolumeId.get(containerId); + if(volumeId == null || volumeId > REMOTE){ + return false; + } else { + return true; + } + } + + public int getRemoteConcurrency(){ + return getVolumeConcurrency(REMOTE); + } + + public int getVolumeConcurrency(int volumeId){ + Integer size = diskVolumeLoads.get(volumeId); + if(size == null) return 0; + else return size; + } + + public int getRemainingLocalTaskSize(){ + return remainTasksNum.get(); + } + + public String getHost() { + + return host; + } + + public String getRack() { + return rack; + } + } + + private class ScheduledRequests { + // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in + // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner + // if the task is not included in leafTasks and nonLeafTasks. 
+ private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); + private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); + private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap(); + private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap(); + + private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) { + TaskAttempt taskAttempt = event.getTaskAttempt(); + List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); + + for (DataLocation location : locations) { + String host = location.getHost(); + + HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); + if (hostVolumeMapping == null) { + String rack = RackResolver.resolve(host).getNetworkLocation(); + hostVolumeMapping = new HostVolumeMapping(host, rack); + leafTaskHostMapping.put(host, hostVolumeMapping); + } + hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); + + if (LOG.isDebugEnabled()) { + LOG.debug("Added attempt req to host " + host); + } + + HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); + if (list == null) { + list = new HashSet<TaskAttemptId>(); + leafTasksRackMapping.put(hostVolumeMapping.getRack(), list); + } + + list.add(taskAttempt.getId()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack()); + } + } + + leafTasks.add(taskAttempt.getId()); + } + + private void addNonLeafTask(TaskAttemptToSchedulerEvent event) { + nonLeafTasks.add(event.getTaskAttempt().getId()); + } + + public int leafTaskNum() { + return leafTasks.size(); + } + + public int nonLeafTaskNum() { + return nonLeafTasks.size(); + } + + public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>(); + + private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){ + HostVolumeMapping 
hostVolumeMapping = leafTaskHostMapping.get(host); + + if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode + for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) { + TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId); + + if(attemptId == null) break; + //find remaining local task + if (leafTasks.contains(attemptId)) { + leafTasks.remove(attemptId); + //LOG.info(attemptId + " Assigned based on host match " + hostName); + hostLocalAssigned++; + totalAssigned++; + return attemptId; + } + } + } + return null; + } + + private TaskAttemptId allocateRackTask(String host) { + + List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values()); + String rack = RackResolver.resolve(host).getNetworkLocation(); + TaskAttemptId attemptId = null; + + if (remainingTasks.size() > 0) { + synchronized (scheduledRequests) { + //find largest remaining task of other host in rack + Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() { + @Override + public int compare(HostVolumeMapping v1, HostVolumeMapping v2) { + // descending remaining tasks + if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) { + return 1; + } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) { + return 0; + } else { + return -1; + } + } + }); + } + + for (HostVolumeMapping tasks : remainingTasks) { + for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) { + TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack); + + if (tId == null) break; + + if (leafTasks.contains(tId)) { + leafTasks.remove(tId); + attemptId = tId; + break; + } + } + if(attemptId != null) break; + } + } + + //find task in rack + if (attemptId == null) { + HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack); + if (list != null) { + synchronized (list) { + Iterator<TaskAttemptId> iterator = list.iterator(); + while (iterator.hasNext()) { + TaskAttemptId tId = iterator.next(); + iterator.remove(); + if 
(leafTasks.contains(tId)) { + leafTasks.remove(tId); + attemptId = tId; + break; + } + } + } + } + } + + if (attemptId != null) { + rackLocalAssigned++; + totalAssigned++; + + LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s", + hostLocalAssigned, rackLocalAssigned, totalAssigned, + ((double) hostLocalAssigned / (double) totalAssigned) * 100, host)); + + } + return attemptId; + } + + public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { + Collections.shuffle(taskRequests); + LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>(); + + TaskRequestEvent taskRequest; + while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) { + taskRequest = taskRequests.pollFirst(); + if(taskRequest == null) { // if there are only remote task requests + taskRequest = remoteTaskRequests.pollFirst(); + } + + // checking if this container is still alive. + // If not, ignore the task request and stop the task runner + ContainerProxy container = context.getMasterContext().getResourceAllocator() + .getContainer(taskRequest.getContainerId()); + if(container == null) { + taskRequest.getCallback().run(stopTaskRunnerReq); + continue; + } + + // getting the hostname of requested node + WorkerConnectionInfo connectionInfo = + context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId()); + String host = connectionInfo.getHost(); + + // if there are no worker matched to the hostname a task request + if(!leafTaskHostMapping.containsKey(host)){ + String normalizedHost = NetUtils.normalizeHost(host); + + if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){ + // this case means one of either cases: + // * there are no blocks which reside in this node. + // * all blocks which reside in this node are consumed, and this task runner requests a remote task. 
+ // In this case, we transfer the task request to the remote task request list, and skip the followings. + remoteTaskRequests.add(taskRequest); + continue; + } + } + + TajoContainerId containerId = taskRequest.getContainerId(); + LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + + "containerId=" + containerId); + + ////////////////////////////////////////////////////////////////////// + // disk or host-local allocation + ////////////////////////////////////////////////////////////////////// + TaskAttemptId attemptId = allocateLocalTask(host, containerId); + + if (attemptId == null) { // if a local task cannot be found + HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); + + if(hostVolumeMapping != null) { + if(!hostVolumeMapping.isRemote(containerId)){ + // assign to remote volume + hostVolumeMapping.decreaseConcurrency(containerId); + hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE); + } + // this part is remote concurrency management of a tail tasks + int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1); + + if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){ + //release container + hostVolumeMapping.decreaseConcurrency(containerId); + taskRequest.getCallback().run(stopTaskRunnerReq); + continue; + } + } + + ////////////////////////////////////////////////////////////////////// + // rack-local allocation + ////////////////////////////////////////////////////////////////////// + attemptId = allocateRackTask(host); + + ////////////////////////////////////////////////////////////////////// + // random node allocation + ////////////////////////////////////////////////////////////////////// + if (attemptId == null && leafTaskNum() > 0) { + synchronized (leafTasks){ + attemptId = leafTasks.iterator().next(); + leafTasks.remove(attemptId); + rackLocalAssigned++; + totalAssigned++; + LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), 
Locality: %.2f%%,", + hostLocalAssigned, rackLocalAssigned, totalAssigned, + ((double) hostLocalAssigned / (double) totalAssigned) * 100)); + } + } + } + + if (attemptId != null) { + Task task = stage.getTask(attemptId.getTaskId()); + TaskRequest taskAssign = new TaskRequestImpl( + attemptId, + new ArrayList<FragmentProto>(task.getAllFragments()), + "", + false, + LogicalNodeSerializer.serialize(task.getLogicalPlan()), + context.getMasterContext().getQueryContext(), + stage.getDataChannel(), stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { + taskAssign.setInterQuery(); + } + + context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, + taskRequest.getContainerId(), connectionInfo)); + assignedRequest.add(attemptId); + + scheduledObjectNum--; + taskRequest.getCallback().run(taskAssign.getProto()); + } else { + throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); + } + } + } + + private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) { + if (masterPlan.isRoot(block)) { + return false; + } + + ExecutionBlock parent = masterPlan.getParent(block); + if (masterPlan.isRoot(parent) && parent.hasUnion()) { + return false; + } + + return true; + } + + public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) { + Collections.shuffle(taskRequests); + + TaskRequestEvent taskRequest; + while (!taskRequests.isEmpty()) { + taskRequest = taskRequests.pollFirst(); + LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId()); + + TaskAttemptId attemptId; + // random allocation + if (nonLeafTasks.size() > 0) { + synchronized (nonLeafTasks){ + attemptId = nonLeafTasks.iterator().next(); + nonLeafTasks.remove(attemptId); + } + LOG.debug("Assigned based on * match"); + + Task task; + task = stage.getTask(attemptId.getTaskId()); + TaskRequest taskAssign = new TaskRequestImpl( + attemptId, + Lists.newArrayList(task.getAllFragments()), + 
"", + false, + LogicalNodeSerializer.serialize(task.getLogicalPlan()), + context.getMasterContext().getQueryContext(), + stage.getDataChannel(), + stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { + taskAssign.setInterQuery(); + } + for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) { + Collection<FetchImpl> fetches = entry.getValue(); + if (fetches != null) { + for (FetchImpl fetch : fetches) { + taskAssign.addFetch(entry.getKey(), fetch); + } + } + } + + WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator(). + getWorkerConnectionInfo(taskRequest.getWorkerId()); + context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, + taskRequest.getContainerId(), connectionInfo)); + taskRequest.getCallback().run(taskAssign.getProto()); + totalAssigned++; + scheduledObjectNum--; + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java new file mode 100644 index 0000000..5fe2f80 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.querymaster; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.master.event.TaskSchedulerEvent; +import org.apache.tajo.worker.FetchImpl; + +import java.util.List; +import java.util.Map; + +public class FetchScheduleEvent extends TaskSchedulerEvent { + private final Map<String, List<FetchImpl>> fetches; + + public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId, + final Map<String, List<FetchImpl>> fetches) { + super(eventType, blockId); + this.fetches = fetches; + } + + public Map<String, List<FetchImpl>> getFetches() { + return fetches; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java new file mode 100644 index 0000000..2932694 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -0,0 +1,738 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.querymaster; + +import com.google.common.collect.Maps; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.state.*; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; +import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.global.ExecutionBlock; +import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; +import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.event.*; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.history.QueryHistory; +import org.apache.tajo.util.history.StageHistory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class Query implements EventHandler<QueryEvent> { + private static final Log LOG = LogFactory.getLog(Query.class); + + // Facilities for Query + private final TajoConf 
systemConf; + private final Clock clock; + private String queryStr; + private Map<ExecutionBlockId, Stage> stages; + private final EventHandler eventHandler; + private final MasterPlan plan; + QueryMasterTask.QueryMasterTaskContext context; + private ExecutionBlockCursor cursor; + + // Query Status + private final QueryId id; + private long appSubmitTime; + private long startTime; + private long finishTime; + private TableDesc resultDesc; + private int completedStagesCount = 0; + private int successedStagesCount = 0; + private int killedStagesCount = 0; + private int failedStagesCount = 0; + private int erroredStagesCount = 0; + private final List<String> diagnostics = new ArrayList<String>(); + + // Internal Variables + private final Lock readLock; + private final Lock writeLock; + private int priority = 100; + + // State Machine + private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine; + private QueryState queryState; + + // Transition Handler + private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); + private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); + private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition(); + private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition(); + + protected static final StateMachineFactory + <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory = + new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent> + (QueryState.QUERY_NEW) + + // Transitions from NEW state + .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING, + QueryEventType.START, + new StartTransition()) + .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW, + QueryEventType.DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED, + QueryEventType.KILL, 
+ new KillNewQueryTransition()) + .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from RUNNING state + .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING, + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(QueryState.QUERY_RUNNING, + EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, + QueryState.QUERY_ERROR), + QueryEventType.QUERY_COMPLETED, + QUERY_COMPLETED_TRANSITION) + .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING, + QueryEventType.DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT, + QueryEventType.KILL, + new KillAllStagesTransition()) + .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from QUERY_SUCCEEDED state + .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, + QueryEventType.DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + // ignore-able transitions + .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, + QueryEventType.KILL) + .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + + // Transitions from KILL_WAIT state + .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT, + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(QueryState.QUERY_KILL_WAIT, + EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, + QueryState.QUERY_ERROR), + QueryEventType.QUERY_COMPLETED, + QUERY_COMPLETED_TRANSITION) + .addTransition(QueryState.QUERY_KILL_WAIT, 
QueryState.QUERY_KILL_WAIT, + QueryEventType.DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able transitions + .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED), + QueryEventType.KILL, + QUERY_COMPLETED_TRANSITION) + + // Transitions from FAILED state + .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED, + QueryEventType.DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able transitions + .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED, + QueryEventType.KILL) + + // Transitions from ERROR state + .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, + QueryEventType.DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, + QueryEventType.INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able transitions + .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, + EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED)) + + .installTopology(); + + public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id, + final long appSubmitTime, + final String queryStr, + final EventHandler eventHandler, + final MasterPlan plan) { + this.context = context; + this.systemConf = context.getConf(); + this.id = id; + this.clock = context.getClock(); + this.appSubmitTime = appSubmitTime; + this.queryStr = queryStr; + this.stages = Maps.newConcurrentMap(); + this.eventHandler = eventHandler; + this.plan = plan; + this.cursor = new ExecutionBlockCursor(plan, true); + + StringBuilder sb = new StringBuilder("\n======================================================="); + sb.append("\nThe order of execution: \n"); + int order = 1; + 
while (cursor.hasNext()) { + ExecutionBlock currentEB = cursor.nextBlock(); + sb.append("\n").append(order).append(": ").append(currentEB.getId()); + order++; + } + sb.append("\n======================================================="); + LOG.info(sb); + cursor.reset(); + + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); + + stateMachine = stateMachineFactory.make(this); + queryState = stateMachine.getCurrentState(); + } + + /** + * Estimates the overall progress of this query. + * + * @return 1.0 once the query state is QUERY_SUCCEEDED; otherwise the sum of the progress of every + * registered stage, each weighted by 1 / (execution block count - 1). Stages still in the + * NEW state contribute nothing, and 0.0 is returned when there is no block to weight against. + */ + public float getProgress() { + QueryState state = getState(); + if (state == QueryState.QUERY_SUCCEEDED) { + return 1.0f; + } + + // minus one presumably discounts the terminal execution block -- TODO confirm + int blockCount = getExecutionBlockCursor().size() - 1; + if (blockCount <= 0) { + // the unguarded division yielded Infinity here, which turned the total into NaN + return 0.0f; + } + float proportion = 1.0f / (float) blockCount; + + // iterate over a snapshot so that stages added meanwhile do not affect this pass + List<Stage> tempStages = new ArrayList<Stage>(); + synchronized(stages) { + tempStages.addAll(stages.values()); + } + + float totalProgress = 0.0f; + for (Stage stage: tempStages) { + if (stage.getState() != StageState.NEW) { + totalProgress += stage.getProgress() * proportion; + } + } + + return totalProgress; + } + + public long getAppSubmitTime() { + return this.appSubmitTime; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime() { + startTime = clock.getTime(); + } + + public long getFinishTime() { + return finishTime; + } + + public void setFinishTime() { + finishTime = clock.getTime(); + } + + public QueryHistory getQueryHistory() { + QueryHistory queryHistory = makeQueryHistory(); + queryHistory.setStageHistories(makeStageHistories()); + return queryHistory; + } + + private List<StageHistory> makeStageHistories() { + List<StageHistory> stageHistories = new ArrayList<StageHistory>(); + for(Stage eachStage : getStages()) { +
stageHistories.add(eachStage.getStageHistory()); + } + + return stageHistories; + } + + /** + * Builds the history record of this query: its id, the query master's worker name and HTTP info port, + * the logical and distributed plans as strings, and the publicly visible session variables. + * + * @return a newly created history record + */ + private QueryHistory makeQueryHistory() { + QueryHistory queryHistory = new QueryHistory(); + + queryHistory.setQueryId(getId().toString()); + queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName()); + queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort()); + // set once: an earlier call passing plan.toString() was overwritten immediately by this one + queryHistory.setLogicalPlan(plan.getLogicalPlan().toString()); + queryHistory.setDistributedPlan(plan.toString()); + + // only session variables that are known and flagged public are recorded + List<String[]> sessionVariables = new ArrayList<String[]>(); + for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) { + if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) { + sessionVariables.add(new String[]{entry.getKey(), entry.getValue()}); + } + } + queryHistory.setSessionVariables(sessionVariables); + + return queryHistory; + } + + /** + * Returns the diagnostic messages collected so far. + * + * @return a snapshot copied while the read lock is held; handing out the internal list itself would + * let callers read it after the lock has been released, while it may still be appended to + */ + public List<String> getDiagnostics() { + readLock.lock(); + try { + return new ArrayList<String>(diagnostics); + } finally { + readLock.unlock(); + } + } + + protected void addDiagnostic(String diag) { + diagnostics.add(diag); + } + + public TableDesc getResultDesc() { + return resultDesc; + } + + public void setResultDesc(TableDesc desc) { + resultDesc = desc; + } + + public MasterPlan getPlan() { + return plan; + } + + public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() { + return stateMachine; + } + + public void addStage(Stage stage) { + stages.put(stage.getId(), stage); + } + + public QueryId getId() { + return this.id; + } + + public Stage getStage(ExecutionBlockId id) { + return this.stages.get(id); + } + + public Collection<Stage> getStages() { + return this.stages.values(); + } + + public QueryState getSynchronizedState() { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + /*
non-blocking call for client API */ + public QueryState getState() { + return queryState; + } + + public ExecutionBlockCursor getExecutionBlockCursor() { + return cursor; + } + + public static class StartTransition + implements SingleArcTransition<Query, QueryEvent> { + + @Override + public void transition(Query query, QueryEvent queryEvent) { + + query.setStartTime(); + Stage stage = new Stage(query.context, query.getPlan(), + query.getExecutionBlockCursor().nextBlock()); + stage.setPriority(query.priority--); + query.addStage(stage); + + stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); + LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); + } + } + + public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> { + + @Override + public QueryState transition(Query query, QueryEvent queryEvent) { + QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent; + QueryState finalState; + + if (stageEvent.getState() == StageState.SUCCEEDED) { + finalState = finalizeQuery(query, stageEvent); + } else if (stageEvent.getState() == StageState.FAILED) { + finalState = QueryState.QUERY_FAILED; + } else if (stageEvent.getState() == StageState.KILLED) { + finalState = QueryState.QUERY_KILLED; + } else { + finalState = QueryState.QUERY_ERROR; + } + if (finalState != QueryState.QUERY_SUCCEEDED) { + Stage lastStage = query.getStage(stageEvent.getExecutionBlockId()); + if (lastStage != null && lastStage.getTableMeta() != null) { + StoreType storeType = lastStage.getTableMeta().getStoreType(); + if (storeType != null) { + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + try { + StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild()); + } catch (IOException e) { + LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e); + } + } + } + } + 
// record the finish time before announcing completion, as the kill and internal-error + // transitions do, so that it is already set when the completion event is handled + query.setFinishTime(); + query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); + + return finalState; + } + + /** + * Commits the output of the last stage through the storage manager and then runs the query hooks. + * + * @param query the query being completed + * @param event the completion event naming the final execution block + * @return QUERY_SUCCEEDED when the commit and all hooks finish; QUERY_ERROR when any of them throws, + * in which case the stack trace is published as a diagnostics update + */ + private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { + try { + // resolved inside the try block so that a missing stage or table meta is reported as + // QUERY_ERROR instead of escaping from the state machine as a NullPointerException + Stage lastStage = query.getStage(event.getExecutionBlockId()); + StoreType storeType = lastStage.getTableMeta().getStoreType(); + LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); + CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); + + Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType) + .commitOutputData(query.context.getQueryContext(), + lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc); + + QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); + hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); + } catch (Exception e) { + query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); + return QueryState.QUERY_ERROR; + } + + return QueryState.QUERY_SUCCEEDED; + } + + private static interface QueryHook { + boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir); + void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, + ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception; + } + + private class QueryHookExecutor { + private List<QueryHook> hookList = TUtil.newList(); + private QueryMaster.QueryMasterContext context; + + public QueryHookExecutor(QueryMaster.QueryMasterContext context) { + this.context = context; + hookList.add(new MaterializedResultHook()); + hookList.add(new CreateTableHook()); + hookList.add(new InsertTableHook()); + } + +
public void execute(QueryContext queryContext, Query query, + ExecutionBlockId finalExecBlockId, + Path finalOutputDir) throws Exception { + for (QueryHook hook : hookList) { + if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) { + hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir); + } + } + } + } + + private class MaterializedResultHook implements QueryHook { + + @Override + public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, + Path finalOutputDir) { + Stage lastStage = query.getStage(finalExecBlockId); + NodeType type = lastStage.getBlock().getPlan().getType(); + return type != NodeType.CREATE_TABLE && type != NodeType.INSERT; + } + + @Override + public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, + Query query, ExecutionBlockId finalExecBlockId, + Path finalOutputDir) throws Exception { + Stage lastStage = query.getStage(finalExecBlockId); + TableMeta meta = lastStage.getTableMeta(); + + String nullChar = queryContext.get(SessionVars.NULL_CHAR); + meta.putOption(StorageConstants.TEXT_NULL, nullChar); + + TableStats stats = lastStage.getResultStats(); + + TableDesc resultTableDesc = + new TableDesc( + query.getId().toString(), + lastStage.getSchema(), + meta, + finalOutputDir.toUri()); + resultTableDesc.setExternal(true); + + stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); + resultTableDesc.setStats(stats); + query.setResultDesc(resultTableDesc); + } + } + + private class CreateTableHook implements QueryHook { + + @Override + public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, + Path finalOutputDir) { + Stage lastStage = query.getStage(finalExecBlockId); + return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE; + } + + @Override + public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, + Query query, 
ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception { + CatalogService catalog = context.getWorkerContext().getCatalog(); + Stage lastStage = query.getStage(finalExecBlockId); + TableStats stats = lastStage.getResultStats(); + + CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan(); + TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions()); + + TableDesc tableDescTobeCreated = + new TableDesc( + createTableNode.getTableName(), + createTableNode.getTableSchema(), + meta, + finalOutputDir.toUri()); + tableDescTobeCreated.setExternal(createTableNode.isExternal()); + + if (createTableNode.hasPartition()) { + tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod()); + } + + stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir)); + tableDescTobeCreated.setStats(stats); + query.setResultDesc(tableDescTobeCreated); + + catalog.createTable(tableDescTobeCreated); + } + } + + private class InsertTableHook implements QueryHook { + + @Override + public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, + Path finalOutputDir) { + Stage lastStage = query.getStage(finalExecBlockId); + return lastStage.getBlock().getPlan().getType() == NodeType.INSERT; + } + + @Override + public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, + Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) + throws Exception { + + CatalogService catalog = context.getWorkerContext().getCatalog(); + Stage lastStage = query.getStage(finalExecBlockId); + TableMeta meta = lastStage.getTableMeta(); + TableStats stats = lastStage.getResultStats(); + + InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan(); + + TableDesc finalTable; + if (insertNode.hasTargetTable()) { + String tableName = insertNode.getTableName(); + finalTable = catalog.getTableDesc(tableName); + } else { + String tableName 
= query.getId().toString(); + finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri()); + } + + long volume = getTableVolume(query.systemConf, finalOutputDir); + stats.setNumBytes(volume); + finalTable.setStats(stats); + + if (insertNode.hasTargetTable()) { + UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder(); + builder.setTableName(finalTable.getName()); + builder.setStats(stats.getProto()); + + catalog.updateTableStats(builder.build()); + } + + query.setResultDesc(finalTable); + } + } + } + + public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(systemConf); + ContentSummary directorySummary = fs.getContentSummary(tablePath); + return directorySummary.getLength(); + } + + public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> { + + private boolean hasNext(Query query) { + ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); + ExecutionBlock nextBlock = cursor.peek(); + return !query.getPlan().isTerminal(nextBlock); + } + + private void executeNextBlock(Query query) { + ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); + ExecutionBlock nextBlock = cursor.nextBlock(); + Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); + nextStage.setPriority(query.priority--); + query.addStage(nextStage); + nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); + + LOG.info("Scheduling Stage:" + nextStage.getId()); + if(LOG.isDebugEnabled()) { + LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority()); + LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan()); + } + } + + @Override + public void transition(Query query, QueryEvent event) { + try { + query.completedStagesCount++; + StageCompletedEvent castEvent = (StageCompletedEvent) event; + + if (castEvent.getState() == StageState.SUCCEEDED) { + 
query.successedStagesCount++; + } else if (castEvent.getState() == StageState.KILLED) { + query.killedStagesCount++; + } else if (castEvent.getState() == StageState.FAILED) { + query.failedStagesCount++; + } else if (castEvent.getState() == StageState.ERROR) { + query.erroredStagesCount++; + } else { + LOG.error(String.format("Invalid Stage (%s) State %s at %s", + castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); + query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); + } + + // if a stage is succeeded and a query is running + if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded + query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. + hasNext(query)) { // there remains at least one stage. + query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport(); + executeNextBlock(query); + } else { // if a query is completed due to finished, kill, failure, or error + query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); + } + } + } + + private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> { + @Override + public void transition(Query query, QueryEvent event) { + query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); + } + } + + private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> { + @Override + public void transition(Query query, QueryEvent event) { + query.setFinishTime(); + query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); + } + } + + private static class KillAllStagesTransition implements SingleArcTransition<Query, 
QueryEvent> { + @Override + public void transition(Query query, QueryEvent event) { + synchronized (query.stages) { + for (Stage stage : query.stages.values()) { + query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); + } + } + } + } + + private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> { + + @Override + public void transition(Query query, QueryEvent event) { + query.setFinishTime(); + query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId())); + } + } + + @Override + public void handle(QueryEvent event) { + LOG.info("Processing " + event.getQueryId() + " of type " + event.getType()); + try { + writeLock.lock(); + QueryState oldState = getSynchronizedState(); + try { + getStateMachine().doTransition(event.getType(), event); + queryState = getSynchronizedState(); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state" + + ", type:" + event + + ", oldState:" + oldState.name() + + ", nextState:" + getSynchronizedState().name() + , e); + eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR)); + } + + //notify the eventhandler of state change + if (oldState != getSynchronizedState()) { + LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState()); + } + } + + finally { + writeLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java new file mode 100644 index 0000000..bda2ec1 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.querymaster; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.rm.WorkerResourceManager; +import org.apache.tajo.session.Session; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.util.NetUtils; + +import java.net.InetSocketAddress; +import 
java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; + +public class QueryInProgress extends CompositeService { + private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); + + private QueryId queryId; + + private Session session; + + private AsyncDispatcher dispatcher; + + private LogicalRootNode plan; + + private AtomicBoolean querySubmitted = new AtomicBoolean(false); + + private AtomicBoolean stopped = new AtomicBoolean(false); + + private QueryInfo queryInfo; + + private final TajoMaster.MasterContext masterContext; + + private NettyClientBase queryMasterRpc; + + private QueryMasterProtocolService queryMasterRpcClient; + + public QueryInProgress( + TajoMaster.MasterContext masterContext, + Session session, + QueryContext queryContext, + QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) { + super(QueryInProgress.class.getName()); + this.masterContext = masterContext; + this.session = session; + this.queryId = queryId; + this.plan = plan; + + queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr); + queryInfo.setStartTime(System.currentTimeMillis()); + } + + @Override + public void init(Configuration conf) { + dispatcher = new AsyncDispatcher(); + this.addService(dispatcher); + + dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); + super.init(conf); + } + + public synchronized void kill() { + if(queryMasterRpcClient != null){ + queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + } + } + + @Override + public void stop() { + if(stopped.getAndSet(true)) { + return; + } + + LOG.info("========================================================="); + LOG.info("Stop query:" + queryId); + + masterContext.getResourceManager().stopQueryMaster(queryId); + + long startTime = System.currentTimeMillis(); + while(true) { + try { + if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) { + 
LOG.info(queryId + " QueryMaster stopped"); + break; + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + break; + } + + try { + synchronized (this){ + wait(100); + } + } catch (InterruptedException e) { + break; + } + if(System.currentTimeMillis() - startTime > 60 * 1000) { + LOG.warn("Failed to stop QueryMaster:" + queryId); + break; + } + } + + if(queryMasterRpc != null) { + RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); + } + + masterContext.getHistoryWriter().appendHistory(queryInfo); + super.stop(); + } + + @Override + public void start() { + super.start(); + } + + public EventHandler getEventHandler() { + return dispatcher.getEventHandler(); + } + + + + public boolean startQueryMaster() { + try { + LOG.info("Initializing QueryInProgress for QueryID=" + queryId); + WorkerResourceManager resourceManager = masterContext.getResourceManager(); + WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this); + + // if no resource to allocate a query master + if(resource == null) { + LOG.info("No Available Resources for QueryMaster"); + return false; + } + + queryInfo.setQueryMaster(resource.getConnectionInfo().getHost()); + queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort()); + queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); + queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); + + getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); + + return true; + } catch (Exception e) { + catchException(e); + return false; + } + } + + class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> { + @Override + public void handle(QueryJobEvent queryJobEvent) { + if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { + heartbeat(queryJobEvent.getQueryInfo()); + } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { + QueryInProgress 
queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId); + queryInProgress.getEventHandler().handle( + new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); + } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { + submmitQueryToMaster(); + } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { + kill(); + } + } + } + + private void connectQueryMaster() throws Exception { + InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); + LOG.info("Connect to QueryMaster:" + addr); + queryMasterRpc = + RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true); + queryMasterRpcClient = queryMasterRpc.getStub(); + } + + private synchronized void submmitQueryToMaster() { + if(querySubmitted.get()) { + return; + } + + try { + if(queryMasterRpcClient == null) { + connectQueryMaster(); + } + if(queryMasterRpcClient == null) { + LOG.info("No QueryMaster conneciton info."); + //TODO wait + return; + } + LOG.info("Call executeQuery to :" + + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId); + + QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder(); + builder.setQueryId(queryId.getProto()) + .setQueryContext(queryInfo.getQueryContext().getProto()) + .setSession(session.getProto()) + .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr())) + .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()); + + queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get()); + querySubmitted.set(true); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + + public void catchException(Exception e) { + LOG.error(e.getMessage(), e); + queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED); + 
queryInfo.setLastMessage(StringUtils.stringifyException(e)); + } + + public QueryId getQueryId() { + return queryId; + } + + public QueryInfo getQueryInfo() { + return this.queryInfo; + } + + public boolean isStarted() { + return !stopped.get() && this.querySubmitted.get(); + } + + private void heartbeat(QueryInfo queryInfo) { + LOG.info("Received QueryMaster heartbeat:" + queryInfo); + + // to avoid partial update by different heartbeats + synchronized (this.queryInfo) { + + // terminal state will let client to retrieve a query result + // So, we must set the query result before changing query state + if (isFinishState(queryInfo.getQueryState())) { + if (queryInfo.hasResultdesc()) { + this.queryInfo.setResultDesc(queryInfo.getResultDesc()); + } + } + + this.queryInfo.setQueryState(queryInfo.getQueryState()); + this.queryInfo.setProgress(queryInfo.getProgress()); + this.queryInfo.setFinishTime(queryInfo.getFinishTime()); + + // Update diagnosis message + if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { + this.queryInfo.setLastMessage(queryInfo.getLastMessage()); + LOG.info(queryId + queryInfo.getLastMessage()); + } + + // if any error occurs, print outs the error message + if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { + LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); + } + + + if (isFinishState(this.queryInfo.getQueryState())) { + masterContext.getQueryJobManager().getEventHandler().handle( + new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo)); + } + } + } + + private boolean isFinishState(TajoProtos.QueryState state) { + return state == TajoProtos.QueryState.QUERY_FAILED || + state == TajoProtos.QueryState.QUERY_KILLED || + state == TajoProtos.QueryState.QUERY_SUCCEEDED; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java 
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..1a1f2ff
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -0,0 +1,46 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.querymaster;

import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.master.QueryInfo;

/**
 * Event about a query job. Besides its {@link Type}, each event carries the
 * {@link QueryInfo} of the query it refers to.
 */
public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
  /** Query the event refers to; assigned once in the constructor. */
  private final QueryInfo queryInfo;

  /**
   * @param type      the kind of event
   * @param queryInfo the query this event refers to
   */
  public QueryJobEvent(Type type, QueryInfo queryInfo) {
    super(type);

    this.queryInfo = queryInfo;
  }

  /**
   * @return the {@link QueryInfo} passed to the constructor
   */
  public QueryInfo getQueryInfo() {
    return this.queryInfo;
  }

  /** Kinds of query job events. */
  public enum Type {
    QUERY_JOB_START,
    QUERY_JOB_HEARTBEAT,
    QUERY_JOB_FINISH,
    QUERY_JOB_STOP,
    QUERY_MASTER_START,
    QUERY_MASTER_STOP,
    QUERY_JOB_KILL
  }
}
