This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e719622de5 ARTEMIS-4285 Limit number of redelivery records
e719622de5 is described below

commit e719622de5488f859f70beda926afaa51d5b0ff9
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Thu May 18 13:27:13 2023 -0400

    ARTEMIS-4285 Limit number of redelivery records
---
 .../activemq/artemis/cli/commands/etc/broker.xml   |   4 +
 .../api/config/ActiveMQDefaultConfiguration.java   |   7 +
 .../artemis/core/config/Configuration.java         |   7 +
 .../core/config/impl/ConfigurationImpl.java        |  13 ++
 .../deployers/impl/FileConfigurationParser.java    |   2 +
 .../journal/AbstractJournalStorageManager.java     |   7 +
 .../journal/codec/ScheduledDeliveryEncoding.java   |   2 +-
 .../resources/schema/artemis-configuration.xsd     |  11 ++
 .../config/impl/FileConfigurationParserTest.java   |  16 ++
 .../artemis/tests/util/ActiveMQTestBase.java       |   2 +-
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 docs/user-manual/en/configuration-index.md         |   1 +
 docs/user-manual/en/undelivered-messages.md        |   6 +
 .../client/RescheduleJDBCDeliveryTest.java         | 180 +++++++++++++++++++++
 14 files changed, 257 insertions(+), 2 deletions(-)

diff --git 
a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
 
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index accc00a171..9ea75a2d4f 100644
--- 
a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ 
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -31,6 +31,10 @@ under the License.
 ${jdbc}
       <persistence-enabled>${persistence-enabled}</persistence-enabled>
 
+      <!-- It is recommended to keep this value as 1, maximizing the number of 
records stored about redeliveries.
+           However if you must preserve state of individual redeliveries, you 
may increase this value or set it to -1 (infinite). -->
+      <max-redelivery-records>1</max-redelivery-records>
+
       <!-- this could be ASYNCIO, MAPPED, NIO
            ASYNCIO: Linux Libaio
            MAPPED: mmap files
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 69c1aba3c9..0686cdd985 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -248,6 +248,9 @@ public final class ActiveMQDefaultConfiguration {
    // True means that the delivery count is persisted before delivery. False 
means that this only happens after a message has been cancelled.
    private static boolean DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY = 
false;
 
+   // Default Maximum number of records we would store for redeliveries
+   private static int DEFAULT_MAX_REDELIVERY_RECORDS = 10;
+
    // the directory to store paged messages in
    private static String DEFAULT_PAGING_DIR = "data/paging";
 
@@ -685,6 +688,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_PERSISTENCE_ENABLED;
    }
 
+   public static int getDefaultMaxRedeliveryRecords() {
+      return DEFAULT_MAX_REDELIVERY_RECORDS;
+   }
+
    public static boolean isDefaultJournalDatasync() {
       return DEFAULT_JOURNAL_DATASYNC;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 29dd21b2c5..44cab79f97 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -138,6 +138,13 @@ public interface Configuration {
     */
    Configuration setPersistenceEnabled(boolean enable);
 
+   /**
+    * Maximum number of redelivery records stored on the journal per message 
reference.
+    */
+   Configuration setMaxRedeliveryRecords(int maxPersistRedelivery);
+
+   int getMaxRedeliveryRecords();
+
    /**
     * Should use fdatasync on journal files.
     *
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 6d2b006ce0..45a452cb4e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -148,6 +148,8 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
 
    private boolean persistenceEnabled = 
ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();
 
+   private int maxRedeliveryRecords = 
ActiveMQDefaultConfiguration.getDefaultMaxRedeliveryRecords();
+
    private boolean journalDatasync = 
ActiveMQDefaultConfiguration.isDefaultJournalDatasync();
 
    protected long fileDeploymentScanPeriod = 
ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod();
@@ -928,6 +930,17 @@ public class ConfigurationImpl implements Configuration, 
Serializable {
       return this;
    }
 
+   @Override
+   public Configuration setMaxRedeliveryRecords(int max) {
+      maxRedeliveryRecords = max;
+      return this;
+   }
+
+   @Override
+   public int getMaxRedeliveryRecords() {
+      return maxRedeliveryRecords;
+   }
+
    @Override
    public boolean isJournalDatasync() {
       return journalDatasync;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 82e083cced..842b72681c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -397,6 +397,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       config.setPersistDeliveryCountBeforeDelivery(getBoolean(e, 
"persist-delivery-count-before-delivery", 
config.isPersistDeliveryCountBeforeDelivery()));
 
+      config.setMaxRedeliveryRecords(getInteger(e, "max-redelivery-records", 
config.getMaxRedeliveryRecords(), Validators.MINUS_ONE_OR_GE_ZERO));
+
       config.setScheduledThreadPoolMaxSize(getInteger(e, 
"scheduled-thread-pool-max-size", config.getScheduledThreadPoolMaxSize(), 
Validators.GT_ZERO));
 
       config.setThreadPoolMaxSize(getInteger(e, "thread-pool-max-size", 
config.getThreadPoolMaxSize(), Validators.MINUS_ONE_OR_GT_ZERO));
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 1d430d302f..91736ea5ed 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -475,6 +475,9 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp
 
    @Override
    public void updateScheduledDeliveryTime(final MessageReference ref) throws 
Exception {
+      if (config.getMaxRedeliveryRecords() >= 0 && ref.getDeliveryCount() > 
config.getMaxRedeliveryRecords()) {
+         return;
+      }
       ScheduledDeliveryEncoding encoding = new 
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), 
ref.getQueue().getID());
       try (ArtemisCloseable lock = closeableReadLock()) {
          messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), 
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, 
true, this::recordNotFoundCallback, getContext(syncNonTransactional));
@@ -706,6 +709,10 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
          return;
       }
 
+      if (config.getMaxRedeliveryRecords() >= 0 && ref.getDeliveryCount() > 
config.getMaxRedeliveryRecords()) {
+         return;
+      }
+
       ref.setPersistedCount(ref.getDeliveryCount());
       DeliveryCountUpdateEncoding updateInfo = new 
DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java
index d0559ec3ee..1f80f505b6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java
@@ -24,7 +24,7 @@ public class ScheduledDeliveryEncoding extends QueueEncoding {
 
    @Override
    public String toString() {
-      return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + 
scheduledDeliveryTime + "]";
+      return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + 
scheduledDeliveryTime + ", queueID=" + queueID + "]";
    }
 
    public ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final 
long queueID) {
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 863be22f4c..538f6278cd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -412,6 +412,17 @@
             </xsd:annotation>
          </xsd:element>
 
+         <xsd:element name="max-redelivery-records" type="xsd:long" 
default="10" maxOccurs="1"
+                      minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  The default for this value is 10, however the recommended 
set for this is 1.
+                  The system will add a store update for every redelivery 
happening on the system.
+                  It is recommended to keep max-redelivery-records=1 in 
situations where you are operating with very short redelivery delays as you 
will be creating unecessary records on the journal.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
+
          <xsd:element name="populate-validated-user" type="xsd:boolean" 
default="false" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index f100623800..f092466589 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -535,6 +535,22 @@ public class FileConfigurationParserTest extends 
ActiveMQTestBase {
       Assert.assertEquals(1000, configuration.getGlobalMaxMessages());
    }
 
+   @Test
+   public void testConfigurationPersistRedelivery() throws Exception {
+      StringPrintStream stringPrintStream = new StringPrintStream();
+      PrintStream stream = stringPrintStream.newStream();
+
+      stream.println("<configuration><core>");
+      stream.println("<max-redelivery-records>0</max-redelivery-records>");
+      stream.println("</core></configuration>");
+
+      ByteArrayInputStream inputStream = new 
ByteArrayInputStream(stringPrintStream.getBytes());
+      FileConfigurationParser parser = new FileConfigurationParser();
+      Configuration configuration = parser.parseMainConfig(inputStream);
+
+      Assert.assertEquals(0, configuration.getMaxRedeliveryRecords());
+   }
+
    @Test
    public void testExceptionMaxSize() throws Exception {
       StringPrintStream stringPrintStream = new StringPrintStream();
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 736c55e496..a7d4ddb471 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -919,7 +919,7 @@ public abstract class ActiveMQTestBase extends Assert {
       return "memory:" + getTestDir();
    }
 
-   private String getTestJDBCConnectionUrl() {
+   protected final String getTestJDBCConnectionUrl() {
       return System.getProperty("jdbc.connection.url", "jdbc:derby:" + 
getEmbeddedDataBaseName() + ";create=true");
    }
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index e3980a8bce..6758183f63 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -77,6 +77,7 @@
          
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>
       </remoting-outgoing-interceptors>
       
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
+      <max-redelivery-records>7</max-redelivery-records>
       <connectors>
          <connector name="connector1">
             tcp://localhost1:5678?
diff --git a/docs/user-manual/en/configuration-index.md 
b/docs/user-manual/en/configuration-index.md
index 8f522934eb..55818d3940 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -210,6 +210,7 @@ name | node name; used in topology notifications if set. | 
n/a
 [read-whole-page](paging.md) | If true the whole page would be read, otherwise 
just seek and read while getting message. | `false`
 [paging-directory](paging.md#configuration)| the directory to store paged 
messages in. | `data/paging`
 
[persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence)
 | True means that the delivery count is persisted before delivery. False means 
that this only happens after a message has been cancelled. | `false`
+[max-redelivery-records](undelivered-messages.md#persist-redelivery) | Maximum 
number of records the system will store for redeliveries. In most cases this 
should be set to '1'. | `10`
 [persistence-enabled](persistence.md#zero-persistence)| true means that the 
server will use the file based journal for persistence. | `true`
 [persist-id-cache](duplicate-detection.md#configuring-the-duplicate-id-cache) 
| true means that ID's are persisted to the journal. | `true`
 queues | **deprecated** [use addresses](#address-type) | n/a
diff --git a/docs/user-manual/en/undelivered-messages.md 
b/docs/user-manual/en/undelivered-messages.md
index c24073ba9d..d081509835 100644
--- a/docs/user-manual/en/undelivered-messages.md
+++ b/docs/user-manual/en/undelivered-messages.md
@@ -31,6 +31,12 @@ fail or rollback. Without a delayed redelivery, the system 
can get into a
 and delivery being re-attempted ad infinitum in quick succession,
 consuming valuable CPU and network resources.
 
+#Persist Redelivery
+
+Two Journal update records are stored every time a redelivery happens. One for 
the number of deliveries that happened, and one in case a scheduled redelivery 
is being used. 
+
+It is recommended to keep max-redelivery-records=1 in situations where you are 
operating with very short redelivery delays as you will be creating unecessary 
records on the journal.
+
 ### Configuring Delayed Redelivery
 
 Delayed redelivery is defined in the address-setting configuration:
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RescheduleJDBCDeliveryTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RescheduleJDBCDeliveryTest.java
new file mode 100644
index 0000000000..f1d0156574
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RescheduleJDBCDeliveryTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RescheduleJDBCDeliveryTest extends ActiveMQTestBase {
+
+   // Set this to true if you're debugging what happened in the journal
+   private final boolean PRINT_DATA = false;
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+   @Test
+   public void testRescheduledRedeliveryCORE() throws Exception {
+      testRescheduledRedelivery("CORE", 0);
+   }
+
+   @Test
+   public void testRescheduledRedeliveryCORE_1() throws Exception {
+      testRescheduledRedelivery("CORE", 1);
+   }
+
+   @Test
+   public void testRescheduledRedeliveryAMQP_1() throws Exception {
+      testRescheduledRedelivery("AMQP", 1);
+   }
+
+   @Test
+   public void testRescheduledRedeliveryAMQP() throws Exception {
+      testRescheduledRedelivery("AMQP", 0);
+   }
+
+   @Test
+   public void testRescheduledRedeliveryCOREInfinite() throws Exception {
+      testRescheduledRedelivery("CORE", -1);
+   }
+
+   @Test
+   public void testRescheduledRedeliveryAMQPInfinite() throws Exception {
+      testRescheduledRedelivery("AMQP", -1);
+   }
+
+   private void testRescheduledRedelivery(String protocol, int maxRecords) 
throws Exception {
+      int maxRedeliveries = 100;
+      String testQueue = getName();
+      Configuration configuration = createDefaultJDBCConfig(true);
+      configuration.setMaxRedeliveryRecords(maxRecords);
+      configuration.addAddressSetting("#", new 
AddressSettings().setRedeliveryDelay(1).setMaxDeliveryAttempts(-1).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")));
+      configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName("DLQ").addRoutingType(RoutingType.ANYCAST));
+      configuration.addQueueConfiguration(new 
QueueConfiguration("DLQ").setAddress("DLQ").setRoutingType(RoutingType.ANYCAST));
+      configuration.addAddressConfiguration(new 
CoreAddressConfiguration().setName(testQueue).addRoutingType(RoutingType.ANYCAST));
+      configuration.addQueueConfiguration(new 
QueueConfiguration(testQueue).setAddress(testQueue).setRoutingType(RoutingType.ANYCAST));
+      ActiveMQServer server = createServer(true, configuration, 
AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
+      server.start();
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(testQueue);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 0; i < maxRedeliveries; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            logger.debug("received {}", message);
+            Assert.assertNotNull(message);
+            session.rollback();
+         }
+      }
+
+      server.stop();
+
+      java.sql.Connection jdbcConnection = 
DriverManager.getConnection(getTestJDBCConnectionUrl());
+      runAfter(jdbcConnection::close);
+
+      int records = executeQuery(jdbcConnection, "SELECT * FROM MESSAGE WHERE 
USERRECORDTYPE=36 OR USERRECORDTYPE=34");
+
+      // manually set this value to true if you need to understand what's in 
the journal
+      if (PRINT_DATA) {
+         PrintData printData = new PrintData();
+         printData.printDataJDBC(configuration, System.out);
+      }
+
+      if (maxRecords < 0) {
+         Assert.assertEquals(maxRedeliveries * 2, records);
+      } else {
+
+         Assert.assertEquals(maxRecords * 2, records);
+      }
+
+      server.start();
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(testQueue);
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         logger.debug("received {}", message);
+         Assert.assertNotNull(message);
+         session.commit();
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+   }
+
+   protected int executeQuery(java.sql.Connection connection, String sql) 
throws Exception {
+      PreparedStatement statement = connection.prepareStatement(sql);
+      ResultSet result = statement.executeQuery();
+      ResultSetMetaData metaData = result.getMetaData();
+      int columnCount = metaData.getColumnCount();
+      int records = 0;
+
+      while (result.next()) {
+         if (logger.isDebugEnabled()) {
+            StringBuffer line = new StringBuffer();
+            for (int i = 1; i <= columnCount; i++) {
+               Object value = result.getObject(i);
+               line.append(metaData.getColumnLabel(i) + " = " + value);
+               if (i + 1 <= columnCount)
+                  line.append(", ");
+            }
+            logger.info(line.toString());
+         }
+         records++;
+      }
+
+      return records;
+
+   }
+}

Reply via email to