Repository: hive Updated Branches: refs/heads/master 96a409e1c -> 20c9a3905
Revert "HIVE-18326 : LLAP Tez scheduler - only preempt tasks if there's a dependency between them (Sergey Shelukhin, reviewed by Eric Wohlstadter, Jason Dere)" This reverts commit 3f5148d6aae94f2ae9db2aacccb302211834c699. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/20c9a390 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/20c9a390 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/20c9a390 Branch: refs/heads/master Commit: 20c9a3905f4b1b627c935ad54a53a7a59015587c Parents: 96a409e Author: sergey <[email protected]> Authored: Thu Jan 4 19:03:04 2018 -0800 Committer: sergey <[email protected]> Committed: Thu Jan 4 19:03:04 2018 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 - .../tezplugins/LlapTaskSchedulerService.java | 165 ++++--------------- 2 files changed, 35 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/20c9a390/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6529da6..1dc7501 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3400,9 +3400,6 @@ public class HiveConf extends Configuration { "Backoff factor on successive blacklists of a node due to some failures. Blacklist times\n" + "start at the min timeout and go up to the max timeout based on this backoff factor.", "llap.task.scheduler.node.disable.backoff.factor"), - LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT("hive.llap.task.scheduler.preempt.independent", false, - "Whether the AM LLAP scheduler should preempt a lower priority task for a higher pri one\n" + - "even if the former doesn't depend on the latter (e.g. for two parallel sides of a union)."), LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE( "hive.llap.task.scheduler.num.schedulable.tasks.per.node", 0, "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/20c9a390/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 14bb85a..e97a267 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -14,6 +14,15 @@ package org.apache.hadoop.hive.llap.tezplugins; +import com.google.common.io.ByteArrayDataOutput; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; + +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -66,17 +75,13 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance; import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; +import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator.OperationCallback; import org.apache.hadoop.hive.llap.tezplugins.endpoint.LlapPluginServerImpl; -import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics; import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback; -import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; -import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -92,12 +97,8 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.TaskAttempt; -import org.apache.tez.dag.app.dag.Vertex; -import org.apache.tez.dag.app.dag.impl.Edge; import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -108,9 +109,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -249,11 +248,6 @@ public class LlapTaskSchedulerService extends TaskScheduler { private LlapTaskCommunicator communicator; private final int amPort; private final String serializedToken, jobIdForToken; - // We expect the DAGs to not be super large, so store full dependency set for each vertex to - // avoid traversing the tree later. To save memory, this could be an array (of byte arrays?). - private final Object outputsLock = new Object(); - private boolean isInitialized = false; - private Map<Integer, Set<Integer>> transitiveOutputs; public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -375,6 +369,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf); this.amRegistry = TezAmRegistryImpl.create(conf, true); + synchronized (LlapTaskCommunicator.pluginInitLock) { LlapTaskCommunicator peer = LlapTaskCommunicator.instance; if (peer != null) { @@ -388,84 +383,6 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } - private Map<Integer, Set<Integer>> getDependencyInfo() { - synchronized (outputsLock) { - if (isInitialized) return transitiveOutputs; - isInitialized = true; - if (!HiveConf.getBoolVar(conf, ConfVars.LLAP_TASK_SCHEDULER_PREEMPT_INDEPENDENT)) { - this.transitiveOutputs = getTransitiveVertexOutputs(getContext().getCurrentDagInfo()); - } - return this.transitiveOutputs; - } - } - - private static Map<Integer, Set<Integer>> getTransitiveVertexOutputs(DagInfo info) { - if (!(info instanceof DAG)) { - LOG.warn("DAG info is not a DAG - cannot derive dependencies"); - return null; - } - DAG dag = (DAG) info; - int vc = dag.getVertices().size(); - // All the vertices belong to the same DAG, so we just use numbers. - Map<Integer, Set<Integer>> result = Maps.newHashMapWithExpectedSize(vc); - LinkedList<TezVertexID> queue = new LinkedList<>(); - // We assume a DAG is a DAG, and that it's connected. Add direct dependencies. - for (Vertex v : dag.getVertices().values()) { - Map<Vertex, Edge> out = v.getOutputVertices(); - if (out == null) { - result.put(v.getVertexId().getId(), Sets.newHashSet()); - } else { - Set<Integer> set = Sets.newHashSetWithExpectedSize(vc); - for (Vertex outV : out.keySet()) { - set.add(outV.getVertexId().getId()); - } - result.put(v.getVertexId().getId(), set); - } - if (v.getOutputVerticesCount() == 0) { - queue.add(v.getVertexId()); - } - } - Set<Integer> processed = Sets.newHashSetWithExpectedSize(vc); - while (!queue.isEmpty()) { - TezVertexID id = queue.poll(); - if (processed.contains(id.getId())) continue; // Already processed. See backtracking. - Vertex v = dag.getVertex(id); - Map<Vertex, Edge> out = v.getOutputVertices(); - if (out != null) { - // Check that all the outputs have been processed; if not, insert them into queue - // before the current vertex and try again. It's possible e.g. in a structure like this: - // _1 - // / 2 - // 3 4 where 1 may be added to the queue before 2 - boolean doBacktrack = false; - for (Vertex outV : out.keySet()) { - TezVertexID outId = outV.getVertexId(); - int outNum = outId.getId(); - if (!processed.contains(outNum)) { - if (!doBacktrack) { - queue.addFirst(id); - doBacktrack = true; - } - queue.addFirst(outId); - } - } - if (doBacktrack) continue; - } - int num = id.getId(); - processed.add(num); - Set<Integer> deps = result.get(num); - Map<Vertex, Edge> in = v.getInputVertices(); - if (in != null) { - for (Vertex inV : in.keySet()) { - queue.add(inV.getVertexId()); - // Our outputs are the transitive outputs of our inputs. - result.get(inV.getVertexId().getId()).addAll(deps); - } - } - } - return result; - } - private static Token<JobTokenIdentifier> createAmsToken(ApplicationId id) { if (!UserGroupInformation.isSecurityEnabled()) return null; JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString())); @@ -1707,13 +1624,12 @@ public class LlapTaskSchedulerService extends TaskScheduler { break; } } - if (shouldPreempt) { if (LOG.isDebugEnabled()) { LOG.debug("Attempting to preempt for {} on potential hosts={}. TotalPendingPreemptions={}", taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get()); } - preemptTasks(entry.getKey().getPriority(), vertexNum(taskInfo), 1, potentialHosts); + preemptTasks(entry.getKey().getPriority(), 1, potentialHosts); } else { if (LOG.isDebugEnabled()) { LOG.debug("Not preempting for {} on potential hosts={}. An existing preemption request exists", @@ -1731,7 +1647,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { "Attempting to preempt for task={}, priority={} on any available host", taskInfo.task, taskInfo.priority); } - preemptTasks(entry.getKey().getPriority(), vertexNum(taskInfo), 1, null); + preemptTasks(entry.getKey().getPriority(), 1, null); } else { if (LOG.isDebugEnabled()) { LOG.debug( @@ -1770,10 +1686,6 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } - private static int vertexNum(TaskInfo taskInfo) { - return taskInfo.getAttemptId().getTaskID().getVertexID().getId(); // Sigh... - } - private String constructPendingTaskCountsLogMessage() { StringBuilder sb = new StringBuilder(); int totalCount = 0; @@ -1861,21 +1773,19 @@ public class LlapTaskSchedulerService extends TaskScheduler { // Removes tasks from the runningList and sends out a preempt request to the system. // Subsequent tasks will be scheduled again once the de-allocate request for the preempted // task is processed. - private void preemptTasks( - int forPriority, int forVertex, int numTasksToPreempt, String []potentialHosts) { + private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) { Set<String> preemptHosts = null; writeLock.lock(); List<TaskInfo> preemptedTaskList = null; try { - // TODO: numTasksToPreempt is currently always 1. - preemptedTaskList = preemptTasksFromMap(speculativeTasks, forPriority, forVertex, - numTasksToPreempt, potentialHosts, preemptHosts, preemptedTaskList); + preemptedTaskList = preemptTasksFromMap(speculativeTasks, forPriority, numTasksToPreempt, + potentialHosts, preemptHosts, preemptedTaskList); if (preemptedTaskList != null) { numTasksToPreempt -= preemptedTaskList.size(); } if (numTasksToPreempt > 0) { - preemptedTaskList = preemptTasksFromMap(guaranteedTasks, forPriority, forVertex, - numTasksToPreempt, potentialHosts, preemptHosts, preemptedTaskList); + preemptedTaskList = preemptTasksFromMap(guaranteedTasks, forPriority, numTasksToPreempt, + potentialHosts, preemptHosts, preemptedTaskList); } } finally { writeLock.unlock(); @@ -1894,8 +1804,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { } private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> runningTasks, - int forPriority, int forVertex, int numTasksToPreempt, String[] potentialHosts, - Set<String> preemptHosts, List<TaskInfo> preemptedTaskList) { + int forPriority, int numTasksToPreempt, String[] potentialHosts, Set<String> preemptHosts, + List<TaskInfo> preemptedTaskList) { NavigableMap<Integer, TreeSet<TaskInfo>> orderedMap = runningTasks.descendingMap(); Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator = orderedMap.entrySet().iterator(); int preemptedCount = 0; @@ -1908,27 +1818,21 @@ public class LlapTaskSchedulerService extends TaskScheduler { Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator(); while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { TaskInfo taskInfo = taskInfoIterator.next(); - if (preemptHosts != null && !preemptHosts.contains(taskInfo.assignedNode.getHost())) { - continue; // Not the right host. - } - Map<Integer,Set<Integer>> depInfo = getDependencyInfo(); - if (depInfo != null && !depInfo.get(forVertex).contains(vertexNum(taskInfo))) { - // Only preempt if the task being preempted is "below" us in the dag. - continue; - } - // Candidate for preemption. - preemptedCount++; - LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo, - forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts)); - taskInfo.setPreemptedInfo(clock.getTime()); - if (preemptedTaskList == null) { - preemptedTaskList = new LinkedList<>(); + if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) { + // Candidate for preemption. + preemptedCount++; + LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo, + forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts)); + taskInfo.setPreemptedInfo(clock.getTime()); + if (preemptedTaskList == null) { + preemptedTaskList = new LinkedList<>(); + } + dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost()); + preemptedTaskList.add(taskInfo); + registerPendingPreemption(taskInfo.assignedNode.getHost()); + // Remove from the runningTaskList + taskInfoIterator.remove(); } - dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost()); - preemptedTaskList.add(taskInfo); - registerPendingPreemption(taskInfo.assignedNode.getHost()); - // Remove from the runningTaskList - taskInfoIterator.remove(); } // Remove entire priority level if it's been emptied. @@ -2819,6 +2723,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { return isPendingUpdate; } + @VisibleForTesting TezTaskAttemptID getAttemptId() { return attemptId; }
