ARTEMIS-2159 OpenWire would allow one extra send

Thanks to Otavio Piske collaborating a test change here.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/02a6d5bb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/02a6d5bb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/02a6d5bb

Branch: refs/heads/master
Commit: 02a6d5bb493d6e0eea1ed847157d4e6b57aacf7f
Parents: 46588c8
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Wed Oct 31 09:13:05 2018 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Oct 31 12:46:12 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQSession.java  | 100 +++++++++----------
 .../core/paging/impl/PagingStoreImpl.java       |   4 +-
 .../openwire/OpenWireFlowControlFailTest.java   |  30 ++++--
 3 files changed, 76 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/02a6d5bb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 0250f1c..a107ba7 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -443,63 +443,63 @@ public class AMQSession implements SessionCallback {
                                         final AtomicInteger count,
                                         final 
org.apache.activemq.artemis.api.core.Message coreMsg,
                                         final SimpleString address) throws 
ResourceAllocationException {
-      if (!store.checkMemory(() -> {
-         Exception exceptionToSend = null;
-
-         try {
-            getCoreSession().send(coreMsg, false, dest.isTemporary());
-         } catch (Exception e) {
-            logger.warn(e.getMessage(), e);
-            exceptionToSend = e;
-         }
+      if (!store.checkMemory(null)) {
+         this.connection.getContext().setDontSendReponse(false);
          connection.enableTtl();
-         if (count == null || count.decrementAndGet() == 0) {
-            if (exceptionToSend != null) {
-               this.connection.getContext().setDontSendReponse(false);
-               connection.sendException(exceptionToSend);
-            } else {
-               server.getStorageManager().afterCompleteOperations(new 
IOCallback() {
-                  @Override
-                  public void done() {
-                     if (sendProducerAck) {
-                        try {
-                           ProducerAck ack = new 
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-                           connection.dispatchAsync(ack);
-                        } catch (Exception e) {
-                           connection.getContext().setDontSendReponse(false);
-                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-                           connection.sendException(e);
-                        }
-                     } else {
+         throw new ResourceAllocationException("Queue is full " + address);
+      }
+
+      Exception exceptionToSend = null;
+
+      try {
+         getCoreSession().send(coreMsg, false, dest.isTemporary());
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         exceptionToSend = e;
+      }
+      connection.enableTtl();
+      if (count == null || count.decrementAndGet() == 0) {
+         if (exceptionToSend != null) {
+            this.connection.getContext().setDontSendReponse(false);
+            connection.sendException(exceptionToSend);
+         } else {
+            server.getStorageManager().afterCompleteOperations(new 
IOCallback() {
+               @Override
+               public void done() {
+                  if (sendProducerAck) {
+                     try {
+                        ProducerAck ack = new 
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                        connection.dispatchAsync(ack);
+                     } catch (Exception e) {
                         connection.getContext().setDontSendReponse(false);
-                        try {
-                           Response response = new Response();
-                           
response.setCorrelationId(messageSend.getCommandId());
-                           connection.dispatchAsync(response);
-                        } catch (Exception e) {
-                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-                           connection.sendException(e);
-                        }
+                        ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                        connection.sendException(e);
                      }
-                  }
-
-                  @Override
-                  public void onError(int errorCode, String errorMessage) {
+                  } else {
+                     connection.getContext().setDontSendReponse(false);
                      try {
-                        final IOException e = new IOException(errorMessage);
-                        ActiveMQServerLogger.LOGGER.warn(errorMessage);
-                        connection.serviceException(e);
-                     } catch (Exception ex) {
-                        ActiveMQServerLogger.LOGGER.debug(ex);
+                        Response response = new Response();
+                        response.setCorrelationId(messageSend.getCommandId());
+                        connection.dispatchAsync(response);
+                     } catch (Exception e) {
+                        ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                        connection.sendException(e);
                      }
                   }
-               });
-            }
+               }
+
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+                  try {
+                     final IOException e = new IOException(errorMessage);
+                     ActiveMQServerLogger.LOGGER.warn(errorMessage);
+                     connection.serviceException(e);
+                  } catch (Exception ex) {
+                     ActiveMQServerLogger.LOGGER.debug(ex);
+                  }
+               }
+            });
          }
-      })) {
-         this.connection.getContext().setDontSendReponse(false);
-         connection.enableTtl();
-         throw new ResourceAllocationException("Queue is full " + address);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/02a6d5bb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 00001cc..908ab9f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -693,7 +693,9 @@ public class PagingStoreImpl implements PagingStore {
          }
       }
 
-      runWhenAvailable.run();
+      if (runWhenAvailable != null) {
+         runWhenAvailable.run();
+      }
 
       return true;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/02a6d5bb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
index 341f920..a2685b0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireFlowControlFailTest.java
@@ -18,6 +18,7 @@ package 
org.apache.activemq.artemis.tests.integration.openwire;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -58,34 +59,49 @@ public class OpenWireFlowControlFailTest extends 
OpenWireTestBase {
          textBody.append(" ");
       }
       ConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
+      int numberOfMessage = 0;
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
          javax.jms.Queue queue = 
session.createQueue(addressInfo.getName().toString());
          MessageProducer producer = session.createProducer(queue);
-         int numberOfMessage = 0;
          boolean failed = false;
          try {
             for (int i = 0; i < 1000; i++) {
-               producer.send(session.createTextMessage(textBody.toString()));
+               TextMessage message = 
session.createTextMessage(textBody.toString());
+               message.setIntProperty("i", i);
+
+               producer.send(message);
                numberOfMessage++;
             }
          } catch (Exception e) {
             e.printStackTrace(System.out);
             failed = true;
+            try {
+               producer.send(session.createTextMessage(textBody.toString()));
+               Assert.fail("Exception expected");
+            } catch (JMSException expected) {
+               expected.printStackTrace();
+
+            }
          }
+         Assert.assertTrue(failed);
+      }
 
-         System.out.println("Message failed with " + numberOfMessage);
+      factory = new ActiveMQConnectionFactory(urlString);
+      try (Connection connection2 = factory.createConnection()) {
+         Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = 
session2.createQueue(addressInfo.getName().toString());
 
-         Assert.assertTrue(failed);
-         MessageConsumer consumer = session.createConsumer(queue);
-         connection.start();
+         MessageConsumer consumer = session2.createConsumer(queue);
+         connection2.start();
          for (int i = 0; i < numberOfMessage; i++) {
             TextMessage message = (TextMessage) consumer.receive(5000);
             Assert.assertNotNull(message);
             Assert.assertEquals(textBody.toString(), message.getText());
          }
 
-         Assert.assertNull(consumer.receiveNoWait());
+         TextMessage msg = (TextMessage)consumer.receive(500);
+         Assert.assertNull(msg);
       }
    }
 }

Reply via email to