This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit f698e4dc9b2c9e475002179d45e29575eeeeafe2
Author: Marek Laca <[email protected]>
AuthorDate: Tue May 4 18:28:19 2021 +0200

    QPID-8510: [Broker-J] Incorrect use of volatile modifier for array
    
    This closes #84
---
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      | 57 +++++-----------------
 1 file changed, 13 insertions(+), 44 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index d46810f..1ffd46d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -38,15 +38,15 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -233,7 +233,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
 
 
     // Multi session transactions
-    private volatile ServerTransaction[] _openTransactions = new 
ServerTransaction[16];
+    private final Map<Integer, ServerTransaction> _openTransactions = new 
ConcurrentSkipListMap<>();
     private volatile boolean _sendSaslFinalChallengeAsChallenge;
     private volatile String _closeCause;
 
@@ -1846,68 +1846,37 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     @Override
     public Iterator<ServerTransaction> getOpenTransactions()
     {
-        final AtomicInteger counter = new AtomicInteger(0);
-        return Arrays.stream(_openTransactions)
-                .filter(Objects::nonNull)
-                .map(transaction -> new 
IdentifiedTransaction(counter.getAndIncrement(), 
transaction).getServerTransaction())
-                .collect(Collectors.toList()).iterator();
+        return _openTransactions.values().iterator();
     }
 
     @Override
     public IdentifiedTransaction createIdentifiedTransaction()
     {
-        ServerTransaction[] openTransactions = _openTransactions;
-        final int maxOpenTransactions = openTransactions.length;
+        final LocalTransaction serverTransaction = createLocalTransaction();
         int id = 0;
-        for(; id < maxOpenTransactions; id++)
+        while (id >= 0 && _openTransactions.putIfAbsent(id, serverTransaction) 
!= null)
         {
-            if(openTransactions[id] == null)
-            {
-                break;
-
-            }
+            id++;
         }
-
-        // we need to expand the transaction array;
-        if(id == maxOpenTransactions)
+        if (id < 0)
         {
-            final int newSize = maxOpenTransactions < 1024 ? 
2*maxOpenTransactions : maxOpenTransactions + 1024;
-
-            _openTransactions = new ServerTransaction[newSize];
-            System.arraycopy(openTransactions, 0, _openTransactions, 0, 
maxOpenTransactions);
-
+            throw new IllegalStateException("Unsupported state, too many 
opened transactions");
         }
-
-        final LocalTransaction serverTransaction = createLocalTransaction();
-
-        _openTransactions[id] = serverTransaction;
         return new IdentifiedTransaction(id, serverTransaction);
     }
 
     @Override
     public ServerTransaction getTransaction(final int txnId)
     {
-        try
-        {
-            return _openTransactions[txnId];
-        }
-        catch (ArrayIndexOutOfBoundsException e)
-        {
-            throw new UnknownTransactionException(txnId);
-        }
+        return Optional.ofNullable(_openTransactions.get(txnId))
+                .orElseThrow(() -> new UnknownTransactionException(txnId));
     }
 
     @Override
     public void removeTransaction(final int txnId)
     {
-        try
-        {
-            _openTransactions[txnId] = null;
-        }
-        catch (ArrayIndexOutOfBoundsException e)
-        {
-            throw new UnknownTransactionException(txnId);
-        }
+        Optional.ofNullable(_openTransactions.remove(txnId))
+                .orElseThrow(() -> new UnknownTransactionException(txnId));
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to