This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 1f15c812c692ece6a32eeea116fa6140b21030f1
Author: Michael Blow <[email protected]>
AuthorDate: Thu Apr 8 09:47:27 2021 -0400

    [NO ISSUE][HYR] += ability to bypass work queue for high priority app 
messages
    
    - use said ability for active stats responses
    
    Change-Id: I5d4747e08a380a585d0c4a9312873ea39b80abbf
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10964
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Till Westmann <[email protected]>
    Contrib: Michael Blow <[email protected]>
    Tested-by: Michael Blow <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
---
 .../main/java/org/apache/asterix/active/ActiveManager.java |  4 ++--
 .../apache/hyracks/control/cc/ClusterControllerIPCI.java   | 12 ++++++++++--
 .../hyracks/control/common/base/IClusterController.java    |  2 ++
 .../apache/hyracks/control/common/ipc/CCNCFunctions.java   | 14 ++++++++++----
 .../control/common/ipc/ClusterControllerRemoteProxy.java   |  9 ++++++++-
 .../control/common/ipc/NodeControllerRemoteProxy.java      |  2 +-
 .../apache/hyracks/control/nc/NodeControllerService.java   |  4 ++++
 7 files changed, 37 insertions(+), 10 deletions(-)

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 26b5068..b99e4f2 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -153,8 +153,8 @@ public class ActiveManager {
             String stats = runtime.getStats();
             LOGGER.debug("Sending stats response for {} ", runtimeId);
             ActiveStatsResponse response = new ActiveStatsResponse(reqId, 
stats, null);
-            ((NodeControllerService) 
serviceCtx.getControllerService()).sendApplicationMessageToCC(message.getCcId(),
-                    JavaSerializationUtils.serialize(response), null);
+            ((NodeControllerService) 
serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC(
+                    message.getCcId(), 
JavaSerializationUtils.serialize(response), null);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 16e8ed8..d350f61 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -129,8 +129,16 @@ class ClusterControllerIPCI implements IIPCI {
                 break;
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction rsf = 
(CCNCFunctions.SendApplicationMessageFunction) fn;
-                ccs.getWorkQueue().schedule(
-                        new ApplicationMessageWork(ccs, rsf.getMessage(), 
rsf.getDeploymentId(), rsf.getNodeId()));
+                ApplicationMessageWork work =
+                        new ApplicationMessageWork(ccs, rsf.getMessage(), 
rsf.getDeploymentId(), rsf.getNodeId());
+                if (rsf.isRealTime()) {
+                    final ExecutorService executor = ccs.getExecutor();
+                    if (executor != null) {
+                        executor.execute(work);
+                    }
+                } else {
+                    ccs.getWorkQueue().schedule(work);
+                }
                 break;
             case GET_NODE_CONTROLLERS_INFO:
                 ccs.getWorkQueue().schedule(new 
GetNodeControllersInfoWork(ccs.getNodeManager(),
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index c8106e8..1c91183 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -66,6 +66,8 @@ public interface IClusterController {
 
     void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, 
String nodeId) throws Exception;
 
+    void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception;
+
     void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
IResultMetadata metadata, boolean emptyResult,
             int partition, int nPartitions, NetworkAddress networkAddress) 
throws Exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 6b5b5db..27a8b02 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -127,9 +127,10 @@ public class CCNCFunctions {
     }
 
     public static class SendApplicationMessageFunction extends Function {
-        private static final long serialVersionUID = 1L;
-        private byte[] serializedMessage;
-        private DeploymentId deploymentId;
+        private static final long serialVersionUID = 2L;
+        private final byte[] serializedMessage;
+        private final DeploymentId deploymentId;
+        private final boolean realTime;
         private String nodeId;
 
         public DeploymentId getDeploymentId() {
@@ -148,9 +149,14 @@ public class CCNCFunctions {
             return serializedMessage;
         }
 
-        public SendApplicationMessageFunction(byte[] data, DeploymentId 
deploymentId, String nodeId) {
+        public boolean isRealTime() {
+            return realTime;
+        }
+
+        public SendApplicationMessageFunction(byte[] data, DeploymentId 
deploymentId, boolean realTime, String nodeId) {
             this.serializedMessage = data;
             this.deploymentId = deploymentId;
+            this.realTime = realTime;
             this.nodeId = nodeId;
         }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 344c3fb..09dc04d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -128,7 +128,14 @@ public class ClusterControllerRemoteProxy implements 
IClusterController {
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
-        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data, deploymentId, nodeId);
+        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data, deploymentId, false, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId)
+            throws Exception {
+        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data, deploymentId, true, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index d32ee32..0b85c4e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -129,7 +129,7 @@ public class NodeControllerRemoteProxy implements 
INodeController {
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception {
-        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data, deploymentId, nodeId);
+        SendApplicationMessageFunction fn = new 
SendApplicationMessageFunction(data, deploymentId, false, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 1356f4c..c774317 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -633,6 +633,10 @@ public class NodeControllerService implements 
IControllerService {
         getClusterController(ccId).sendApplicationMessageToCC(data, 
deploymentId, id);
     }
 
+    public void sendRealTimeApplicationMessageToCC(CcId ccId, byte[] data, 
DeploymentId deploymentId) throws Exception {
+        getClusterController(ccId).sendRealTimeApplicationMessageToCC(data, 
deploymentId, id);
+    }
+
     public IResultPartitionManager getResultPartitionManager() {
         return resultPartitionManager;
     }

Reply via email to