This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 4e0194dc1ff8bd809ada2ddb52523b3789f8efd5 Author: Murtadha Hubail <[email protected]> AuthorDate: Fri Oct 17 15:49:31 2025 +0300 [NO ISSUE][RT] Respond to NC in case of unexpected failures - user model changes: no - storage format changes: no - interface changes: yes Details: - On unexpected failures while sending messages from CC to NC, send a failure response to the NC to avoid requests hanging on the NC until their timeout. Ext-ref: MB-68985 Change-Id: I5ff53205d7cbb23c91a482b22165e1645247745f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20496 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- .../message/AbstractInternalRequestMessage.java | 2 ++ .../message/ExecuteStatementRequestMessage.java | 32 ++++++++++++++-------- .../apache/asterix/messaging/CCMessageBroker.java | 9 ++++++ .../common/messaging/api/ICCMessageBroker.java | 8 ++++++ 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java index 7d58e4e2e8..41c4906f84 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractInternalRequestMessage.java @@ -113,6 +113,8 @@ public abstract class AbstractInternalRequestMessage implements ICcAddressedMess messageBroker.sendApplicationMessageToNC(responseMsg, nodeRequestId); } catch (Exception e) { LOGGER.log(Level.WARN, e.toString(), e); + responseMsg.setError(new Exception(e.getMessage())); + messageBroker.sendMessageQuietly(responseMsg, nodeRequestId); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 0cdbf95f25..94d6309d55 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -143,7 +143,7 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker(); final RuntimeDataException rejectionReason = getRejectionReason(ccSrv, requestNodeId); if (rejectionReason != null) { - sendRejection(rejectionReason, messageBroker, requestMessageId, requestNodeId); + sendRejection(rejectionReason, messageBroker); return; } CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager(); @@ -195,11 +195,7 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, "Unexpected exception", e); responseMsg.setError(e); } - try { - messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); - } catch (Exception e) { - LOGGER.log(Level.WARN, e.toString(), e); - } + sendResponseToNc(messageBroker, responseMsg); } protected IRequestParameters createRequestParameters(IStatementExecutor.StatementProperties statementProperties, @@ -231,11 +227,8 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { return null; } - protected void sendRejection(Exception reason, CCMessageBroker messageBroker, long requestMessageId, - String requestNodeId) { - ExecuteStatementResponseMessage responseMsg = - new ExecuteStatementResponseMessage(requestMessageId, clientContextID, requestReference.getUuid()); - responseMsg.setError(reason); + protected void sendRejection(Exception reason, CCMessageBroker messageBroker) { + ExecuteStatementResponseMessage responseMsg = createFailureMessage(reason); try { messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); } catch (Exception e) { @@ -243,6 +236,23 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { } } + private void sendResponseToNc(CCMessageBroker messageBroker, ExecuteStatementResponseMessage responseMsg) { + try { + messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); + } catch (Exception e) { + LOGGER.error("unexpected exception sending message to node {}", requestNodeId, e); + ExecuteStatementResponseMessage failureMessage = createFailureMessage(new Exception(e.getMessage())); + messageBroker.sendMessageQuietly(failureMessage, requestNodeId); + } + } + + private ExecuteStatementResponseMessage createFailureMessage(Exception reason) { + ExecuteStatementResponseMessage responseMsg = + new ExecuteStatementResponseMessage(requestMessageId, clientContextID, requestReference.getUuid()); + responseMsg.setError(reason); + return responseMsg; + } + @Override public String toString() { if (statementsText != null && (statementsText.startsWith("UPSERT") || statementsText.startsWith("INSERT"))) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 06832074c9..eb538f0681 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -69,6 +69,15 @@ public class CCMessageBroker implements ICCMessageBroker { return sendMessage(msg, nodeId, false); } + @Override + public void sendMessageQuietly(INcAddressedMessage msg, String nodeId) { + try { + sendMessage(msg, nodeId, false); + } catch (Exception e) { + LOGGER.warn("failed to send message to node {}", nodeId, e); + } + } + @Override public boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception { return sendMessage(msg, nodeId, true); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java index e628bc7b41..72b0040243 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java @@ -38,6 +38,14 @@ public interface ICCMessageBroker extends IMessageBroker { */ boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception; + /** + * Attempts to send {@code msg} to the specified {@code nodeId} without throwing exceptions + * + * @param msg + * @param nodeId + */ + void sendMessageQuietly(INcAddressedMessage msg, String nodeId); + /** * Sends the passed message to the specified {@code nodeId} *
