Repository: samza Updated Branches: refs/heads/0.14.0 b6d0e2e02 -> 194e1fc4a
SAMZA-1506: Fix for robust ContainerHeartbeatMonitor exception handling. The Fix includes the following changes: - Catch all exceptions inside the heartbeat thread and not just IOException. - A time-based force kill when the heartbeat is invalid; this makes the monitor immune to threads that may keep the container stuck in the shutdown sequence. When the timeout occurs, a System.exit(1) is called. - Increasing number of retries for failed heartbeats from 3 to 6. This prevents short intermittent network failures from causing the containers to be invalidated. Author: Abhishek Shivanna <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #375 from abhishekshivanna/container-heartbeat Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9c9bbc45 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9c9bbc45 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9c9bbc45 Branch: refs/heads/0.14.0 Commit: 9c9bbc45cfdbf4d14f27c84ab3dc3314169ade7c Parents: 1ca9e32 Author: Abhishek Shivanna <[email protected]> Authored: Fri Dec 1 15:23:12 2017 -0800 Committer: Jacob Maes <[email protected]> Committed: Fri Dec 1 15:23:12 2017 -0800 ---------------------------------------------------------------------- .../container/ContainerHeartbeatClient.java | 20 +++++++++++++------- .../container/ContainerHeartbeatMonitor.java | 11 ++++++++++- .../samza/runtime/LocalContainerRunner.java | 9 +++++++-- 3 files changed, 30 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9c9bbc45/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java 
index f2c2651..7273d54 100644 --- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java +++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; */ public class ContainerHeartbeatClient { private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatClient.class); - private static final int NUM_RETRIES = 3; + private static final int NUM_RETRIES = 6; private static final int TIMEOUT_MS = 5000; private static final int BACKOFF_MULTIPLIER = 2; private final String heartbeatEndpoint; @@ -72,9 +72,10 @@ public class ContainerHeartbeatClient { LOG.debug("Container Heartbeat got response {}", reply); response = mapper.readValue(reply, ContainerHeartbeatResponse.class); return response; - } catch (IOException e) { - LOG.error("Error in container heart beat protocol. Query url: {} response: {}", heartbeatEndpoint, reply); + } catch (Exception e) { + LOG.error("Error in container heartbeat to JobCoordinator.", e); } + LOG.error("Container heartbeat expired"); response = new ContainerHeartbeatResponse(false); return response; } @@ -82,10 +83,11 @@ public class ContainerHeartbeatClient { String httpGet(URL url) throws IOException { HttpURLConnection conn; int delayMillis = 1000; - + BufferedReader br = null; for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) { - conn = Util.getHttpConnection(url, TIMEOUT_MS); - try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + try { + conn = Util.getHttpConnection(url, TIMEOUT_MS); + br = new BufferedReader(new InputStreamReader(conn.getInputStream())); if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { throw new IOException(String.format("HTTP error fetching url %s. 
Returned status code %d", url.toString(), conn.getResponseCode())); @@ -93,9 +95,13 @@ public class ContainerHeartbeatClient { return br.lines().collect(Collectors.joining()); } } catch (Exception e) { - LOG.error("Error in heartbeat request", e); + LOG.error(String.format("Error in heartbeat request. Retrying [%d/%d].", currentTry + 1, NUM_RETRIES), e); sleepUninterruptibly(delayMillis); delayMillis = delayMillis * BACKOFF_MULTIPLIER; + } finally { + if (br != null) { + br.close(); + } } } throw new IOException(String.format("Error fetching url: %s. Tried %d time(s).", url.toString(), NUM_RETRIES)); http://git-wip-us.apache.org/repos/asf/samza/blob/9c9bbc45/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java index 940e80f..64e7450 100644 --- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java @@ -27,10 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class ContainerHeartbeatMonitor { private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class); private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory(); private static final int SCHEDULE_MS = 60000; + private static final int SHUTDOWN_TIMOUT_MS = 120000; private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); private final Runnable onContainerExpired; private final ContainerHeartbeatClient containerHeartbeatClient; @@ -50,6 +52,11 @@ public class ContainerHeartbeatMonitor { scheduler.scheduleAtFixedRate(() -> { ContainerHeartbeatResponse 
response = containerHeartbeatClient.requestHeartbeat(); if (!response.isAlive()) { + scheduler.schedule(() -> { + // On timeout of container shutting down, force exit. + LOG.error("Graceful shutdown timeout expired. Force exiting."); + System.exit(1); + }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS); onContainerExpired.run(); } }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS); @@ -69,7 +76,9 @@ public class ContainerHeartbeatMonitor { @Override public Thread newThread(Runnable runnable) { - return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement()); + Thread t = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement()); + t.setDaemon(true); + return t; } } } http://git-wip-us.apache.org/repos/asf/samza/blob/9c9bbc45/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 50c8181..6750ccd 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -153,8 +153,13 @@ public class LocalContainerRunner extends AbstractApplicationRunner { if (executionEnvContainerId != null) { log.info("Got execution environment container id: {}", executionEnvContainerId); containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> { - container.shutdown(); - containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat"); + try { + container.shutdown(); + containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat"); + } catch (Exception e) { + log.error("Heartbeat monitor failed to shutdown the container gracefully. 
Exiting process.", e); + System.exit(1); + } }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId)); containerHeartbeatMonitor.start(); } else {
