This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 625d3827aa5839daf4ad811620814897659a0dc7
Author: Michael Blow <[email protected]>
AuthorDate: Tue Apr 13 09:11:43 2021 -0400

    [NO ISSUE][IPC] += high priority NC messaging, fail on sync send failure
    
    Change-Id: Id1d259917fbc98693d795eca41934e2d7d55f304
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11043
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Till Westmann <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../app/active/ActiveEntityEventsListener.java     |  3 ++-
 .../hyracks/bootstrap/GlobalRecoveryManager.java   |  2 +-
 .../apache/asterix/messaging/CCMessageBroker.java  | 27 ++++++++++++++++++----
 .../common/messaging/api/ICCMessageBroker.java     | 14 +++++++++--
 .../runtime/message/ResourceIdRequestMessage.java  |  2 +-
 .../asterix/runtime/message/TxnIdBlockRequest.java |  2 +-
 .../control/common/base/INodeController.java       |  2 ++
 .../common/ipc/NodeControllerRemoteProxy.java      |  7 ++++++
 .../hyracks/control/nc/NodeControllerIPCI.java     |  9 ++++++--
 9 files changed, 56 insertions(+), 12 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index cc4b25f..2891710 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -324,7 +324,8 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
             requests.add(new ActiveStatsRequestMessage(new 
ActiveRuntimeId(entityId, runtimeName, i), reqId));
         }
         try {
-            List<String> responses = (List<String>) 
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
+            List<String> responses =
+                    (List<String>) messageBroker.sendSyncRequestToNCs(reqId, 
ncs, requests, timeout, false);
             stats = formatStats(responses);
             statsTimestamp = System.currentTimeMillis();
             notifySubscribers(statsUpdatedEvent);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ae50880..5870d3a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -150,7 +150,7 @@ public class GlobalRecoveryManager implements 
IGlobalRecoveryManager {
             requests.add(new StorageCleanupRequestMessage(reqId, 
validDatasetIds));
         }
         messageBroker.sendSyncRequestToNCs(reqId, ncs, requests,
-                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs));
+                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs), 
false);
     }
 
     protected MetadataTransactionContext doRecovery(ICcApplicationContext 
appCtx, MetadataTransactionContext mdTxnCtx)
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1f9ec32..0683207 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -65,18 +65,34 @@ public class CCMessageBroker implements ICCMessageBroker {
     }
 
     @Override
-    public void sendApplicationMessageToNC(INcAddressedMessage msg, String 
nodeId) throws Exception {
+    public boolean sendApplicationMessageToNC(INcAddressedMessage msg, String 
nodeId) throws Exception {
+        return sendMessage(msg, nodeId, false);
+    }
+
+    @Override
+    public boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, 
String nodeId) throws Exception {
+        return sendMessage(msg, nodeId, true);
+    }
+
+    private boolean sendMessage(INcAddressedMessage msg, String nodeId, 
boolean realTime) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (msg instanceof ICcIdentifiedMessage) {
             ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
         }
         if (state != null) {
-            
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg),
 null, nodeId);
+            byte[] payload = JavaSerializationUtils.serialize(msg);
+            if (realTime) {
+                
state.getNodeController().sendRealTimeApplicationMessageToNC(payload, null, 
nodeId);
+            } else {
+                state.getNodeController().sendApplicationMessageToNC(payload, 
null, nodeId);
+            }
+            return true;
         } else {
             if (LOGGER.isWarnEnabled()) {
                 LOGGER.warn("Couldn't send message to unregistered node (" + 
nodeId + ")");
             }
+            return false;
         }
     }
 
@@ -87,7 +103,7 @@ public class CCMessageBroker implements ICCMessageBroker {
 
     @Override
     public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? 
extends INcAddressedMessage> requests,
-            long timeout) throws Exception {
+            long timeout, boolean realTime) throws Exception {
         MutableInt numRequired = new MutableInt(0);
         MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair =
                 MutablePair.of(numRequired, 
MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED));
@@ -101,7 +117,10 @@ public class CCMessageBroker implements ICCMessageBroker {
                     if (!(message instanceof ICcIdentifiedMessage)) {
                         throw new IllegalStateException("sync request message 
not cc identified: " + message);
                     }
-                    sendApplicationMessageToNC(message, nc);
+                    if (!(realTime ? 
sendRealTimeApplicationMessageToNC(message, nc)
+                            : sendApplicationMessageToNC(message, nc))) {
+                        throw new 
RuntimeDataException(ErrorCode.ILLEGAL_STATE, "unable to send sync message to " 
+ nc);
+                    }
                 }
                 long time = System.currentTimeMillis();
                 while (pair.getLeft().getValue() > 0) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 208686c..e628bc7 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -36,7 +36,16 @@ public interface ICCMessageBroker extends IMessageBroker {
      * @param nodeId
      * @throws Exception
      */
-    void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) 
throws Exception;
+    boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) 
throws Exception;
+
+    /**
+     * Sends the passed message to the specified {@code nodeId}
+     *
+     * @param msg
+     * @param nodeId
+     * @throws Exception
+     */
+    boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String 
nodeId) throws Exception;
 
     /**
      * Sends the passed requests to all NCs and wait for the response
@@ -44,10 +53,11 @@ public interface ICCMessageBroker extends IMessageBroker {
      * @param ncs
      * @param requests
      * @param timeout
+     * @param realTime
      * @throws Exception
      */
     Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends 
INcAddressedMessage> requests,
-            long timeout) throws Exception;
+            long timeout, boolean realTime) throws Exception;
 
     /**
      * respond to a sync request
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 6198acc..b64f779 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -51,7 +51,7 @@ public class ResourceIdRequestMessage implements 
ICcAddressedMessage {
                     response.setException(new Exception("One or more nodes has 
not reported max resource id."));
                 }
             }
-            broker.sendApplicationMessageToNC(response, src);
+            broker.sendRealTimeApplicationMessageToNC(response, src);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
index 4e1c3b1..2d9acf4 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -51,7 +51,7 @@ public class TxnIdBlockRequest implements ICcAddressedMessage 
{
             ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
             long startingId = 
appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
             TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, 
blockSizeRequested);
-            broker.sendApplicationMessageToNC(response, nodeId);
+            broker.sendRealTimeApplicationMessageToNC(response, nodeId);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 42a0d66..a754baf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -66,6 +66,8 @@ public interface INodeController {
 
     void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, 
String nodeId) throws Exception;
 
+    void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception;
+
     void takeThreadDump(String requestId) throws Exception;
 
     /**
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0b85c4e..ff5dd33 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -134,6 +134,13 @@ public class NodeControllerRemoteProxy implements 
INodeController {
     }
 
     @Override
+    public void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId 
deploymentId, String nodeId)
+            throws Exception {
+        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data, deploymentId, true, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void takeThreadDump(String requestId) throws Exception {
         ThreadDumpRequestFunction fn = new 
ThreadDumpRequestFunction(requestId, ccId);
         ipcHandle.send(-1, fn, null);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index e4c1d30..08a18f9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -61,8 +61,13 @@ final class NodeControllerIPCI implements IIPCI {
         switch (fn.getFunctionId()) {
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction amf = 
(CCNCFunctions.SendApplicationMessageFunction) fn;
-                ncs.getWorkQueue().schedule(
-                        new ApplicationMessageWork(ncs, amf.getMessage(), 
amf.getDeploymentId(), amf.getNodeId()));
+                ApplicationMessageWork amfw =
+                        new ApplicationMessageWork(ncs, amf.getMessage(), 
amf.getDeploymentId(), amf.getNodeId());
+                if (amf.isRealTime()) {
+                    ncs.getExecutor().submit(amfw);
+                } else {
+                    ncs.getWorkQueue().schedule(amfw);
+                }
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = 
(CCNCFunctions.StartTasksFunction) fn;

Reply via email to