This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
View the commit online: https://github.com/apache/asterixdb/commit/28f99d26f93bdfa1a14b67579bfe0694b7785a4e The following commit(s) were added to refs/heads/master by this push: new 28f99d2 [NO ISSUE][NET] Split delivery of messages and exceptions 28f99d2 is described below commit 28f99d26f93bdfa1a14b67579bfe0694b7785a4e Author: Michael Blow <[email protected]> AuthorDate: Fri Nov 1 13:47:07 2019 -0400 [NO ISSUE][NET] Split delivery of messages and exceptions - user model changes: no - storage format changes: no - interface changes: yes Change-Id: I5a97e1eb1e2a3ec207591b3d5b8b7f1949a80fbc Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4025 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../hyracks/control/cc/ClientInterfaceIPCI.java | 12 ++++++++- .../hyracks/control/cc/ClusterControllerIPCI.java | 10 ++++--- .../hyracks/control/nc/NodeControllerIPCI.java | 11 ++++++-- .../java/org/apache/hyracks/ipc/api/IIPCI.java | 16 ++++++++--- .../org/apache/hyracks/ipc/api/RPCInterface.java | 22 ++++++++------- .../org/apache/hyracks/ipc/impl/IPCSystem.java | 13 ++------- .../java/org/apache/hyracks/ipc/tests/IPCTest.java | 31 ++++++++++------------ 7 files changed, 68 insertions(+), 47 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java index a78c269..2547476 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java @@ -64,7 +64,7 @@ class ClientInterfaceIPCI implements IIPCI { } @Override - public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) { + public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) { HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload; switch (fn.getFunctionId()) { case GET_CLUSTER_CONTROLLER_INFO: @@ -200,4 +200,14 @@ class ClientInterfaceIPCI implements IIPCI { } } } + + @Override + public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) { + LOGGER.info("exception in/or processing message", exception); + try { + handle.send(mid, null, exception); + } catch (IPCException e) { + LOGGER.warn("error sending exception response", e); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index 0e4ad41..d263cc0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -60,8 +60,7 @@ class ClusterControllerIPCI implements IIPCI { } @Override - public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload, - Exception exception) { + public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload) { CCNCFunctions.Function fn = (Function) payload; switch (fn.getFunctionId()) { case REGISTER_NODE: @@ -170,6 +169,11 @@ class ClusterControllerIPCI implements IIPCI { } } + @Override + public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) { + LOGGER.info("exception in/or processing message", exception); + } + private static void processNodeHeartbeat(ClusterControllerService ccs, CCNCFunctions.Function fn) { final ExecutorService executor = ccs.getExecutor(); if (executor != null) { @@ -177,4 +181,4 @@ class ClusterControllerIPCI implements IIPCI { executor.execute(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData(), nhf.getNcAddress())); } } -} +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 836c624..df08c04 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -37,12 +37,15 @@ import org.apache.hyracks.control.nc.work.UnDeployBinaryWork; import org.apache.hyracks.control.nc.work.UndeployJobSpecWork; import org.apache.hyracks.ipc.api.IIPCHandle; import org.apache.hyracks.ipc.api.IIPCI; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Interprocess communication in a node controller * This class must be refactored with each function carrying its own implementation */ final class NodeControllerIPCI implements IIPCI { + private static final Logger LOGGER = LogManager.getLogger(); private final NodeControllerService ncs; /** @@ -53,8 +56,7 @@ final class NodeControllerIPCI implements IIPCI { } @Override - public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload, - Exception exception) { + public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload) { CCNCFunctions.Function fn = (CCNCFunctions.Function) payload; switch (fn.getFunctionId()) { case SEND_APPLICATION_MESSAGE: @@ -150,4 +152,9 @@ final class NodeControllerIPCI implements IIPCI { } } + + @Override + public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) { + LOGGER.info("exception in/or processing message", exception); + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java index 02698fa..bf1bc33 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java @@ -21,7 +21,6 @@ package org.apache.hyracks.ipc.api; /** * The interprocess communication interface that handles communication between different processes across the cluster */ -@FunctionalInterface public interface IIPCI { /** @@ -34,8 +33,19 @@ public interface IIPCI { * the request message id (if the message is a response to a request) * @param payload * the message payload + */ + void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload); + + /** + * handles an error message, or failure to unmarshall the message + * @param handle + * the message IPC handle + * @param mid + * the message id + * @param rmid + * the request message id (if the message is a response to a request) * @param exception - * an exception if the message was an error message + * an exception */ - void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception); + void onError(IIPCHandle handle, long mid, long rmid, Exception exception); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java index 7dae541..fd98b5c 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java @@ -31,7 +31,7 @@ public class RPCInterface implements IIPCI { public Object call(IIPCHandle handle, Object request) throws Exception { Request req; long mid; - synchronized (this) { + synchronized (reqMap) { req = new Request(handle, this); mid = handle.send(-1, request, null); reqMap.put(mid, req); @@ -40,21 +40,23 @@ public class RPCInterface implements IIPCI { } @Override - public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) { + public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) { Request req; - synchronized (this) { + synchronized (reqMap) { req = reqMap.remove(rmid); } assert req != null; - if (exception != null) { - req.setException(exception); - } else { - req.setResult(payload); - } + req.setResult(payload); } - protected synchronized void removeRequest(Request r) { - reqMap.remove(r); + @Override + public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) { + Request req; + synchronized (reqMap) { + req = reqMap.remove(rmid); + } + assert req != null; + req.setException(exception); } private static class Request { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java index 8d90ba3..4a19a33 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java @@ -29,13 +29,8 @@ import org.apache.hyracks.ipc.api.IIPCI; import org.apache.hyracks.ipc.api.IPCPerformanceCounters; import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer; import org.apache.hyracks.ipc.exceptions.IPCException; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class IPCSystem { - private static final Logger LOGGER = LogManager.getLogger(); - private final IPCConnectionManager cMgr; private final IIPCI ipci; @@ -101,15 +96,11 @@ public class IPCSystem { void deliverIncomingMessage(final Message message) { long mid = message.getMessageId(); long rmid = message.getRequestMessageId(); - Object payload = null; - Exception exception = null; if (message.getFlag() == Message.ERROR) { - exception = (Exception) message.getPayload(); - LOGGER.log(Level.INFO, "Exception in message", exception); + ipci.onError(message.getIPCHandle(), mid, rmid, (Exception) message.getPayload()); } else { - payload = message.getPayload(); + ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, message.getPayload()); } - ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, payload, exception); } IPCConnectionManager getConnectionManager() { diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java index 00bd761..0d8d69b 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java @@ -47,7 +47,7 @@ public class IPCTest { IIPCHandle handle = client.getHandle(serverAddr, 0); for (int i = 0; i < 100; ++i) { - Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i)); + Assert.assertEquals(rpci.call(handle, i), 2 * i); } try { @@ -62,27 +62,24 @@ public class IPCTest { final Executor executor = Executors.newCachedThreadPool(); IIPCI ipci = new IIPCI() { @Override - public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid, final Object payload, - Exception exception) { - executor.execute(new Runnable() { - @Override - public void run() { - Object result = null; - Exception exception = null; + public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) { + executor.execute(() -> { + try { + handle.send(mid, (int) payload * 2, null); + } catch (Exception e) { try { - Integer i = (Integer) payload; - result = i.intValue() * 2; - } catch (Exception e) { - exception = e; - } - try { - handle.send(mid, result, exception); - } catch (IPCException e) { - e.printStackTrace(); + handle.send(mid, null, e); + } catch (IPCException e1) { + e1.printStackTrace(); } } }); } + + @Override + public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) { + exception.printStackTrace(); + } }; return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, ipci, new JavaSerializationBasedPayloadSerializerDeserializer());
