cshannon commented on code in PR #1139:
URL: https://github.com/apache/activemq/pull/1139#discussion_r1455906273


##########
activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java:
##########
@@ -291,13 +294,43 @@ public void send(ProducerBrokerExchange producerExchange, 
final Message message)
             transaction = getTransaction(context, message.getTransactionId(), 
false);
         }
         context.setTransaction(transaction);
+
         try {
+            // [AMQ-9344] Limit uncommitted transactions by count
+            verifyUncommittedCount(producerExchange, transaction, message);
             next.send(producerExchange, message);
         } finally {
             context.setTransaction(originalTx);
         }
     }
 
+    protected void verifyUncommittedCount(ProducerBrokerExchange 
producerExchange, Transaction transaction, Message message) throws Exception {
+        // maxUncommittedCount <= 0 disables
+        int maxUncommittedCount = 
this.getBrokerService().getMaxUncommittedCount();

Review Comment:
   You should be able to just read this in the constructor and store it as a 
final variable on the TransactionBroker, as it won't be changed later since 
there's no dynamic config for the property.



##########
activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java:
##########
@@ -291,13 +294,43 @@ public void send(ProducerBrokerExchange producerExchange, 
final Message message)
             transaction = getTransaction(context, message.getTransactionId(), 
false);
         }
         context.setTransaction(transaction);
+
         try {
+            // [AMQ-9344] Limit uncommitted transactions by count
+            verifyUncommittedCount(producerExchange, transaction, message);
             next.send(producerExchange, message);
         } finally {
             context.setTransaction(originalTx);
         }
     }
 
+    protected void verifyUncommittedCount(ProducerBrokerExchange 
producerExchange, Transaction transaction, Message message) throws Exception {
+        // maxUncommittedCount <= 0 disables
+        int maxUncommittedCount = 
this.getBrokerService().getMaxUncommittedCount();
+        if (maxUncommittedCount > 0 && transaction.size() >= 
maxUncommittedCount) {

Review Comment:
   The maxUncommittedCount > 0 check can be removed here if it is moved up, as my other comment suggests.



##########
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.JMX;
+import javax.management.MBeanServer;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.Session;
+
+public class MaxUncommittedCountExceededTest {
+
+    public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
+    public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
+
+    public static final String DEFAULT_JMS_USER = "admin";
+    public static final String DEFAULT_JMS_PASS = "admin";
+
+    protected ActiveMQConnectionFactory activemqConnectionFactory = null;
+    protected BrokerService brokerService = null;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    // Control session
+    protected Connection connection = null;
+    protected Session session = null;
+    protected MessageProducer messageProducer = null;
+
+    protected String methodNameDestinationName = null;
+    protected MBeanServer mbeanServer = null;
+    protected QueueViewMBean queueViewMBean = null;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPersistent(true);
+        brokerService.setUseJmx(true);
+        brokerService.addConnector("tcp://localhost:0").setName("Default");
+        brokerService.setBrokerName("localhost");
+        brokerService.start();
+        brokerService.waitUntilStarted(30_000);
+        brokerService.deleteAllMessages();
+        assertNotNull(brokerService);
+
+        activemqConnectionFactory = new 
ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+        connection = activemqConnectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        methodNameDestinationName = "AMQ.TX." + 
cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
+        Queue queue = session.createQueue(methodNameDestinationName);
+        messageProducer = session.createProducer(queue);
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        brokerService.getAdminView().addQueue(methodNameDestinationName);
+        queueViewMBean = getQueueViewMBean(new 
ActiveMQQueue(methodNameDestinationName));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (messageProducer != null) {
+            try {
+                messageProducer.close();
+            } catch (Exception e) {
+            } finally {
+                messageProducer = null;
+            }
+        }
+
+        if (session != null) {
+            try {
+                session.close();
+            } catch (Exception e) {
+            } finally {
+                session = null;
+            }
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+            } finally {
+                connection = null;
+            }
+        }
+
+        methodNameDestinationName = null;
+        activemqConnectionFactory = null;
+        if(brokerService != null) {
+            brokerService.deleteAllMessages();
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    protected static String cleanParameterizedMethodName(String methodName) {
+        // clean up parameterized method string:
+        // TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, 
MESSAGETYPE=BYTES]
+        // returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES
+
+        if (methodName == null || (!methodName.contains("[") && 
!methodName.contains("]"))) {
+            return methodName;
+        }
+
+        String[] step1 = methodName.split("\\[", 2);
+        String[] step2 = step1[1].split("\\]", 2);
+        String[] step3 = step2[0].split(",", 16);
+
+        return step1[0] + "." + step3[0].split("=", 2)[1] + "." + 
step3[1].split("=", 2)[1];
+    }
+
+    protected QueueViewMBean getQueueViewMBean(ActiveMQDestination 
destination) throws Exception {
+        return JMX.newMBeanProxy(mbeanServer, 
BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME,
 DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class);
+    }
+
+    @Test
+    public void testUncommittedCountExceededSync() throws Exception {

Review Comment:
   A lot of the code in these two tests is identical and copy-pasted. 
This could be refactored to re-use some of the code, or maybe even to use 
parameterized tests, to reduce the bloat. 



##########
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.JMX;
+import javax.management.MBeanServer;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.Session;
+
+public class MaxUncommittedCountExceededTest {
+
+    public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
+    public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
+
+    public static final String DEFAULT_JMS_USER = "admin";
+    public static final String DEFAULT_JMS_PASS = "admin";
+
+    protected ActiveMQConnectionFactory activemqConnectionFactory = null;
+    protected BrokerService brokerService = null;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    // Control session
+    protected Connection connection = null;
+    protected Session session = null;
+    protected MessageProducer messageProducer = null;
+
+    protected String methodNameDestinationName = null;
+    protected MBeanServer mbeanServer = null;
+    protected QueueViewMBean queueViewMBean = null;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPersistent(true);
+        brokerService.setUseJmx(true);
+        brokerService.addConnector("tcp://localhost:0").setName("Default");
+        brokerService.setBrokerName("localhost");
+        brokerService.start();
+        brokerService.waitUntilStarted(30_000);
+        brokerService.deleteAllMessages();
+        assertNotNull(brokerService);
+
+        activemqConnectionFactory = new 
ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+        connection = activemqConnectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        methodNameDestinationName = "AMQ.TX." + 
cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
+        Queue queue = session.createQueue(methodNameDestinationName);
+        messageProducer = session.createProducer(queue);
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        brokerService.getAdminView().addQueue(methodNameDestinationName);
+        queueViewMBean = getQueueViewMBean(new 
ActiveMQQueue(methodNameDestinationName));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (messageProducer != null) {

Review Comment:
   The code here to close the producer and session seems unnecessary, as 
closing the connection will also close these. You also don't really need to set 
any of the references to null, as they will be reset in the 
setUp() method on the next run or just cleaned up by the garbage 
collector when the test is done. 
   
   It's not a huge deal, but the code base has a ton of this kind of extra code 
lying around that is unnecessary and just leads to more bloat. We really 
should just create utilities or move this cleanup stuff to a common section 
anyway.



##########
activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java:
##########
@@ -291,13 +294,43 @@ public void send(ProducerBrokerExchange producerExchange, 
final Message message)
             transaction = getTransaction(context, message.getTransactionId(), 
false);
         }
         context.setTransaction(transaction);
+
         try {
+            // [AMQ-9344] Limit uncommitted transactions by count
+            verifyUncommittedCount(producerExchange, transaction, message);

Review Comment:
   
   ```suggestion
               if (maxUncommittedCount > 0) {
                   verifyUncommittedCount(producerExchange, transaction, 
message);
               }
   ```
   I would move the check for enabled here. Calling the method unconditionally is a 
small performance hit (likely not really noticeable), but I still don't see a 
reason to invoke a method for every single message when this is disabled.



##########
activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java:
##########
@@ -291,13 +294,43 @@ public void send(ProducerBrokerExchange producerExchange, 
final Message message)
             transaction = getTransaction(context, message.getTransactionId(), 
false);
         }
         context.setTransaction(transaction);
+
         try {
+            // [AMQ-9344] Limit uncommitted transactions by count
+            verifyUncommittedCount(producerExchange, transaction, message);
             next.send(producerExchange, message);
         } finally {
             context.setTransaction(originalTx);
         }
     }
 
+    protected void verifyUncommittedCount(ProducerBrokerExchange 
producerExchange, Transaction transaction, Message message) throws Exception {
+        // maxUncommittedCount <= 0 disables
+        int maxUncommittedCount = 
this.getBrokerService().getMaxUncommittedCount();
+        if (maxUncommittedCount > 0 && transaction.size() >= 
maxUncommittedCount) {
+
+            try {
+                // Rollback as we are throwing an error the client as throwing 
the error will cause
+                // the client to reset to a new transaction so we need to 
clean up
+                transaction.rollback();
+
+                // Send ResourceAllocationException which will translate to a 
JMSException
+                final ResourceAllocationException e = new 
ResourceAllocationException(
+                    "Can not send message on transaction with id: '" + 
transaction.getTransactionId().toString()
+                      + "', Transaction has reached the maximum allowed number 
of pending send operations before commit of '"
+                      + maxUncommittedCount + "'", "42900");
+                if(LOG.isDebugEnabled()) {
+                    LOG.warn("ConnectionId:{} exceeded maxUncommittedCount:{} 
for destination:{} in transactionId:{}", 
(producerExchange.getConnectionContext() != null ? 
producerExchange.getConnectionContext().getConnectionId() : "<not set>"), 
maxUncommittedCount, message.getDestination().getQualifiedName(), 
transaction.getTransactionId().toString(), e);

Review Comment:
   This is logging at the warn level, which doesn't make sense since the guard 
is checking whether debug is enabled.
   
   The formatting here is also bad; the call should be split across multiple 
lines rather than kept on one line, so that it's readable.



##########
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MaxUncommittedCountExceededTest.java:
##########
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.JMX;
+import javax.management.MBeanServer;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.Session;
+
+public class MaxUncommittedCountExceededTest {
+
+    public static final String DEFAULT_JMX_DOMAIN_NAME = "org.apache.activemq";
+    public static final String DEFAULT_JMX_BROKER_NAME = "localhost";
+
+    public static final String DEFAULT_JMS_USER = "admin";
+    public static final String DEFAULT_JMS_PASS = "admin";
+
+    protected ActiveMQConnectionFactory activemqConnectionFactory = null;
+    protected BrokerService brokerService = null;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    // Control session
+    protected Connection connection = null;
+    protected Session session = null;
+    protected MessageProducer messageProducer = null;
+
+    protected String methodNameDestinationName = null;
+    protected MBeanServer mbeanServer = null;
+    protected QueueViewMBean queueViewMBean = null;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPersistent(true);
+        brokerService.setUseJmx(true);
+        brokerService.addConnector("tcp://localhost:0").setName("Default");
+        brokerService.setBrokerName("localhost");
+        brokerService.start();
+        brokerService.waitUntilStarted(30_000);
+        brokerService.deleteAllMessages();
+        assertNotNull(brokerService);
+
+        activemqConnectionFactory = new 
ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
+        connection = activemqConnectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        methodNameDestinationName = "AMQ.TX." + 
cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
+        Queue queue = session.createQueue(methodNameDestinationName);
+        messageProducer = session.createProducer(queue);
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        brokerService.getAdminView().addQueue(methodNameDestinationName);
+        queueViewMBean = getQueueViewMBean(new 
ActiveMQQueue(methodNameDestinationName));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (messageProducer != null) {
+            try {
+                messageProducer.close();
+            } catch (Exception e) {
+            } finally {
+                messageProducer = null;
+            }
+        }
+
+        if (session != null) {
+            try {
+                session.close();
+            } catch (Exception e) {
+            } finally {
+                session = null;
+            }
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+            } finally {
+                connection = null;
+            }
+        }
+
+        methodNameDestinationName = null;
+        activemqConnectionFactory = null;
+        if(brokerService != null) {
+            brokerService.deleteAllMessages();
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    protected static String cleanParameterizedMethodName(String methodName) {
+        // clean up parameterized method string:
+        // TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, 
MESSAGETYPE=BYTES]
+        // returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES
+
+        if (methodName == null || (!methodName.contains("[") && 
!methodName.contains("]"))) {
+            return methodName;
+        }
+
+        String[] step1 = methodName.split("\\[", 2);
+        String[] step2 = step1[1].split("\\]", 2);
+        String[] step3 = step2[0].split(",", 16);
+
+        return step1[0] + "." + step3[0].split("=", 2)[1] + "." + 
step3[1].split("=", 2)[1];
+    }
+
+    protected QueueViewMBean getQueueViewMBean(ActiveMQDestination 
destination) throws Exception {
+        return JMX.newMBeanProxy(mbeanServer, 
BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME,
 DEFAULT_JMX_BROKER_NAME).toString(), destination), QueueViewMBean.class);
+    }
+
+    @Test
+    public void testUncommittedCountExceededSync() throws Exception {
+        assertEquals(Long.valueOf(0l), 
Long.valueOf(brokerService.getAdminView().getTotalMaxUncommittedExceededCount()));
+        assertEquals(Long.valueOf(0l), 
Long.valueOf(queueViewMBean.getMaxUncommittedExceededCount()));
+
+        brokerService.setMaxUncommittedCount(10);
+        boolean caught = false;
+        JMSException caughtException = null;
+
+        ActiveMQConnection activemqConnection = (ActiveMQConnection)connection;
+        activemqConnection.setAlwaysSyncSend(true);
+        activemqConnection.setUseAsyncSend(false);
+        activemqConnection.setProducerWindowSize(10);
+
+        try {
+            for(int i=0; i < 20; i++) {
+                Message message = session.createBytesMessage();
+                message.setIntProperty("IDX", i);
+                messageProducer.send(message);
+            }
+        } catch (ResourceAllocationException e) { 
+            caught = true;
+            caughtException = e;
+        }
+
+        //Thread.sleep(10_000l);

Review Comment:
   This commented-out sleep can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to