This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new b8b7cc8  ARTEMIS-2300 Expiry notifications are not called from scanner
     new 58bf52a  This closes #2613
b8b7cc8 is described below

commit b8b7cc899f49bd1ce89d20058aa4990f34ae1c5a
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Wed Apr 10 10:08:12 2019 -0400

    ARTEMIS-2300 Expiry notifications are not called from scanner
---
 .../artemis/core/server/impl/QueueImpl.java        | 35 ++++++++++++++++++
 .../transaction/TransactionPropertyIndexes.java    |  2 ++
 .../integration/management/NotificationTest.java   | 42 ++++++++++++++++++++--
 3 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index cf74815..9d04f5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -86,6 +86,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeLis
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
@@ -3023,8 +3024,42 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          acknowledge(tx, ref, AckReason.EXPIRED, null);
       }
+
+      if (server != null && server.hasBrokerMessagePlugins()) {
+         ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
+         if (expiryLogger == null) {
+            expiryLogger = new ExpiryLogger();
+            tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
+            tx.addOperation(expiryLogger);
+         }
+
+         expiryLogger.addExpiry(address, ref);
+      }
+
+   }
+
+   private class ExpiryLogger extends TransactionOperationAbstract {
+
+      List<Pair<SimpleString, MessageReference>> expiries = new LinkedList<>();
+
+      public void addExpiry(SimpleString address, MessageReference ref) {
+         expiries.add(new Pair<>(address, ref));
+      }
+
+      @Override
+      public void afterCommit(Transaction tx) {
+         for (Pair<SimpleString, MessageReference> pair : expiries) {
+            try {
+               server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(pair.getB(), pair.getA(), null));
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         expiries.clear(); // just giving a hand to GC
+      }
    }
 
+
    @Override
    public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
       sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
index 19ae777..4ba31ee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionPropertyIndexes.java
@@ -33,4 +33,6 @@ public class TransactionPropertyIndexes {
    public static final int PAGE_DELIVERY = 7;
 
    public static final int PAGE_CURSOR_POSITIONS = 8;
+
+   public static final int EXPIRY_LOGGER = 9;
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
index 196e939..ba229bf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
@@ -349,6 +349,38 @@ public class NotificationTest extends ActiveMQTestBase {
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testMessageExpiredWithoutConsumers() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession mySession = sf.createSession("myUser", "myPassword", false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize());
+
+      mySession.start();
+
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString address = RandomUtil.randomSimpleString();
+      boolean durable = RandomUtil.randomBoolean();
+
+      session.createQueue(address, queue, durable);
+      ClientProducer producer = mySession.createProducer(address);
+
+      NotificationTest.flush(notifConsumer);
+
+      ClientMessage msg = session.createMessage(false);
+      msg.putStringProperty("someKey", "someValue");
+      msg.setExpiration(1);
+      producer.send(msg);
+      Thread.sleep(500);
+
+      ClientMessage[] notifications = NotificationTest.consumeMessages(1, notifConsumer, 5000);
+      Assert.assertEquals(MESSAGE_EXPIRED.toString(), notifications[0].getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+      Assert.assertNotNull(notifications[0].getObjectProperty(ManagementHelper.HDR_MESSAGE_ID));
+      Assert.assertEquals(address, notifications[0].getObjectProperty(ManagementHelper.HDR_ADDRESS));
+      Assert.assertEquals(queue, notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_NAME));
+      Assert.assertEquals(RoutingType.MULTICAST.getType(), notifications[0].getObjectProperty(ManagementHelper.HDR_ROUTING_TYPE));
+
+      session.deleteQueue(queue);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -358,7 +390,7 @@ public class NotificationTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
+      server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig().setMessageExpiryScanPeriod(100), false));
       NotificationActiveMQServerPlugin notificationPlugin = new NotificationActiveMQServerPlugin();
       notificationPlugin.setSendAddressNotifications(true);
       notificationPlugin.setSendConnectionNotifications(true);
@@ -392,11 +424,17 @@ public class NotificationTest extends ActiveMQTestBase {
 
    protected static ClientMessage[] consumeMessages(final int expected,
                                                     final ClientConsumer consumer) throws Exception {
+      return consumeMessages(expected, consumer, 500);
+   }
+
+   protected static ClientMessage[] consumeMessages(final int expected,
+                                                    final ClientConsumer consumer,
+                                                    final int timeout) throws Exception {
       ClientMessage[] messages = new ClientMessage[expected];
 
       ClientMessage m = null;
       for (int i = 0; i < expected; i++) {
-         m = consumer.receive(500);
+         m = consumer.receive(timeout);
          if (m != null) {
             for (SimpleString key : m.getPropertyNames()) {
                System.out.println(key + "=" + m.getObjectProperty(key));

Reply via email to