This is an automated email from the ASF dual-hosted git repository.
mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6f9c30507462205caffccb74205739bf27d5220a Author: Michael Blow <[email protected]> AuthorDate: Wed Feb 12 11:01:29 2020 -0500 [NO ISSUE][HYR][NET] Send accepted messages prior to ipc shutdown Change-Id: Ia7bb36a0552e21bd3d67b0882c8898af9b74d59d Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5023 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../hyracks/ipc/impl/IPCConnectionManager.java | 37 ++++++++++++++++++++-- .../org/apache/hyracks/ipc/impl/IPCHandle.java | 2 +- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 58aa39e..eaae8e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -37,12 +37,14 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.api.network.ISocketChannelFactory; +import org.apache.hyracks.api.util.InvokeUtil; +import org.apache.hyracks.ipc.exceptions.IPCException; import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.NetworkUtil; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,6 +54,7 @@ public class IPCConnectionManager { // TODO(mblow): the next two could be config parameters private static final int INITIAL_RETRY_DELAY_MILLIS = 100; private static final int MAX_RETRY_DELAY_MILLIS = 15000; + private 
static final int MAX_STOP_JOIN_WAIT_MILLIS = 30000; private final IPCSystem system; @@ -107,6 +110,11 @@ public class IPCConnectionManager { stopped = true; NetworkUtil.closeQuietly(serverSocketChannel); networkThread.selector.wakeup(); + InvokeUtil.doUninterruptibly(() -> networkThread.join(MAX_STOP_JOIN_WAIT_MILLIS)); + if (networkThread.isAlive()) { + LOGGER.warn("giving up after waiting {}s for networkThread to exit", + TimeUnit.MILLISECONDS.toSeconds(MAX_STOP_JOIN_WAIT_MILLIS)); + } } IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException { @@ -154,7 +162,14 @@ public class IPCConnectionManager { } } - synchronized void write(Message msg) { + synchronized void send(Message msg) throws IPCException { + if (stopped) { + throw new IPCException("ipc system has been stopped"); + } + write(msg); + } + + private synchronized void write(Message msg) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Enqueued message: " + msg); } @@ -216,9 +231,24 @@ public class IPCConnectionManager { processSelectedKeys(); } } catch (Exception e) { - LOGGER.log(Level.ERROR, "Exception processing message", e); + LOGGER.error("Exception processing message", e); } } + // process any last work we accepted prior to being stopped, before we terminate + collectOutstandingWork(); + LOGGER.trace("had {} pending messages at stop time!", workingSendList.size()); + if (!workingSendList.isEmpty()) { + sendPendingMessages(); + } + try { + int n = selector.selectNow(); + LOGGER.trace("had {} keys remaining at stop time!", n); + if (n > 0) { + processSelectedKeys(); + } + } catch (Exception e) { + LOGGER.error("Exception processing message", e); + } } private void processSelectedKeys() { @@ -340,6 +370,7 @@ public class IPCConnectionManager { IPCHandle handle = msg.getIPCHandle(); if (handle.getState() == HandleState.CLOSED) { // message will never be sent + LOGGER.info("Could not send message: {}, due to {}", msg, handle); return true; } if 
(handle.full()) { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java index 25abfe1..ddcc677 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java @@ -81,7 +81,7 @@ final class IPCHandle implements IIPCHandle { msg.setFlag(Message.NORMAL); msg.setPayload(req); } - system.getConnectionManager().write(msg); + system.getConnectionManager().send(msg); return mid; }
