[
https://issues.apache.org/jira/browse/AMQ-9344?focusedWorklogId=900197&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-900197
]
ASF GitHub Bot logged work on AMQ-9344:
---------------------------------------
Author: ASF GitHub Bot
Created on: 17/Jan/24 18:38
Start Date: 17/Jan/24 18:38
Worklog Time Spent: 10m
Work Description: cshannon commented on code in PR #1139:
URL: https://github.com/apache/activemq/pull/1139#discussion_r1456284935
##########
activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java:
##########
@@ -291,13 +294,43 @@ public void send(ProducerBrokerExchange producerExchange, final Message message)
             transaction = getTransaction(context, message.getTransactionId(), false);
         }
         context.setTransaction(transaction);
+
         try {
+            // [AMQ-9344] Limit uncommitted transactions by count
+            verifyUncommittedCount(producerExchange, transaction, message);
             next.send(producerExchange, message);
         } finally {
             context.setTransaction(originalTx);
         }
     }
+    protected void verifyUncommittedCount(ProducerBrokerExchange producerExchange, Transaction transaction, Message message) throws Exception {
+        // maxUncommittedCount <= 0 disables
+        int maxUncommittedCount = this.getBrokerService().getMaxUncommittedCount();
+        if (maxUncommittedCount > 0 && transaction.size() >= maxUncommittedCount) {
+
+            try {
+                // Rollback as we are throwing an error the client as throwing the error will cause
+                // the client to reset to a new transaction so we need to clean up
+                transaction.rollback();
+
+                // Send ResourceAllocationException which will translate to a JMSException
+                final ResourceAllocationException e = new ResourceAllocationException(
+                    "Can not send message on transaction with id: '" + transaction.getTransactionId().toString()
+                    + "', Transaction has reached the maximum allowed number of pending send operations before commit of '"
+                    + maxUncommittedCount + "'", "42900");
+                if(LOG.isDebugEnabled()) {
+                    LOG.warn("ConnectionId:{} exceeded maxUncommittedCount:{} for destination:{} in transactionId:{}", (producerExchange.getConnectionContext() != null ? producerExchange.getConnectionContext().getConnectionId() : "<not set>"), maxUncommittedCount, message.getDestination().getQualifiedName(), transaction.getTransactionId().toString(), e);
Review Comment:
If you are just trying to log the exception, then log it as a separate
statement at DEBUG only. It doesn't make a lot of sense to me to combine both
things in one statement and then log either all of it or none of it. Just
always log the message at WARN, and after that log only the exception at
DEBUG. That's a lot simpler.
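A rough sketch of the shape suggested above, not the actual patch. It uses
java.util.logging so that it compiles without extra dependencies, with
Level.WARNING and Level.FINE standing in for the WARN and DEBUG levels of the
broker's own logger. The class name UncommittedLimitLogging, the method
logLimitExceeded, and the debug message text are made up for illustration.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class UncommittedLimitLogging {
    // java.util.logging stand-in for the broker's LOG field (assumption).
    static final Logger LOG = Logger.getLogger(UncommittedLimitLogging.class.getName());

    // Hypothetical helper: always warn with the summary, attach the exception only at debug.
    static void logLimitExceeded(String connectionId, int maxUncommittedCount,
                                 String destination, String transactionId, Exception e) {
        // Always emitted: the one-line summary, without the stack trace.
        LOG.log(Level.WARNING,
                "ConnectionId:{0} exceeded maxUncommittedCount:{1} for destination:{2} in transactionId:{3}",
                new Object[] {connectionId, String.valueOf(maxUncommittedCount), destination, transactionId});

        // Emitted only when the debug-equivalent level is enabled: the exception itself.
        LOG.log(Level.FINE, "maxUncommittedCount exceeded", e);
    }

    public static void main(String[] args) {
        logLimitExceeded("ID:example-1", 10000, "queue://EXAMPLE", "TX:example:1",
                new Exception("example exception"));
    }
}
```

With this shape no isDebugEnabled() guard is needed: the logger drops the
second statement on its own when debug output is off, and the warning is never
suppressed.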
Issue Time Tracking
-------------------
Worklog Id: (was: 900197)
Time Spent: 1h 10m (was: 1h)
> Ability to configure a limit on uncommitted message count in a transaction
> --------------------------------------------------------------------------
>
> Key: AMQ-9344
> URL: https://issues.apache.org/jira/browse/AMQ-9344
> Project: ActiveMQ
> Issue Type: New Feature
> Reporter: Matt Pavlovich
> Assignee: Matt Pavlovich
> Priority: Major
> Fix For: 6.1.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> 1. Add a config flag to limit the max number (by count and/or bytes) of
> uncommitted messages per transaction
> 2. Add a config flag to limit the max number of simultaneous transactions per
> destination
> When a limit is exceeded, roll back the transaction and return a
> ResourceAllocationException to the client
> Possible default values:
> 6.x - unlimited (current behavior)
> 7.x - 10,000
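
For illustration only, the count-based limit described above could behave
roughly as follows. This is a self-contained sketch under stated assumptions,
not broker code: SketchTransaction is a hypothetical in-memory stand-in for a
broker transaction, and LimitExceededException stands in for the JMS
ResourceAllocationException. The guard mirrors the condition in the PR diff
(a limit <= 0 disables the check; the send is rejected once the transaction
size has reached the limit).

```java
import java.util.ArrayList;
import java.util.List;

final class UncommittedLimitSketch {
    /** Stand-in for the JMS ResourceAllocationException (assumption, to avoid a JMS dependency). */
    static final class LimitExceededException extends Exception {
        LimitExceededException(String message) { super(message); }
    }

    /** Hypothetical in-memory transaction that only tracks pending sends. */
    static final class SketchTransaction {
        private final List<String> pending = new ArrayList<>();
        int size() { return pending.size(); }
        void rollback() { pending.clear(); }
    }

    /** Adds a message to the transaction unless the uncommitted-count limit has been reached. */
    static void send(SketchTransaction tx, String message, int maxUncommittedCount)
            throws LimitExceededException {
        // maxUncommittedCount <= 0 disables the limit (the 6.x "unlimited" behavior).
        if (maxUncommittedCount > 0 && tx.size() >= maxUncommittedCount) {
            // Roll back first, then report the failure to the caller.
            tx.rollback();
            throw new LimitExceededException(
                    "Transaction has reached the maximum allowed number of pending send"
                    + " operations before commit of '" + maxUncommittedCount + "'");
        }
        tx.pending.add(message);
    }

    public static void main(String[] args) {
        SketchTransaction tx = new SketchTransaction();
        try {
            for (int i = 0; i < 3; i++) {
                send(tx, "message-" + i, 2);
            }
        } catch (LimitExceededException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("pending after rollback: " + tx.size());
    }
}
```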
--
This message was sent by Atlassian Jira
(v8.20.10#820010)