Repository: asterixdb
Updated Branches:
  refs/heads/master c0aa3217f -> 860fcde90


[ASTERIXDB-1076][HYR] Generate heartbeats in their own thread

- Generate & send NC heartbeats in their own thread to prevent starvation /
  scheduling issues
- Fix retries on IPC connections
- Don't spin on heartbeat send failure

Change-Id: Ieae21b1596013a699f27975fb21894244c536395
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2060
Reviewed-by: Murtadha Hubail <[email protected]>
Integration-Tests: Murtadha Hubail <[email protected]>
Tested-by: Murtadha Hubail <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/860fcde9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/860fcde9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/860fcde9

Branch: refs/heads/master
Commit: 860fcde90bd799e00eaa61cbf81badfea1c25dda
Parents: c0aa321
Author: Michael Blow <[email protected]>
Authored: Tue Oct 10 21:56:23 2017 -0400
Committer: Michael Blow <[email protected]>
Committed: Tue Oct 10 18:57:22 2017 -0700

----------------------------------------------------------------------
 .../control/common/controllers/NCConfig.java    |  2 +-
 .../ipc/ClusterControllerRemoteProxy.java       |  7 +-
 .../common/ipc/ControllerRemoteProxy.java       | 38 ++++++-----
 .../common/ipc/NodeControllerRemoteProxy.java   |  5 +-
 .../control/nc/NodeControllerService.java       | 67 ++++++++++++++------
 .../hyracks/ipc/impl/IPCConnectionManager.java  | 27 ++++----
 .../org/apache/hyracks/ipc/impl/IPCSystem.java  |  4 +-
 7 files changed, 92 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 02a469d..bd5895e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -168,7 +168,7 @@ public class NCConfig extends ControllerConfig {
                 case MESSAGING_PUBLIC_PORT:
                     return "Public IP port to announce messaging listener";
                 case CLUSTER_CONNECT_RETRIES:
-                    return "Number of attempts to contact CC before giving up";
+                    return "Number of attempts to retry contacting CC before giving up";
                 case IODEVICES:
                     return "Comma separated list of IO Device mount points";
                 case NET_THREAD_COUNT:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 4707487..ede2c41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -51,8 +51,9 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen
     }
 
     @Override
-    protected int getRetries(boolean first) {
-        return first ? clusterConnectRetries : 0;
+    protected int getMaxRetries(boolean first) {
+        // -1 == retry forever
+        return first ? clusterConnectRetries : -1;
     }
 
     @Override
@@ -104,7 +105,7 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
         NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
-        ensureIpcHandle().send(-1, fn, null);
+        ensureIpcHandle(0).send(-1, fn, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index d4ccbd9..83972d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -45,22 +45,25 @@ public abstract class ControllerRemoteProxy {
     }
 
     protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+        return ensureIpcHandle(getMaxRetries(ipcHandle == null));
+    }
+
+    protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException {
+        if (ipcHandle != null && ipcHandle.isConnected()) {
+            return ipcHandle;
+        }
         try {
             final boolean first = ipcHandle == null;
-            if (first || !ipcHandle.isConnected()) {
-                if (!first) {
-                    getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
-                    eventListener.ipcHandleDisconnected(ipcHandle);
-                }
-                ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
-                if (ipcHandle.isConnected()) {
-                    if (first) {
-                        eventListener.ipcHandleConnected(ipcHandle);
-                    } else {
-                        getLogger().warning("ipcHandle " + ipcHandle + " restored");
-                        eventListener.ipcHandleRestored(ipcHandle);
-                    }
-                }
+            if (!first) {
+                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+                eventListener.ipcHandleDisconnected(ipcHandle);
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
+            if (first) {
+                eventListener.ipcHandleConnected(ipcHandle);
+            } else {
+                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+                eventListener.ipcHandleRestored(ipcHandle);
             }
         } catch (IPCException e) {
             throw HyracksDataException.create(e);
@@ -68,7 +71,12 @@ public abstract class ControllerRemoteProxy {
         return ipcHandle;
     }
 
-    protected abstract int getRetries(boolean first);
+    /**
+     * Maximum number of times to retry a failed connection attempt
+     * @param first true if the initial connection attempt (i.e. server start)
+     * @return the maximum number of retries, if any.  <0 means retry forever
+     */
+    protected abstract int getMaxRetries(boolean first);
 
     protected abstract Logger getLogger();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 68a5b76..41284a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,8 +48,9 @@ public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements
     }
 
     @Override
-    protected int getRetries(boolean first) {
-        return 0;
+    protected int getMaxRetries(boolean first) {
+        // -1 == retry forever
+        return -1;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 350343b..69137e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -27,7 +27,6 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Hashtable;
@@ -37,6 +36,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -98,6 +98,7 @@ public class NodeControllerService implements IControllerService {
     private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
+    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
 
     private NCConfig ncConfig;
 
@@ -133,7 +134,7 @@ public class NodeControllerService implements IControllerService {
 
     private NodeParameters nodeParameters;
 
-    private HeartbeatTask heartbeatTask;
+    private Thread heartbeatThread;
 
     private final ServerContext serverCtx;
 
@@ -308,15 +309,6 @@ public class NodeControllerService implements IControllerService {
 
         workQueue.start();
 
-        heartbeatTask = new HeartbeatTask(ccs);
-
-        // Use reflection to set the priority of the timer thread.
-        Field threadField = timer.getClass().getDeclaredField("thread");
-        threadField.setAccessible(true);
-        Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
-        timerThread.setPriority(Thread.MAX_PRIORITY);
-        // Schedule heartbeat generator.
-        timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
         // Schedule tracing a human-readable datetime
         timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
 
@@ -362,6 +354,12 @@ public class NodeControllerService implements IControllerService {
                     registrationException);
             throw registrationException;
         }
+        // Start heartbeat generator.
+        heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
+        heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+        heartbeatThread.setDaemon(true);
+        heartbeatThread.start();
+
         serviceCtx.setDistributedState(nodeParameters.getDistributedState());
         application.onRegisterNode();
         LOGGER.info("Registering with Cluster Controller complete");
@@ -401,7 +399,10 @@ public class NodeControllerService implements IControllerService {
              * Stop heartbeat after NC has stopped to avoid false node failure detection
              * on CC if an NC takes a long time to stop.
              */
-            heartbeatTask.cancel();
+            if (heartbeatThread != null) {
+                heartbeatThread.interrupt();
+                heartbeatThread.join(1000); // give it 1s to stop gracefully
+            }
             LOGGER.log(Level.INFO, "Stopped NodeControllerService");
         } else {
             LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
@@ -478,17 +479,16 @@ public class NodeControllerService implements IControllerService {
         return workQueue;
     }
 
-    public ThreadMXBean getThreadMXBean() {
-        return threadMXBean;
-    }
-
-    private class HeartbeatTask extends TimerTask {
-        private IClusterController cc;
+    private class HeartbeatTask implements Runnable {
+        private final Semaphore delayBlock = new Semaphore(0);
+        private final IClusterController cc;
+        private final long heartbeatPeriodNanos;
 
         private final HeartbeatData hbData;
 
-        public HeartbeatTask(IClusterController cc) {
+        HeartbeatTask(IClusterController cc, int heartbeatPeriod) {
             this.cc = cc;
+            this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
             hbData = new HeartbeatData();
             hbData.gcCollectionCounts = new long[gcMXBeans.size()];
             hbData.gcCollectionTimes = new long[gcMXBeans.size()];
@@ -496,6 +496,28 @@ public class NodeControllerService implements IControllerService {
 
         @Override
         public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos;
+                    final boolean success = execute();
+                    sleepUntilNextFire(success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS);
+                } catch (InterruptedException e) { // NOSONAR
+                    break;
+                }
+            }
+            LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting down");
+        }
+
+        private void sleepUntilNextFire(long delayNanos) throws InterruptedException {
+            if (delayNanos > 0) {
+                delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire
+            } else {
+                LOGGER.warning("After sending heartbeat, next one is already late by "
+                        + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without delay");
+            }
+        }
+
+        private boolean execute() throws InterruptedException {
             MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
             hbData.heapInitSize = heapUsage.getInit();
             hbData.heapUsedSize = heapUsage.getUsed();
@@ -541,8 +563,13 @@ public class NodeControllerService implements IControllerService {
 
             try {
                 cc.nodeHeartbeat(id, hbData);
+                LOGGER.log(Level.FINE, "Successfully sent heartbeat");
+                return true;
+            } catch (InterruptedException e) {
+                throw e;
             } catch (Exception e) {
-                LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
+                LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s", e);
+                return false;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index d1659a8..36cf2fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -47,6 +47,10 @@ import org.apache.commons.io.IOUtils;
 public class IPCConnectionManager {
     private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
 
+    // TODO(mblow): the next two could be config parameters
+    private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
+    private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+
     private final IPCSystem system;
 
     private final NetworkThread networkThread;
@@ -99,9 +103,10 @@ public class IPCConnectionManager {
         networkThread.selector.wakeup();
     }
 
-    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException {
         IPCHandle handle;
-        int attempt = 1;
+        int retries = 0;
+        int delay = INITIAL_RETRY_DELAY_MILLIS;
         while (true) {
             synchronized (this) {
                 handle = ipcHandleMap.get(remoteAddress);
@@ -114,19 +119,11 @@ public class IPCConnectionManager {
             if (handle.waitTillConnected()) {
                 return handle;
             }
-            if (retries < 0) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
-                    attempt++;
-                    Thread.sleep(5000);
-                }
-            } else if (attempt < retries) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries
-                            + ")");
-                    attempt++;
-                    Thread.sleep(5000);
-                }
+            if (maxRetries < 0 || retries++ < maxRetries) {
+                LOGGER.warning("Connection to " + remoteAddress + " failed; retrying" + (maxRetries <= 0 ? ""
+                        : " (retry attempt " + retries + " of " + maxRetries + ") after " + delay + "ms"));
+                Thread.sleep(delay);
+                delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
             } else {
                 throw new IOException("Connection failed to " + remoteAddress);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index dea48bd..f27b268 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -68,9 +68,9 @@ public class IPCSystem {
         return getHandle(remoteAddress, 0);
     }
 
-    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException {
         try {
-            return cMgr.getIPCHandle(remoteAddress, retries);
+            return cMgr.getIPCHandle(remoteAddress, maxRetries);
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {

Reply via email to