This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 8.0.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit f698e4dc9b2c9e475002179d45e29575eeeeafe2 Author: Marek Laca <[email protected]> AuthorDate: Tue May 4 18:28:19 2021 +0200 QPID-8510: [Broker-J] Incorect use of volatile modifier for array This closes #84 --- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 57 +++++----------------- 1 file changed, 13 insertions(+), 44 deletions(-) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index d46810f..1ffd46d 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -38,15 +38,15 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -233,7 +233,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio // Multi session transactions - private volatile ServerTransaction[] _openTransactions = new ServerTransaction[16]; + private final Map<Integer, ServerTransaction> _openTransactions = new ConcurrentSkipListMap<>(); private volatile boolean _sendSaslFinalChallengeAsChallenge; private volatile String _closeCause; @@ -1846,68 +1846,37 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio @Override public Iterator<ServerTransaction> getOpenTransactions() { - final AtomicInteger counter = new AtomicInteger(0); - return Arrays.stream(_openTransactions) - .filter(Objects::nonNull) - .map(transaction -> new IdentifiedTransaction(counter.getAndIncrement(), transaction).getServerTransaction()) - .collect(Collectors.toList()).iterator(); + return _openTransactions.values().iterator(); } @Override public IdentifiedTransaction createIdentifiedTransaction() { - ServerTransaction[] openTransactions = _openTransactions; - final int maxOpenTransactions = openTransactions.length; + final LocalTransaction serverTransaction = createLocalTransaction(); int id = 0; - for(; id < maxOpenTransactions; id++) + while (id >= 0 && _openTransactions.putIfAbsent(id, serverTransaction) != null) { - if(openTransactions[id] == null) - { - break; - - } + id++; } - - // we need to expand the transaction array; - if(id == maxOpenTransactions) + if (id < 0) { - final int newSize = maxOpenTransactions < 1024 ? 2*maxOpenTransactions : maxOpenTransactions + 1024; - - _openTransactions = new ServerTransaction[newSize]; - System.arraycopy(openTransactions, 0, _openTransactions, 0, maxOpenTransactions); - + throw new IllegalStateException("Unsupported state, too many opened transactions"); } - - final LocalTransaction serverTransaction = createLocalTransaction(); - - _openTransactions[id] = serverTransaction; return new IdentifiedTransaction(id, serverTransaction); } @Override public ServerTransaction getTransaction(final int txnId) { - try - { - return _openTransactions[txnId]; - } - catch (ArrayIndexOutOfBoundsException e) - { - throw new UnknownTransactionException(txnId); - } + return Optional.ofNullable(_openTransactions.get(txnId)) + .orElseThrow(() -> new UnknownTransactionException(txnId)); } @Override public void removeTransaction(final int txnId) { - try - { - _openTransactions[txnId] = null; - } - catch (ArrayIndexOutOfBoundsException e) - { - throw new UnknownTransactionException(txnId); - } + Optional.ofNullable(_openTransactions.remove(txnId)) + .orElseThrow(() -> new UnknownTransactionException(txnId)); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
