Repository: apex-core Updated Branches: refs/heads/master df8bc7e00 -> 16d1bf62d
APEXCORE-662 Raising StramEvent in case of heartbeat miss Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/16d1bf62 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/16d1bf62 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/16d1bf62 Branch: refs/heads/master Commit: 16d1bf62d7a4c83aec1c3bdb9a8e5878fae42323 Parents: df8bc7e Author: Hitesh-Scorpio <[email protected]> Authored: Tue Mar 14 16:31:43 2017 +0530 Committer: Hitesh-Scorpio <[email protected]> Committed: Thu Mar 23 22:06:53 2017 +0530 ---------------------------------------------------------------------- .../datatorrent/stram/StreamingContainerManager.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/16d1bf62/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 9b0c4f4..ee07af1 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -780,14 +780,22 @@ public class StreamingContainerManager implements PlanContext //LOG.debug("{} {} {}", c.getExternalId(), currentTms - sca.createdMillis, this.vars.heartbeatTimeoutMillis); // container allocated but process was either not launched or is not able to phone home if (currentTms - sca.createdMillis > 2 * this.vars.heartbeatTimeoutMillis) { - LOG.info("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); + LOG.error("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis); containerStopRequests.put(c.getExternalId(), c.getExternalId()); } } else { if (currentTms - sca.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis) { if (!isApplicationIdle()) { + // Check if the heartbeat for this agent has already been missed to raise the StramEvent only once + if (sca.lastHeartbeatMillis != -1) { + String info = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); + LOG.error(info); + StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), info, null); + stramEvent.setReason(info); + recordEventAsync(stramEvent); + sca.lastHeartbeatMillis = -1; + } // request stop (kill) as process may still be hanging around (would have been detected by Yarn otherwise) - LOG.info("Container {}@{} heartbeat timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis); containerStopRequests.put(c.getExternalId(), c.getExternalId()); } }
