This is an automated email from the ASF dual-hosted git repository. dlych pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 625d3827aa5839daf4ad811620814897659a0dc7 Author: Michael Blow <[email protected]> AuthorDate: Tue Apr 13 09:11:43 2021 -0400 [NO ISSUE][IPC] += high priority NC messaging, fail on sync send failure Change-Id: Id1d259917fbc98693d795eca41934e2d7d55f304 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11043 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Till Westmann <[email protected]> Tested-by: Jenkins <[email protected]> --- .../app/active/ActiveEntityEventsListener.java | 3 ++- .../hyracks/bootstrap/GlobalRecoveryManager.java | 2 +- .../apache/asterix/messaging/CCMessageBroker.java | 27 ++++++++++++++++++---- .../common/messaging/api/ICCMessageBroker.java | 14 +++++++++-- .../runtime/message/ResourceIdRequestMessage.java | 2 +- .../asterix/runtime/message/TxnIdBlockRequest.java | 2 +- .../control/common/base/INodeController.java | 2 ++ .../common/ipc/NodeControllerRemoteProxy.java | 7 ++++++ .../hyracks/control/nc/NodeControllerIPCI.java | 9 ++++++-- 9 files changed, 56 insertions(+), 12 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index cc4b25f..2891710 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -324,7 +324,8 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId)); } try { - List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout); + List<String> responses = + (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout, false); stats = formatStats(responses); statsTimestamp = System.currentTimeMillis(); notifySubscribers(statsUpdatedEvent); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index ae50880..5870d3a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -150,7 +150,7 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager { requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds)); } messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, - TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs)); + TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs), false); } protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 1f9ec32..0683207 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -65,18 +65,34 @@ public class CCMessageBroker implements ICCMessageBroker { } @Override - public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception { + public boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception { + return sendMessage(msg, nodeId, false); + } + + @Override + public boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception { + return sendMessage(msg, nodeId, true); + } + + private boolean sendMessage(INcAddressedMessage msg, String nodeId, boolean realTime) throws Exception { INodeManager nodeManager = ccs.getNodeManager(); NodeControllerState state = nodeManager.getNodeControllerState(nodeId); if (msg instanceof ICcIdentifiedMessage) { ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId()); } if (state != null) { - state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId); + byte[] payload = JavaSerializationUtils.serialize(msg); + if (realTime) { + state.getNodeController().sendRealTimeApplicationMessageToNC(payload, null, nodeId); + } else { + state.getNodeController().sendApplicationMessageToNC(payload, null, nodeId); + } + return true; } else { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Couldn't send message to unregistered node (" + nodeId + ")"); } + return false; } } @@ -87,7 +103,7 @@ public class CCMessageBroker implements ICCMessageBroker { @Override public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests, - long timeout) throws Exception { + long timeout, boolean realTime) throws Exception { MutableInt numRequired = new MutableInt(0); MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair = MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED)); @@ -101,7 +117,10 @@ public class CCMessageBroker implements ICCMessageBroker { if (!(message instanceof ICcIdentifiedMessage)) { throw new IllegalStateException("sync request message not cc identified: " + message); } - sendApplicationMessageToNC(message, nc); + if (!(realTime ? sendRealTimeApplicationMessageToNC(message, nc) + : sendApplicationMessageToNC(message, nc))) { + throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "unable to send sync message to " + nc); + } } long time = System.currentTimeMillis(); while (pair.getLeft().getValue() > 0) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java index 208686c..e628bc7 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java @@ -36,7 +36,16 @@ public interface ICCMessageBroker extends IMessageBroker { * @param nodeId * @throws Exception */ - void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception; + boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception; + + /** + * Sends the passed message to the specified {@code nodeId} + * + * @param msg + * @param nodeId + * @throws Exception + */ + boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception; /** * Sends the passed requests to all NCs and wait for the response @@ -44,10 +53,11 @@ public interface ICCMessageBroker extends IMessageBroker { * @param ncs * @param requests * @param timeout + * @param realTime * @throws Exception */ Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests, - long timeout) throws Exception; + long timeout, boolean realTime) throws Exception; /** * respond to a sync request diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index 6198acc..b64f779 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -51,7 +51,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { response.setException(new Exception("One or more nodes has not reported max resource id.")); } } - broker.sendApplicationMessageToNC(response, src); + broker.sendRealTimeApplicationMessageToNC(response, src); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java index 4e1c3b1..2d9acf4 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java @@ -51,7 +51,7 @@ public class TxnIdBlockRequest implements ICcAddressedMessage { ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested); TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested); - broker.sendApplicationMessageToNC(response, nodeId); + broker.sendRealTimeApplicationMessageToNC(response, nodeId); } catch (Exception e) { throw HyracksDataException.create(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 42a0d66..a754baf 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -66,6 +66,8 @@ public interface INodeController { void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception; + void takeThreadDump(String requestId) throws Exception; /** diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 0b85c4e..ff5dd33 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -134,6 +134,13 @@ public class NodeControllerRemoteProxy implements INodeController { } @Override + public void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) + throws Exception { + SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId); + ipcHandle.send(-1, fn, null); + } + + @Override public void takeThreadDump(String requestId) throws Exception { ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId); ipcHandle.send(-1, fn, null); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index e4c1d30..08a18f9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -61,8 +61,13 @@ final class NodeControllerIPCI implements IIPCI { switch (fn.getFunctionId()) { case SEND_APPLICATION_MESSAGE: CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn; - ncs.getWorkQueue().schedule( - new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId())); + ApplicationMessageWork amfw = + new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()); + if (amf.isRealTime()) { + ncs.getExecutor().submit(amfw); + } else { + ncs.getWorkQueue().schedule(amfw); + } return; case START_TASKS: CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
