HIVE-16094. Queued containers may time out if they don't get to run for a long time.
Change-Id: Ieb412a66dbe53c6709f7bd840b3dfa543225d826 (cherry picked from commit 3def1d7f19982b4a710e7cb867a8b6b7bbf8fb97) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f3c949c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f3c949c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f3c949c Branch: refs/heads/branch-2.2 Commit: 2f3c949c20ca51ac835bedd9bff2ac1c07e000a1 Parents: 29db808 Author: Siddharth Seth <[email protected]> Authored: Thu Mar 2 22:27:39 2017 -0800 Committer: Owen O'Malley <[email protected]> Committed: Tue Mar 28 15:27:57 2017 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/AMReporter.java | 95 ++++++++++++-------- .../llap/daemon/impl/ContainerRunnerImpl.java | 3 +- .../llap/daemon/impl/TaskRunnerCallable.java | 7 +- .../llap/tezplugins/LlapTaskCommunicator.java | 2 +- 4 files changed, 64 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index b01a495..af4a1f2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -100,7 +100,7 @@ public class AMReporter extends AbstractService { private final AtomicBoolean isShutdown = new AtomicBoolean(false); // Tracks appMasters to which heartbeats are being sent. This should not be used for any other // messages like taskKilled, etc. 
- private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>(); + private final Map<QueryIdentifier, AMNodeInfo> knownAppMasters = new HashMap<>(); volatile ListenableFuture<Void> queueLookupFuture; private final DaemonId daemonId; @@ -186,35 +186,42 @@ public class AMReporter extends AbstractService { public void registerTask(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) { if (LOG.isTraceEnabled()) { - LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier); + LOG.trace( + "Registering for heartbeat: {}, queryIdentifier={}", + (amLocation + ":" + port), queryIdentifier); } AMNodeInfo amNodeInfo; + + // Since we don't have an explicit AM end signal yet - we're going to create + // and discard AMNodeInfo instances per query. synchronized (knownAppMasters) { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); - knownAppMasters.put(amNodeId, amNodeInfo); + knownAppMasters.put(queryIdentifier, amNodeInfo); // Add to the queue only the first time this is registered, and on // subsequent instances when it's taken off the queue. amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval); pendingHeartbeatQueeu.add(amNodeInfo); + // AMNodeInfo will only be cleared when a queryComplete is received for this query, or + // when we detect a failure on the AM side (failure to heartbeat). + // A single queueLookupCallable is added here. We have to make sure one instance stays + // in the queue till the query completes. 
} - amNodeInfo.setCurrentQueryIdentifier(queryIdentifier); amNodeInfo.incrementAndGetTaskCount(); } } - public void unregisterTask(String amLocation, int port) { + public void unregisterTask(String amLocation, int port, QueryIdentifier queryIdentifier) { if (LOG.isTraceEnabled()) { - LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port); + LOG.trace("Un-registering for heartbeat: {}", (amLocation + ":" + port)); } AMNodeInfo amNodeInfo; - LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); synchronized (knownAppMasters) { - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } else { @@ -230,7 +237,7 @@ public class AMReporter extends AbstractService { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); AMNodeInfo amNodeInfo; synchronized (knownAppMasters) { - amNodeInfo = knownAppMasters.get(amNodeId); + amNodeInfo = knownAppMasters.get(queryIdentifier); if (amNodeInfo == null) { amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, conf); @@ -255,10 +262,17 @@ public class AMReporter extends AbstractService { }); } - public void queryComplete(LlapNodeId llapNodeId) { - if (llapNodeId != null) { + public void queryComplete(QueryIdentifier queryIdentifier) { + if (queryIdentifier != null) { synchronized (knownAppMasters) { - AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId); + AMNodeInfo amNodeInfo = knownAppMasters.remove(queryIdentifier); + + // The AM can be used for multiple queries. This is an indication that a single query is complete. + // We don't have a good mechanism to know when an app ends. Removing this right now ensures + // that a new one gets created for the next query on the same AM. 
+ if (amNodeInfo != null) { + amNodeInfo.setIsDone(true); + } // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for // it to be closed after max idle timeout (10s default) @@ -276,22 +290,26 @@ public class AMReporter extends AbstractService { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); - if (amNodeInfo.hasAmFailed()) { + if (amNodeInfo.hasAmFailed() || amNodeInfo.isDone()) { synchronized (knownAppMasters) { if (LOG.isDebugEnabled()) { LOG.debug( - "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}", - amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(), - amNodeInfo.hasAmFailed(), amNodeInfo); + "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}, isDone={}", + amNodeInfo.amNodeId, amNodeInfo.getQueryIdentifier(), amNodeInfo.getTaskCount(), + amNodeInfo.hasAmFailed(), amNodeInfo.isDone()); } - knownAppMasters.remove(amNodeInfo.amNodeId); + knownAppMasters.remove(amNodeInfo.getQueryIdentifier()); } } else { + // Always re-schedule the next callable - irrespective of task count, + // in case new tasks come in later. 
+ long next = System.currentTimeMillis() + heartbeatInterval; + amNodeInfo.setNextHeartbeatTime(next); + pendingHeartbeatQueeu.add(amNodeInfo); + + // Send an actual heartbeat only if the task count is > 0 if (amNodeInfo.getTaskCount() > 0) { // Add back to the queue for the next heartbeat, and schedule the actual heartbeat - long next = System.currentTimeMillis() + heartbeatInterval; - amNodeInfo.setNextHeartbeatTime(next); - pendingHeartbeatQueeu.add(amNodeInfo); ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo)); Futures.addCallback(future, new FutureCallback<Void>() { @Override @@ -301,9 +319,9 @@ public class AMReporter extends AbstractService { @Override public void onFailure(Throwable t) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); - LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", + LOG.warn("Heartbeat failed to AM {}. Marking query as failed. query={}", amNodeInfo.amNodeId, currentQueryIdentifier, t); queryFailedHandler.queryFailed(currentQueryIdentifier); } @@ -369,7 +387,7 @@ public class AMReporter extends AbstractService { amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort()); } catch (IOException e) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier(); amNodeInfo.setAmFailed(true); LOG.warn("Failed to communicated with AM at {}. 
Killing remaining fragments for query {}", amNodeInfo.amNodeId, currentQueryIdentifier, e); @@ -379,11 +397,7 @@ public class AMReporter extends AbstractService { LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e); } } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0"); - } - } + } return null; } } @@ -400,9 +414,10 @@ public class AMReporter extends AbstractService { private final long timeout; private final SocketFactory socketFactory; private final AtomicBoolean amFailed = new AtomicBoolean(false); - private QueryIdentifier currentQueryIdentifier; + private final QueryIdentifier queryIdentifier; private LlapTaskUmbilicalProtocol umbilical; private long nextHeartbeatTime; + private final AtomicBoolean isDone = new AtomicBoolean(false); public AMNodeInfo(LlapNodeId amNodeId, String user, @@ -414,7 +429,7 @@ public class AMReporter extends AbstractService { Configuration conf) { this.user = user; this.jobToken = jobToken; - this.currentQueryIdentifier = currentQueryIdentifier; + this.queryIdentifier = currentQueryIdentifier; this.retryPolicy = retryPolicy; this.timeout = timeout; this.socketFactory = socketFactory; @@ -465,16 +480,20 @@ public class AMReporter extends AbstractService { return amFailed.get(); } - int getTaskCount() { - return taskCount.get(); + void setIsDone(boolean val) { + isDone.set(val); } - public synchronized QueryIdentifier getCurrentQueryIdentifier() { - return currentQueryIdentifier; + boolean isDone() { + return isDone.get(); + } + + int getTaskCount() { + return taskCount.get(); } - public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) { - this.currentQueryIdentifier = queryIdentifier; + public QueryIdentifier getQueryIdentifier() { + return queryIdentifier; } synchronized void setNextHeartbeatTime(long nextTime) { @@ -500,7 +519,7 @@ public class AMReporter extends AbstractService { @Override 
public String toString() { - return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount(); + return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount() + ", queryIdentifier=" + queryIdentifier; } } } http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 1176e5e..2b9ece9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -399,8 +399,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } - LlapNodeId amNodeId = queryInfo.getAmNodeId(); - amReporter.queryComplete(amNodeId); + amReporter.queryComplete(queryIdentifier); } return QueryCompleteResponseProto.getDefaultInstance(); } http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 8739d5b..716d05b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -178,7 +178,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } // Unregister from the AMReporter, since the task is 
now running. - this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); + this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(), + fragmentInfo.getQueryInfo().getQueryIdentifier()); synchronized (this) { if (!shouldRunTask) { @@ -326,7 +327,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // If the task hasn't started - inform about fragment completion immediately. It's possible for // the callable to never run. fragmentCompletionHanler.fragmentComplete(fragmentInfo); - this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); + this.amReporter + .unregisterTask(request.getAmHost(), request.getAmPort(), + fragmentInfo.getQueryInfo().getQueryIdentifier()); } } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index c716c5e..f6166a6 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -744,10 +744,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException { - nodePinged(hostname.toString(), uniqueId.toString(), port); if (LOG.isDebugEnabled()) { LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]"); } + nodePinged(hostname.toString(), uniqueId.toString(), port); } @Override
