Repository: samza
Updated Branches:
  refs/heads/0.14.0 b6d0e2e02 -> 194e1fc4a


SAMZA-1506: Fix for robust ContainerHeartbeatMonitor exception handling.

The fix includes the following changes:
- Catch all exceptions inside the heartbeat thread, not just
  IOException.
- Add a time-based force kill when the heartbeat is invalid.
  This makes the monitor immune to threads that may keep the
  container stuck in the shutdown sequence. When the timeout
  occurs, System.exit(1) is called.
- Increase the number of retries for failed heartbeats from 3 to 6.
  This prevents short, intermittent network failures from causing
  containers to be invalidated.

Author: Abhishek Shivanna <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #375 from abhishekshivanna/container-heartbeat


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9c9bbc45
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9c9bbc45
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9c9bbc45

Branch: refs/heads/0.14.0
Commit: 9c9bbc45cfdbf4d14f27c84ab3dc3314169ade7c
Parents: 1ca9e32
Author: Abhishek Shivanna <[email protected]>
Authored: Fri Dec 1 15:23:12 2017 -0800
Committer: Jacob Maes <[email protected]>
Committed: Fri Dec 1 15:23:12 2017 -0800

----------------------------------------------------------------------
 .../container/ContainerHeartbeatClient.java     | 20 +++++++++++++-------
 .../container/ContainerHeartbeatMonitor.java    | 11 ++++++++++-
 .../samza/runtime/LocalContainerRunner.java     |  9 +++++++--
 3 files changed, 30 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9c9bbc45/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
index f2c2651..7273d54 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ContainerHeartbeatClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerHeartbeatClient.class);
-  private static final int NUM_RETRIES = 3;
+  private static final int NUM_RETRIES = 6;
   private static final int TIMEOUT_MS = 5000;
   private static final int BACKOFF_MULTIPLIER = 2;
   private final String heartbeatEndpoint;
@@ -72,9 +72,10 @@ public class ContainerHeartbeatClient {
       LOG.debug("Container Heartbeat got response {}", reply);
       response = mapper.readValue(reply, ContainerHeartbeatResponse.class);
       return response;
-    } catch (IOException e) {
-      LOG.error("Error in container heart beat protocol. Query url: {} 
response: {}", heartbeatEndpoint, reply);
+    } catch (Exception e) {
+      LOG.error("Error in container heartbeat to JobCoordinator.", e);
     }
+    LOG.error("Container heartbeat expired");
     response = new ContainerHeartbeatResponse(false);
     return response;
   }
@@ -82,10 +83,11 @@ public class ContainerHeartbeatClient {
   String httpGet(URL url) throws IOException {
     HttpURLConnection conn;
     int delayMillis = 1000;
-
+    BufferedReader br = null;
     for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) {
-      conn = Util.getHttpConnection(url, TIMEOUT_MS);
-      try (BufferedReader br = new BufferedReader(new 
InputStreamReader(conn.getInputStream()))) {
+      try {
+        conn = Util.getHttpConnection(url, TIMEOUT_MS);
+        br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
         if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
           throw new IOException(String.format("HTTP error fetching url %s. 
Returned status code %d", url.toString(),
               conn.getResponseCode()));
@@ -93,9 +95,13 @@ public class ContainerHeartbeatClient {
           return br.lines().collect(Collectors.joining());
         }
       } catch (Exception e) {
-        LOG.error("Error in heartbeat request", e);
+        LOG.error(String.format("Error in heartbeat request. Retrying 
[%d/%d].", currentTry + 1, NUM_RETRIES), e);
         sleepUninterruptibly(delayMillis);
         delayMillis = delayMillis * BACKOFF_MULTIPLIER;
+      } finally {
+        if (br != null) {
+          br.close();
+        }
       }
     }
     throw new IOException(String.format("Error fetching url: %s. Tried %d 
time(s).", url.toString(), NUM_RETRIES));

http://git-wip-us.apache.org/repos/asf/samza/blob/9c9bbc45/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 940e80f..64e7450 100644
--- 
a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ 
b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -27,10 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new 
HeartbeatThreadFactory();
   private static final int SCHEDULE_MS = 60000;
+  private static final int SHUTDOWN_TIMOUT_MS = 120000;
   private final ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
   private final Runnable onContainerExpired;
   private final ContainerHeartbeatClient containerHeartbeatClient;
@@ -50,6 +52,11 @@ public class ContainerHeartbeatMonitor {
     scheduler.scheduleAtFixedRate(() -> {
         ContainerHeartbeatResponse response = 
containerHeartbeatClient.requestHeartbeat();
         if (!response.isAlive()) {
+          scheduler.schedule(() -> {
+              // On timeout of container shutting down, force exit.
+              LOG.error("Graceful shutdown timeout expired. Force exiting.");
+              System.exit(1);
+            }, SHUTDOWN_TIMOUT_MS, TimeUnit.MILLISECONDS);
           onContainerExpired.run();
         }
       }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
@@ -69,7 +76,9 @@ public class ContainerHeartbeatMonitor {
 
     @Override
     public Thread newThread(Runnable runnable) {
-      return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      Thread t = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      t.setDaemon(true);
+      return t;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9c9bbc45/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 50c8181..6750ccd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -153,8 +153,13 @@ public class LocalContainerRunner extends 
AbstractApplicationRunner {
     if (executionEnvContainerId != null) {
       log.info("Got execution environment container id: {}", 
executionEnvContainerId);
       containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> {
-          container.shutdown();
-          containerRunnerException = new SamzaException("Container shutdown 
due to expired heartbeat");
+          try {
+            container.shutdown();
+            containerRunnerException = new SamzaException("Container shutdown 
due to expired heartbeat");
+          } catch (Exception e) {
+            log.error("Heartbeat monitor failed to shutdown the container 
gracefully. Exiting process.", e);
+            System.exit(1);
+          }
         }, new ContainerHeartbeatClient(coordinatorUrl, 
executionEnvContainerId));
       containerHeartbeatMonitor.start();
     } else {

Reply via email to