This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1f15c812c692ece6a32eeea116fa6140b21030f1 Author: Michael Blow <[email protected]> AuthorDate: Thu Apr 8 09:47:27 2021 -0400 [NO ISSUE][HYR] += ability to bypass work queue for high priority app messages - use said ability for active stats responses Change-Id: I5d4747e08a380a585d0c4a9312873ea39b80abbf Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10964 Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Till Westmann <[email protected]> Contrib: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> --- .../main/java/org/apache/asterix/active/ActiveManager.java | 4 ++-- .../apache/hyracks/control/cc/ClusterControllerIPCI.java | 12 ++++++++++-- .../hyracks/control/common/base/IClusterController.java | 2 ++ .../apache/hyracks/control/common/ipc/CCNCFunctions.java | 14 ++++++++++---- .../control/common/ipc/ClusterControllerRemoteProxy.java | 9 ++++++++- .../control/common/ipc/NodeControllerRemoteProxy.java | 2 +- .../apache/hyracks/control/nc/NodeControllerService.java | 4 ++++ 7 files changed, 37 insertions(+), 10 deletions(-) diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index 26b5068..b99e4f2 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -153,8 +153,8 @@ public class ActiveManager { String stats = runtime.getStats(); LOGGER.debug("Sending stats response for {} ", runtimeId); ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null); - ((NodeControllerService) serviceCtx.getControllerService()).sendApplicationMessageToCC(message.getCcId(), - JavaSerializationUtils.serialize(response), null); + ((NodeControllerService) serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC( + message.getCcId(), JavaSerializationUtils.serialize(response), null); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index 16e8ed8..d350f61 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -129,8 +129,16 @@ class ClusterControllerIPCI implements IIPCI { break; case SEND_APPLICATION_MESSAGE: CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn; - ccs.getWorkQueue().schedule( - new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId())); + ApplicationMessageWork work = + new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()); + if (rsf.isRealTime()) { + final ExecutorService executor = ccs.getExecutor(); + if (executor != null) { + executor.execute(work); + } + } else { + ccs.getWorkQueue().schedule(work); + } break; case GET_NODE_CONTROLLERS_INFO: ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(), diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index c8106e8..1c91183 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -66,6 +66,8 @@ public interface IClusterController { void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index 6b5b5db..27a8b02 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -127,9 +127,10 @@ public class CCNCFunctions { } public static class SendApplicationMessageFunction extends Function { - private static final long serialVersionUID = 1L; - private byte[] serializedMessage; - private DeploymentId deploymentId; + private static final long serialVersionUID = 2L; + private final byte[] serializedMessage; + private final DeploymentId deploymentId; + private final boolean realTime; private String nodeId; public DeploymentId getDeploymentId() { @@ -148,9 +149,14 @@ public class CCNCFunctions { return serializedMessage; } - public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, String nodeId) { + public boolean isRealTime() { + return realTime; + } + + public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, boolean realTime, String nodeId) { this.serializedMessage = data; this.deploymentId = deploymentId; + this.realTime = realTime; this.nodeId = nodeId; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 344c3fb..09dc04d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -128,7 +128,14 @@ public class ClusterControllerRemoteProxy implements IClusterController { @Override public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception { - SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId); + SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId); + ipcHandle.send(-1, fn, null); + } + + @Override + public void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) + throws Exception { + SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId); ipcHandle.send(-1, fn, null); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index d32ee32..0b85c4e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -129,7 +129,7 @@ public class NodeControllerRemoteProxy implements INodeController { @Override public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception { - SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId); + SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId); ipcHandle.send(-1, fn, null); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 1356f4c..c774317 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -633,6 +633,10 @@ public class NodeControllerService implements IControllerService { getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id); } + public void sendRealTimeApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception { + getClusterController(ccId).sendRealTimeApplicationMessageToCC(data, deploymentId, id); + } + public IResultPartitionManager getResultPartitionManager() { return resultPartitionManager; }
