ARTEMIS-2044 Add onSendException, onMessageRouteException to ActiveMQServerMessagePlugin
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/95ec8ea4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/95ec8ea4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/95ec8ea4 Branch: refs/heads/master Commit: 95ec8ea433d1bd5cf8294b0018c1edb7d45c07f0 Parents: 46bc10e Author: Carsten Lohmann <[email protected]> Authored: Wed Aug 22 10:06:30 2018 +0200 Committer: Michael Andre Pearce <[email protected]> Committed: Fri Aug 24 21:35:34 2018 +0100 ---------------------------------------------------------------------- .../core/postoffice/impl/PostOfficeImpl.java | 89 +++++++++++--------- .../core/server/impl/ServerSessionImpl.java | 78 +++++++++-------- .../plugin/ActiveMQServerMessagePlugin.java | 30 +++++++ .../impl/LoggingActiveMQServerPlugin.java | 46 ++++++++++ .../impl/LoggingActiveMQServerPluginLogger.java | 28 ++++++ .../plugin/MethodCalledVerifier.java | 19 +++++ 6 files changed, 213 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 9a3e844..598c32b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -877,62 +877,69 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding logger.trace("Message after routed=" + message); } - if (context.getQueueCount() == 0) { - // Send to DLA if appropriate + try { + if (context.getQueueCount() == 0) { + // Send to DLA if appropriate - AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); + AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString()); - boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute(); + boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute(); - if (sendToDLA) { - // Send to the DLA for the address + if (sendToDLA) { + // Send to the DLA for the address - SimpleString dlaAddress = addressSettings.getDeadLetterAddress(); + SimpleString dlaAddress = addressSettings.getDeadLetterAddress(); - if (logger.isDebugEnabled()) { - logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message); - } + if (logger.isDebugEnabled()) { + logger.debug("sending message to dla address = " + dlaAddress + ", message=" + message); + } - if (dlaAddress == null) { - result = RoutingStatus.NO_BINDINGS; - ActiveMQServerLogger.LOGGER.noDLA(address); - } else { - message.referenceOriginalMessage(message, null); + if (dlaAddress == null) { + result = RoutingStatus.NO_BINDINGS; + ActiveMQServerLogger.LOGGER.noDLA(address); + } else { + message.referenceOriginalMessage(message, null); - message.setAddress(dlaAddress); + message.setAddress(dlaAddress); - message.reencode(); + message.reencode(); - route(message, context.getTransaction(), false); - result = RoutingStatus.NO_BINDINGS_DLA; - } - } else { - result = RoutingStatus.NO_BINDINGS; + route(message, context.getTransaction(), false); + result = RoutingStatus.NO_BINDINGS_DLA; + } + } else { + result = RoutingStatus.NO_BINDINGS; - if (logger.isDebugEnabled()) { - logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); - } + if (logger.isDebugEnabled()) { + logger.debug("Message " + message + " is not going anywhere as it didn't have a binding on address:" + address); + } - if (message.isLargeMessage()) { - ((LargeServerMessage) message).deleteFile(); + if (message.isLargeMessage()) { + ((LargeServerMessage) message).deleteFile(); + } } - } - } else { - result = RoutingStatus.OK; - try { - processRoute(message, context, direct); - } catch (ActiveMQAddressFullException e) { - if (startedTX.get()) { - context.getTransaction().rollback(); - } else if (context.getTransaction() != null) { - context.getTransaction().markAsRollbackOnly(e); + } else { + result = RoutingStatus.OK; + try { + processRoute(message, context, direct); + } catch (ActiveMQAddressFullException e) { + if (startedTX.get()) { + context.getTransaction().rollback(); + } else if (context.getTransaction() != null) { + context.getTransaction().markAsRollbackOnly(e); + } + throw e; } - throw e; } - } - if (startedTX.get()) { - context.getTransaction().commit(); + if (startedTX.get()) { + context.getTransaction().commit(); + } + } catch (Exception e) { + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e)); + } + throw e; } if (server.hasBrokerMessagePlugins()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 4910e66..d868a2f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1426,54 +1426,60 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue)); } - // If the protocol doesn't support flow control, we have no choice other than fail the communication - if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { - ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); - this.getRemotingConnection().fail(exception); - throw exception; - } - final RoutingStatus result; - //large message may come from StompSession directly, in which - //case the id header already generated. - if (!message.isLargeMessage()) { - long id = storageManager.generateID(); - // This will re-encode the message - message.setMessageID(id); - } + try { + // If the protocol doesn't support flow control, we have no choice other than fail the communication + if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) { + ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit(); + this.getRemotingConnection().fail(exception); + throw exception; + } - SimpleString address = message.getAddressSimpleString(); + //large message may come from StompSession directly, in which + //case the id header already generated. + if (!message.isLargeMessage()) { + long id = storageManager.generateID(); + // This will re-encode the message + message.setMessageID(id); + } - if (defaultAddress == null && address != null) { - defaultAddress = address; - } + SimpleString address = message.getAddressSimpleString(); - if (address == null) { - // We don't want to force a re-encode when the message gets sent to the consumer - message.setAddress(defaultAddress); - } + if (defaultAddress == null && address != null) { + defaultAddress = address; + } - if (logger.isTraceEnabled()) { - logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); - } + if (address == null) { + // We don't want to force a re-encode when the message gets sent to the consumer + message.setAddress(defaultAddress); + } - if (message.getAddress() == null) { - // This could happen with some tests that are ignoring messages - throw ActiveMQMessageBundle.BUNDLE.noAddress(); - } + if (logger.isTraceEnabled()) { + logger.trace("send(message=" + message + ", direct=" + direct + ") being called"); + } - if (message.getAddressSimpleString().equals(managementAddress)) { - // It's a management message + if (message.getAddress() == null) { + // This could happen with some tests that are ignoring messages + throw ActiveMQMessageBundle.BUNDLE.noAddress(); + } - result = handleManagementMessage(tx, message, direct); - } else { - result = doSend(tx, message, address, direct, noAutoCreateQueue); - } + if (message.getAddressSimpleString().equals(managementAddress)) { + // It's a management message + result = handleManagementMessage(tx, message, direct); + } else { + result = doSend(tx, message, address, direct, noAutoCreateQueue); + } + + } catch (Exception e) { + if (server.hasBrokerMessagePlugins()) { + server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e)); + } + throw e; + } if (server.hasBrokerMessagePlugins()) { server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result)); } - return result; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java index aef0970..404e8a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java @@ -65,6 +65,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin { this.afterSend(tx, message, direct, noAutoCreateQueue, result); } + /** + * When there was an exception sending the message + * + * @param session + * @param tx + * @param message + * @param direct + * @param noAutoCreateQueue + * @param e the exception that occurred when sending the message + * @throws ActiveMQException + */ + default void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + Exception e) throws ActiveMQException { + + } /** * Before a message is sent @@ -129,6 +144,21 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin { } /** + * When there was an error routing the message + * + * @param message + * @param context + * @param direct + * @param rejectDuplicates + * @param e the exception that occurred during message routing + * @throws ActiveMQException + */ + default void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + Exception e) throws ActiveMQException { + + } + + /** * Before a message is delivered to a client consumer * * @param consumer the consumer the message will be delivered to http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java index ff23b59..3483472 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java @@ -491,6 +491,32 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial } } + @Override + public void onSendException(ServerSession session, + Transaction tx, + Message message, + boolean direct, + boolean noAutoCreateQueue, + Exception e) throws ActiveMQException { + if (logAll || logSendingEvents) { + + if (LoggingActiveMQServerPluginLogger.LOGGER.isDebugEnabled()) { + //details - debug level + LoggingActiveMQServerPluginLogger.LOGGER.onSendErrorDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + message, (session == null ? UNAVAILABLE : session.getName()), + tx, session, direct, noAutoCreateQueue); + } + + if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + //info level log + LoggingActiveMQServerPluginLogger.LOGGER.onSendError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + (session == null ? UNAVAILABLE : session.getName()), + (session == null ? UNAVAILABLE : session.getConnectionID().toString()), + e); + } + } + } + /** * Before a message is routed * @@ -540,6 +566,26 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial } } + @Override + public void onMessageRouteException(Message message, + RoutingContext context, + boolean direct, + boolean rejectDuplicates, + Exception e) throws ActiveMQException { + if (logAll || logSendingEvents) { + + //details - debug level logging + LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteErrorDetails(message, context, direct, rejectDuplicates); + + if (LoggingActiveMQServerPluginLogger.LOGGER.isInfoEnabled()) { + //info level log + LoggingActiveMQServerPluginLogger.LOGGER.onMessageRouteError((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), + e); + } + + } + } + /** * Before a message is delivered to a client consumer * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java index f519dd0..fa697ba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java @@ -141,6 +141,15 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { @Message(id = 841016, value = "criticalFailure called with criticalComponent: {0}", format = Message.Format.MESSAGE_FORMAT) void criticalFailure(CriticalComponent components); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 841017, value = "error sending message with ID: {0}, session name: {1}, session connectionID: {2}," + + " exception: {3}", format = Message.Format.MESSAGE_FORMAT) + void onSendError(String messageID, String sessionName, String sessionConnectionID, Exception e); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 841018, value = "error routing message with ID: {0}, exception: {1}", format = Message.Format.MESSAGE_FORMAT) + void onMessageRouteError(String messageID, Exception e); + //DEBUG messages @LogMessage(level = Logger.Level.DEBUG) @@ -258,4 +267,23 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger { @Message(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {0}", format = Message.Format.MESSAGE_FORMAT) void beforeDeployBridge(BridgeConfiguration config); + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 843016, value = "onSendError message ID: {0}, message {1}, session name: {2} with tx: {3}, session: {4}, direct: {5}," + + " noAutoCreateQueue: {6}", format = Message.Format.MESSAGE_FORMAT) + void onSendErrorDetails(String messageID, + org.apache.activemq.artemis.api.core.Message message, + String sessionName, + Transaction tx, + ServerSession session, + boolean direct, + boolean noAutoCreateQueue); + + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 843017, value = "onMessageRouteError message: {0}, with context: {1}, direct: {2}, rejectDuplicates: {3}", + format = Message.Format.MESSAGE_FORMAT) + void onMessageRouteErrorDetails(org.apache.activemq.artemis.api.core.Message message, + RoutingContext context, + boolean direct, + boolean rejectDuplicates); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95ec8ea4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java index 9c24505..0d802cf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -90,8 +90,10 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { public static final String MESSAGE_ACKED = "messageAcknowledged"; public static final String BEFORE_SEND = "beforeSend"; public static final String AFTER_SEND = "afterSend"; + public static final String ON_SEND_EXCEPTION = "onSendException"; public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute"; public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute"; + public static final String ON_MESSAGE_ROUTE_EXCEPTION = "onMessageRouteException"; public static final String BEFORE_DELIVER = "beforeDeliver"; public static final String AFTER_DELIVER = "afterDeliver"; public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge"; @@ -305,6 +307,14 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { } @Override + public void onSendException(ServerSession session, Transaction tx, Message message, boolean direct, + boolean noAutoCreateQueue, Exception e) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(e); + methodCalled(ON_SEND_EXCEPTION); + } + + @Override public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { Preconditions.checkNotNull(message); Preconditions.checkNotNull(context); @@ -321,6 +331,15 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin { } @Override + public void onMessageRouteException(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + Exception e) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(context); + Preconditions.checkNotNull(e); + methodCalled(ON_MESSAGE_ROUTE_EXCEPTION); + } + + @Override public void beforeDeliver(ServerConsumer consumer, MessageReference reference) { Preconditions.checkNotNull(reference); methodCalled(BEFORE_DELIVER);
