[
https://issues.apache.org/jira/browse/AMQ-9344?focusedWorklogId=900191&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-900191
]
ASF GitHub Bot logged work on AMQ-9344:
---------------------------------------
Author: ASF GitHub Bot
Created on: 17/Jan/24 18:09
Start Date: 17/Jan/24 18:09
Worklog Time Spent: 10m
Work Description: mattrpav commented on code in PR #1139:
URL: https://github.com/apache/activemq/pull/1139#discussion_r1456232186
##########
activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java:
##########
@@ -291,13 +294,43 @@ public void send(ProducerBrokerExchange producerExchange,
final Message message)
transaction = getTransaction(context, message.getTransactionId(),
false);
}
context.setTransaction(transaction);
+
try {
+ // [AMQ-9344] Limit uncommitted transactions by count
+ verifyUncommittedCount(producerExchange, transaction, message);
next.send(producerExchange, message);
} finally {
context.setTransaction(originalTx);
}
}
+ protected void verifyUncommittedCount(ProducerBrokerExchange
producerExchange, Transaction transaction, Message message) throws Exception {
+ // maxUncommittedCount <= 0 disables
+ int maxUncommittedCount =
this.getBrokerService().getMaxUncommittedCount();
+ if (maxUncommittedCount > 0 && transaction.size() >=
maxUncommittedCount) {
+
+ try {
+ // Rollback as we are throwing an error the client as throwing
the error will cause
+ // the client to reset to a new transaction so we need to
clean up
+ transaction.rollback();
+
+ // Send ResourceAllocationException which will translate to a
JMSException
+ final ResourceAllocationException e = new
ResourceAllocationException(
+ "Can not send message on transaction with id: '" +
transaction.getTransactionId().toString()
+ + "', Transaction has reached the maximum allowed number
of pending send operations before commit of '"
+ + maxUncommittedCount + "'", "42900");
+ if(LOG.isDebugEnabled()) {
+ LOG.warn("ConnectionId:{} exceeded maxUncommittedCount:{}
for destination:{} in transactionId:{}",
(producerExchange.getConnectionContext() != null ?
producerExchange.getConnectionContext().getConnectionId() : "<not set>"),
maxUncommittedCount, message.getDestination().getQualifiedName(),
transaction.getTransactionId().toString(), e);
Review Comment:
When DEBUG is enabled, the log is at the same level, but includes the
exception
Issue Time Tracking
-------------------
Worklog Id: (was: 900191)
Time Spent: 40m (was: 0.5h)
> Ability to configure a limit on uncommitted message count in a transaction
> --------------------------------------------------------------------------
>
> Key: AMQ-9344
> URL: https://issues.apache.org/jira/browse/AMQ-9344
> Project: ActiveMQ
> Issue Type: New Feature
> Reporter: Matt Pavlovich
> Assignee: Matt Pavlovich
> Priority: Major
> Fix For: 6.1.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> 1. Add a config flag to limit the max number (by count and/or bytes) of
> uncommitted message per transaction
> 2. Add a config flag to limit the max number of simultaneous transactions per
> destination
> On limit exceeding, rollback the tx and return a ResourceAllocationException
> to the client
> Possible default values:
> 6.x - unlimited (current behavior)
> 7.x - 10,000
--
This message was sent by Atlassian Jira
(v8.20.10#820010)