Github user clebertsuconic commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1801#discussion_r163065589
--- Diff:
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
---
@@ -393,26 +395,47 @@ public void send(final ProducerInfo producerInfo,
this.connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
- if (sendProducerAck) {
- try {
- ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- connection.dispatchAsync(ack);
- } catch (Exception e) {
-
this.connection.getContext().setDontSendReponse(false);
-
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
+
server.getStorageManager().afterCompleteOperations(new IOCallback() {
+ @Override
+ public void done() {
+ if (sendProducerAck) {
+ try {
+ ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ connection.dispatchAsync(ack);
+ } catch (Exception e) {
+
connection.getContext().setDontSendReponse(false);
+
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
+ }
+ } else {
+
connection.getContext().setDontSendReponse(false);
+ try {
+ Response response = new Response();
+
response.setCorrelationId(messageSend.getCommandId());
+ connection.dispatchAsync(response);
+ } catch (Exception e) {
+
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+ connection.sendException(e);
+ }
+ }
}
- } else {
- connection.getContext().setDontSendReponse(false);
- try {
- Response response = new Response();
-
response.setCorrelationId(messageSend.getCommandId());
- connection.dispatchAsync(response);
- } catch (Exception e) {
-
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
- connection.sendException(e);
+
+ @Override
+ public void onError(int errorCode, String
errorMessage) {
+ //failing here is severe and IO related
+ final Throwable criticalError = new
IOException(errorMessage);
+ try {
+ //it is handled async and hopefully will be
sent before the critical error shutdown the broker:
+ //it helps to fail fast the clients on
critical errors
+ connection.serviceException(criticalError);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.debug(e);
+ } finally {
+ //it needs to be called ASAP: the broker
isn't in a safe state
+
server.getStorageManager().criticalError(criticalError);
--- End diff --
This is probably also called earlier by the IO layer, but it doesn't hurt
here.
---