Repository: asterixdb Updated Branches: refs/heads/master c0aa3217f -> 860fcde90
[ASTERIXDB-1076][HYR] Generate heartbeats in their own thread - Generate & send NC heartbeats in their own thread to prevent starvation / scheduling issues - Fix retries on IPC connections (retry delay and attempt counting no longer depend on INFO logging being enabled; use a capped exponential backoff between attempts) - Don't spin on heartbeat send failure (wait 1s before retrying) Change-Id: Ieae21b1596013a699f27975fb21894244c536395 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2060 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/860fcde9 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/860fcde9 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/860fcde9 Branch: refs/heads/master Commit: 860fcde90bd799e00eaa61cbf81badfea1c25dda Parents: c0aa321 Author: Michael Blow <[email protected]> Authored: Tue Oct 10 21:56:23 2017 -0400 Committer: Michael Blow <[email protected]> Committed: Tue Oct 10 18:57:22 2017 -0700 ---------------------------------------------------------------------- .../control/common/controllers/NCConfig.java | 2 +- .../ipc/ClusterControllerRemoteProxy.java | 7 +- .../common/ipc/ControllerRemoteProxy.java | 38 ++++++----- .../common/ipc/NodeControllerRemoteProxy.java | 5 +- .../control/nc/NodeControllerService.java | 67 ++++++++++++++------ .../hyracks/ipc/impl/IPCConnectionManager.java | 27 ++++---- .../org/apache/hyracks/ipc/impl/IPCSystem.java | 4 +- 7 files changed, 92 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 02a469d..bd5895e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -168,7 +168,7 @@ public class NCConfig extends ControllerConfig { case MESSAGING_PUBLIC_PORT: return "Public IP port to announce messaging listener"; case CLUSTER_CONNECT_RETRIES: - return "Number of attempts to contact CC before giving up"; + return "Number of attempts to retry contacting CC before giving up"; case IODEVICES: return "Comma separated list of IO Device mount points"; case NET_THREAD_COUNT: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 4707487..ede2c41 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -51,8 +51,9 @@ public class 
ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen } @Override - protected int getRetries(boolean first) { - return first ? clusterConnectRetries : 0; + protected int getMaxRetries(boolean first) { + // -1 == retry forever + return first ? clusterConnectRetries : -1; } @Override @@ -104,7 +105,7 @@ public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implemen @Override public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception { NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData); - ensureIpcHandle().send(-1, fn, null); + ensureIpcHandle(0).send(-1, fn, null); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java index d4ccbd9..83972d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java @@ -45,22 +45,25 @@ public abstract class ControllerRemoteProxy { } protected IIPCHandle ensureIpcHandle() throws HyracksDataException { + return ensureIpcHandle(getMaxRetries(ipcHandle == null)); + } + + protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException { + if (ipcHandle != null && ipcHandle.isConnected()) { + return ipcHandle; + } try { final boolean first = ipcHandle == null; - if (first || 
!ipcHandle.isConnected()) { - if (!first) { - getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection"); - eventListener.ipcHandleDisconnected(ipcHandle); - } - ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first)); - if (ipcHandle.isConnected()) { - if (first) { - eventListener.ipcHandleConnected(ipcHandle); - } else { - getLogger().warning("ipcHandle " + ipcHandle + " restored"); - eventListener.ipcHandleRestored(ipcHandle); - } - } + if (!first) { + getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection"); + eventListener.ipcHandleDisconnected(ipcHandle); + } + ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries); + if (first) { + eventListener.ipcHandleConnected(ipcHandle); + } else { + getLogger().warning("ipcHandle " + ipcHandle + " restored"); + eventListener.ipcHandleRestored(ipcHandle); } } catch (IPCException e) { throw HyracksDataException.create(e); @@ -68,7 +71,12 @@ public abstract class ControllerRemoteProxy { return ipcHandle; } - protected abstract int getRetries(boolean first); + /** + * Maximum number of times to retry a failed connection attempt + * @param first true if the initial connection attempt (i.e. server start) + * @return the maximum number of retries, if any. 
<0 means retry forever + */ + protected abstract int getMaxRetries(boolean first); protected abstract Logger getLogger(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 68a5b76..41284a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -48,8 +48,9 @@ public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements } @Override - protected int getRetries(boolean first) { - return 0; + protected int getMaxRetries(boolean first) { + // -1 == retry forever + return -1; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 350343b..69137e5 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -27,7 +27,6 @@ import java.lang.management.MemoryUsage; import java.lang.management.OperatingSystemMXBean; import java.lang.management.RuntimeMXBean; import java.lang.management.ThreadMXBean; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Hashtable; @@ -37,6 +36,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; @@ -98,6 +98,7 @@ public class NodeControllerService implements IControllerService { private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName()); private static final double MEMORY_FUDGE_FACTOR = 0.8; + private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1); private NCConfig ncConfig; @@ -133,7 +134,7 @@ public class NodeControllerService implements IControllerService { private NodeParameters nodeParameters; - private HeartbeatTask heartbeatTask; + private Thread heartbeatThread; private final ServerContext serverCtx; @@ -308,15 +309,6 @@ public class NodeControllerService implements IControllerService { workQueue.start(); - heartbeatTask = new HeartbeatTask(ccs); - - // Use reflection to set the priority of the timer thread. - Field threadField = timer.getClass().getDeclaredField("thread"); - threadField.setAccessible(true); - Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object. - timerThread.setPriority(Thread.MAX_PRIORITY); - // Schedule heartbeat generator. 
- timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod()); // Schedule tracing a human-readable datetime timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000); @@ -362,6 +354,12 @@ public class NodeControllerService implements IControllerService { registrationException); throw registrationException; } + // Start heartbeat generator. + heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat"); + heartbeatThread.setPriority(Thread.MAX_PRIORITY); + heartbeatThread.setDaemon(true); + heartbeatThread.start(); + serviceCtx.setDistributedState(nodeParameters.getDistributedState()); application.onRegisterNode(); LOGGER.info("Registering with Cluster Controller complete"); @@ -401,7 +399,10 @@ public class NodeControllerService implements IControllerService { * Stop heartbeat after NC has stopped to avoid false node failure detection * on CC if an NC takes a long time to stop. */ - heartbeatTask.cancel(); + if (heartbeatThread != null) { + heartbeatThread.interrupt(); + heartbeatThread.join(1000); // give it 1s to stop gracefully + } LOGGER.log(Level.INFO, "Stopped NodeControllerService"); } else { LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack), @@ -478,17 +479,16 @@ public class NodeControllerService implements IControllerService { return workQueue; } - public ThreadMXBean getThreadMXBean() { - return threadMXBean; - } - - private class HeartbeatTask extends TimerTask { - private IClusterController cc; + private class HeartbeatTask implements Runnable { + private final Semaphore delayBlock = new Semaphore(0); + private final IClusterController cc; + private final long heartbeatPeriodNanos; private final HeartbeatData hbData; - public HeartbeatTask(IClusterController cc) { + HeartbeatTask(IClusterController cc, int heartbeatPeriod) { this.cc = cc; + this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod); hbData = new 
HeartbeatData(); hbData.gcCollectionCounts = new long[gcMXBeans.size()]; hbData.gcCollectionTimes = new long[gcMXBeans.size()]; @@ -496,6 +496,28 @@ public class NodeControllerService implements IControllerService { @Override public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos; + final boolean success = execute(); + sleepUntilNextFire(success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS); + } catch (InterruptedException e) { // NOSONAR + break; + } + } + LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting down"); + } + + private void sleepUntilNextFire(long delayNanos) throws InterruptedException { + if (delayNanos > 0) { + delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire + } else { + LOGGER.warning("After sending heartbeat, next one is already late by " + + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without delay"); + } + } + + private boolean execute() throws InterruptedException { MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage(); hbData.heapInitSize = heapUsage.getInit(); hbData.heapUsedSize = heapUsage.getUsed(); @@ -541,8 +563,13 @@ public class NodeControllerService implements IControllerService { try { cc.nodeHeartbeat(id, hbData); + LOGGER.log(Level.FINE, "Successfully sent heartbeat"); + return true; + } catch (InterruptedException e) { + throw e; } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e); + LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s", e); + return false; } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index d1659a8..36cf2fd 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -47,6 +47,10 @@ import org.apache.commons.io.IOUtils; public class IPCConnectionManager { private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName()); + // TODO(mblow): the next two could be config parameters + private static final int INITIAL_RETRY_DELAY_MILLIS = 100; + private static final int MAX_RETRY_DELAY_MILLIS = 15000; + private final IPCSystem system; private final NetworkThread networkThread; @@ -99,9 +103,10 @@ public class IPCConnectionManager { networkThread.selector.wakeup(); } - IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException { + IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException { IPCHandle handle; - int attempt = 1; + int retries = 0; + int delay = INITIAL_RETRY_DELAY_MILLIS; while (true) { synchronized (this) { handle = ipcHandleMap.get(remoteAddress); @@ -114,19 +119,11 @@ public class IPCConnectionManager { if (handle.waitTillConnected()) { return handle; } - if (retries < 0) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Connection to " + remoteAddress + " failed, retrying..."); - attempt++; - Thread.sleep(5000); - } - } else if (attempt < retries) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries - + ")"); - attempt++; - Thread.sleep(5000); - } + if (maxRetries < 0 || retries++ < maxRetries) { + LOGGER.warning("Connection to " + 
remoteAddress + " failed; retrying" + (maxRetries <= 0 ? "" + : " (retry attempt " + retries + " of " + maxRetries + ") after " + delay + "ms")); + Thread.sleep(delay); + delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5)); } else { throw new IOException("Connection failed to " + remoteAddress); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/860fcde9/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index dea48bd..f27b268 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -68,9 +68,9 @@ public class IPCSystem { return getHandle(remoteAddress, 0); } - public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException { + public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException { try { - return cMgr.getIPCHandle(remoteAddress, retries); + return cMgr.getIPCHandle(remoteAddress, maxRetries); } catch (IOException e) { throw new IPCException(e); } catch (InterruptedException e) {
