This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 6f9c30507462205caffccb74205739bf27d5220a
Author: Michael Blow <[email protected]>
AuthorDate: Wed Feb 12 11:01:29 2020 -0500

    [NO ISSUE][HYR][NET] Send accepted messages prior to ipc shutdown
    
    Change-Id: Ia7bb36a0552e21bd3d67b0882c8898af9b74d59d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5023
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Till Westmann <[email protected]>
---
 .../hyracks/ipc/impl/IPCConnectionManager.java     | 37 ++++++++++++++++++++--
 .../org/apache/hyracks/ipc/impl/IPCHandle.java     |  2 +-
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 58aa39e..eaae8e7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -37,12 +37,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.NetworkUtil;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,6 +54,7 @@ public class IPCConnectionManager {
     // TODO(mblow): the next two could be config parameters
     private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
     private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+    private static final int MAX_STOP_JOIN_WAIT_MILLIS = 30000;
 
     private final IPCSystem system;
 
@@ -107,6 +110,11 @@ public class IPCConnectionManager {
         stopped = true;
         NetworkUtil.closeQuietly(serverSocketChannel);
         networkThread.selector.wakeup();
+        InvokeUtil.doUninterruptibly(() -> 
networkThread.join(MAX_STOP_JOIN_WAIT_MILLIS));
+        if (networkThread.isAlive()) {
+            LOGGER.warn("giving up after waiting {}s for networkThread to 
exit",
+                    
TimeUnit.MILLISECONDS.toSeconds(MAX_STOP_JOIN_WAIT_MILLIS));
+        }
     }
 
     IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) 
throws IOException, InterruptedException {
@@ -154,7 +162,14 @@ public class IPCConnectionManager {
         }
     }
 
-    synchronized void write(Message msg) {
+    synchronized void send(Message msg) throws IPCException {
+        if (stopped) {
+            throw new IPCException("ipc system has been stopped");
+        }
+        write(msg);
+    }
+
+    private synchronized void write(Message msg) {
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("Enqueued message: " + msg);
         }
@@ -216,9 +231,24 @@ public class IPCConnectionManager {
                         processSelectedKeys();
                     }
                 } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Exception processing message", e);
+                    LOGGER.error("Exception processing message", e);
                 }
             }
+            // process any last work we accepted prior to being stopped, 
before we terminate
+            collectOutstandingWork();
+            LOGGER.trace("had {} pending messages at stop time!", 
workingSendList.size());
+            if (!workingSendList.isEmpty()) {
+                sendPendingMessages();
+            }
+            try {
+                int n = selector.selectNow();
+                LOGGER.trace("had {} keys remaining at stop time!", n);
+                if (n > 0) {
+                    processSelectedKeys();
+                }
+            } catch (Exception e) {
+                LOGGER.error("Exception processing message", e);
+            }
         }
 
         private void processSelectedKeys() {
@@ -340,6 +370,7 @@ public class IPCConnectionManager {
             IPCHandle handle = msg.getIPCHandle();
             if (handle.getState() == HandleState.CLOSED) {
                 // message will never be sent
+                LOGGER.info("Could not send message: {}, due to {}", msg, 
handle);
                 return true;
             }
             if (handle.full()) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 25abfe1..ddcc677 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -81,7 +81,7 @@ final class IPCHandle implements IIPCHandle {
             msg.setFlag(Message.NORMAL);
             msg.setPayload(req);
         }
-        system.getConnectionManager().write(msg);
+        system.getConnectionManager().send(msg);
         return mid;
     }
 

Reply via email to