Repository: hive Updated Branches: refs/heads/master 8841bbf13 -> b8944fef4
HIVE-16094. queued containers may timeout if they don't get to run for a long time. (Siddharth Seth, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a6c1ff71 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a6c1ff71 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a6c1ff71 Branch: refs/heads/master Commit: a6c1ff71b68c0dbe98781d0b8162f328080066b7 Parents: 8841bbf Author: Siddharth Seth <[email protected]> Authored: Mon Mar 6 18:03:06 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Mon Mar 6 18:03:06 2017 -0800 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/AMReporter.java | 93 ++++++++++++-------- .../llap/daemon/impl/ContainerRunnerImpl.java | 3 +- .../llap/daemon/impl/TaskRunnerCallable.java | 7 +- .../llap/tezplugins/LlapTaskCommunicator.java | 2 +- 4 files changed, 64 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 65f7232..e5bd05e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -111,7 +111,7 @@ public class AMReporter extends AbstractService { private final AtomicBoolean isShutdown = new AtomicBoolean(false); // Tracks appMasters to which heartbeats are being sent. This should not be used for any other // messages like taskKilled, etc. - private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>(); + private final Map<QueryIdentifier, AMNodeInfo> knownAppMasters = new HashMap<>(); volatile ListenableFuture<Void> queueLookupFuture; private final DaemonId daemonId; @@ -198,34 +198,42 @@ public class AMReporter extends AbstractService { Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier, TezTaskAttemptID attemptId) { if (LOG.isTraceEnabled()) { - LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier); + LOG.trace( + "Registering for heartbeat: {}, queryIdentifier={}, attemptId={}", + (amLocation + ":" + port), queryIdentifier, attemptId); } AMNodeInfo amNodeInfo; + + // Since we don't have an explicit AM end signal yet - we're going to create + // and discard AMNodeInfo instances per query. synchronized (knownAppMasters) { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); - knownAppMasters.put(amNodeId, amNodeInfo); + knownAppMasters.put(queryIdentifier, amNodeInfo); // Add to the queue only the first time this is registered, and on // subsequent instances when it's taken off the queue. amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval); pendingHeartbeatQueeu.add(amNodeInfo); + // AMNodeInfo will only be cleared when a queryComplete is received for this query, or + // when we detect a failure on the AM side (failure to heartbeat). + // A single queueLookupCallable is added here. We have to make sure one instance stays + // in the queue till the query completes. } - amNodeInfo.setCurrentQueryIdentifier(queryIdentifier); amNodeInfo.addTaskAttempt(attemptId); } } - public void unregisterTask(String amLocation, int port, TezTaskAttemptID ta) { + public void unregisterTask(String amLocation, int port, QueryIdentifier queryIdentifier, TezTaskAttemptID ta) { + if (LOG.isTraceEnabled()) { - LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port); + LOG.trace("Un-registering for heartbeat: {}, attempt={}", (amLocation + ":" + port), ta); } AMNodeInfo amNodeInfo; - LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); synchronized (knownAppMasters) { - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } else { @@ -241,7 +249,7 @@ public class AMReporter extends AbstractService { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); AMNodeInfo amNodeInfo; synchronized (knownAppMasters) { - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); @@ -266,10 +274,17 @@ public class AMReporter extends AbstractService { }); } - public void queryComplete(LlapNodeId llapNodeId) { - if (llapNodeId != null) { + public void queryComplete(QueryIdentifier queryIdentifier) { + if (queryIdentifier != null) { synchronized (knownAppMasters) { - AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId); + AMNodeInfo amNodeInfo = knownAppMasters.remove(queryIdentifier); + + // The AM can be used for multiple queries. This is an indication that a single query is complete. + // We don't have a good mechanism to know when an app ends. Removing this right now ensures + // that a new one gets created for the next query on the same AM. + if (amNodeInfo != null) { + amNodeInfo.setIsDone(true); + } // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for // it to be closed after max idle timeout (10s default) @@ -287,22 +302,26 @@ public class AMReporter extends AbstractService { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); - if (amNodeInfo.hasAmFailed()) { + if (amNodeInfo.hasAmFailed() || amNodeInfo.isDone()) { synchronized (knownAppMasters) { if (LOG.isDebugEnabled()) { LOG.debug( - "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}", - amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(), - amNodeInfo.hasAmFailed(), amNodeInfo); + "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}, isDone={}", + amNodeInfo.amNodeId, amNodeInfo.getQueryIdentifier(), amNodeInfo.getTaskCount(), + amNodeInfo.hasAmFailed(), amNodeInfo.isDone()); } - knownAppMasters.remove(amNodeInfo.amNodeId); + knownAppMasters.remove(amNodeInfo.getQueryIdentifier()); } } else { + // Always re-schedule the next callable - irrespective of task count, + // in case new tasks come in later. + long next = System.currentTimeMillis() + heartbeatInterval; + amNodeInfo.setNextHeartbeatTime(next); + pendingHeartbeatQueeu.add(amNodeInfo); + + // Send an actual heartbeat only if the task count is > 0 if (amNodeInfo.getTaskCount() > 0) { // Add back to the queue for the next heartbeat, and schedule the actual heartbeat - long next = System.currentTimeMillis() + heartbeatInterval; - amNodeInfo.setNextHeartbeatTime(next); - pendingHeartbeatQueeu.add(amNodeInfo); ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo)); Futures.addCallback(future, new FutureCallback<Void>() { @Override @@ -312,9 +331,9 @@ public class AMReporter extends AbstractService { @Override public void onFailure(Throwable t) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); - LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", + LOG.warn("Heartbeat failed to AM {}. Marking query as failed. query={}", amNodeInfo.amNodeId, currentQueryIdentifier, t); queryFailedHandler.queryFailed(currentQueryIdentifier); } @@ -374,9 +393,6 @@ public class AMReporter extends AbstractService { } List<TezTaskAttemptID> tasks = amNodeInfo.getTasksSnapshot(); if (tasks.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0"); - } return null; } try { @@ -388,7 +404,7 @@ public class AMReporter extends AbstractService { amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort(), aw); } catch (IOException e) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}", amNodeInfo.amNodeId, currentQueryIdentifier, e); @@ -416,9 +432,10 @@ public class AMReporter extends AbstractService { private final long timeout; private final SocketFactory socketFactory; private final AtomicBoolean amFailed = new AtomicBoolean(false); - private QueryIdentifier currentQueryIdentifier; + private final QueryIdentifier queryIdentifier; private LlapTaskUmbilicalProtocol umbilical; private long nextHeartbeatTime; + private final AtomicBoolean isDone = new AtomicBoolean(false); public AMNodeInfo(LlapNodeId amNodeId, String user, @@ -430,7 +447,7 @@ public class AMReporter extends AbstractService { Configuration conf) { this.user = user; this.jobToken = jobToken; - this.currentQueryIdentifier = currentQueryIdentifier; + this.queryIdentifier = currentQueryIdentifier; this.retryPolicy = retryPolicy; this.timeout = timeout; this.socketFactory = socketFactory; @@ -491,6 +508,14 @@ public class AMReporter extends AbstractService { return amFailed.get(); } + void setIsDone(boolean val) { + isDone.set(val); + } + + boolean isDone() { + return isDone.get(); + } + List<TezTaskAttemptID> getTasksSnapshot() { List<TezTaskAttemptID> result = new ArrayList<>(); synchronized (tasks) { @@ -499,12 +524,8 @@ public class AMReporter extends AbstractService { return result; } - public synchronized QueryIdentifier getCurrentQueryIdentifier() { - return currentQueryIdentifier; - } - - public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) { - this.currentQueryIdentifier = queryIdentifier; + public QueryIdentifier getQueryIdentifier() { + return queryIdentifier; } synchronized void setNextHeartbeatTime(long nextTime) { @@ -530,7 +551,7 @@ public class AMReporter extends AbstractService { @Override public String toString() { - return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount(); + return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount() + ", queryIdentifier=" + queryIdentifier; } private int getTaskCount() { http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index af8f5b0..ca476ec 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -401,8 +401,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } - LlapNodeId amNodeId = queryInfo.getAmNodeId(); - amReporter.queryComplete(amNodeId); + amReporter.queryComplete(queryIdentifier); } return QueryCompleteResponseProto.getDefaultInstance(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 25dc569..f24a647 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -188,7 +188,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // Unregister from the AMReporter, since the task is now running. TezTaskAttemptID ta = taskSpec.getTaskAttemptID(); - this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta); + this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), + fragmentInfo.getQueryInfo().getQueryIdentifier(), ta); synchronized (this) { if (!shouldRunTask) { @@ -355,7 +356,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // If the task hasn't started - inform about fragment completion immediately. It's possible for // the callable to never run. fragmentCompletionHanler.fragmentComplete(fragmentInfo); - this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), ta); + this.amReporter + .unregisterTask(request.getAmHost(), request.getAmPort(), + fragmentInfo.getQueryInfo().getQueryIdentifier(), ta); } } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/a6c1ff71/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index c700913..8bb6cab 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -761,10 +761,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public void nodeHeartbeat( Text hostname, Text uniqueId, int port, TezAttemptArray aw) throws IOException { - nodePinged(hostname.toString(), uniqueId.toString(), port, aw); if (LOG.isDebugEnabled()) { LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]"); } + nodePinged(hostname.toString(), uniqueId.toString(), port, aw); } @Override
