This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit ad1ff1e265bca10cf10add025ad0fd042a76f0a9
Author: aw924 <[email protected]>
AuthorDate: Fri Mar 12 11:41:07 2021 +0100

    QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next()
    
    This closes #82
---
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      | 129 ++++++++-------------
 1 file changed, 48 insertions(+), 81 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 8fcd9e0..d46810f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
@@ -45,6 +46,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -132,9 +134,9 @@ import 
org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>
         implements DescribedTypeConstructorRegistry.Source,
-                   ValueWriter.Registry.Source,
-                   SASLEndpoint,
-                   AMQPConnection_1_0<AMQPConnection_1_0Impl>
+        ValueWriter.Registry.Source,
+        SASLEndpoint,
+        AMQPConnection_1_0<AMQPConnection_1_0Impl>
 {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AMQPConnection_1_0Impl.class);
@@ -200,11 +202,11 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     private volatile ConnectionState _connectionState = 
ConnectionState.AWAIT_AMQP_OR_SASL_HEADER;
 
     private final AMQPDescribedTypeRegistry _describedTypeRegistry = 
AMQPDescribedTypeRegistry.newInstance()
-                                                                               
               .registerTransportLayer()
-                                                                               
               .registerMessagingLayer()
-                                                                               
               .registerTransactionLayer()
-                                                                               
               .registerSecurityLayer()
-                                                                               
               .registerExtensionSoleconnLayer();
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer()
+            .registerExtensionSoleconnLayer();
 
     private final Map<Symbol, Object> _properties = new LinkedHashMap<>();
     private volatile boolean _saslComplete;
@@ -504,7 +506,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         if (channel > getChannelMax())
         {
             Error error = new Error(ConnectionError.FRAMING_ERROR,
-                                    String.format("specified channel %d larger 
than maximum channel %d", channel, getChannelMax()));
+                    String.format("specified channel %d larger than maximum 
channel %d", channel, getChannelMax()));
             handleError(error);
             return;
         }
@@ -534,7 +536,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
             case AWAIT_OPEN:
                 closeReceived();
                 closeConnection(ConnectionError.CONNECTION_FORCED,
-                                "Connection close sent before connection was 
opened");
+                        "Connection close sent before connection was opened");
                 break;
             case OPENED:
                 _connectionState = ConnectionState.CLOSE_RECEIVED;
@@ -545,7 +547,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                     ErrorCondition condition = error.getCondition();
                     Symbol errorCondition = condition == null ? null : 
condition.getValue();
                     LOGGER.info("{} : Connection closed with error : {} - {}", 
getLogSubject(),
-                                errorCondition, 
close.getError().getDescription());
+                            errorCondition, close.getError().getDescription());
                 }
                 sendClose(new Close());
                 _connectionState = ConnectionState.CLOSED;
@@ -596,15 +598,15 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     public boolean isClosed()
     {
         return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED;
+                || _connectionState == ConnectionState.CLOSE_RECEIVED;
     }
 
     @Override
     public boolean isClosing()
     {
         return _connectionState == ConnectionState.CLOSED
-               || _connectionState == ConnectionState.CLOSE_RECEIVED
-               || _connectionState == ConnectionState.CLOSE_SENT;
+                || _connectionState == ConnectionState.CLOSE_RECEIVED
+                || _connectionState == ConnectionState.CLOSE_SENT;
     }
 
     @Override
@@ -702,7 +704,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         if (begin.getRemoteChannel() != null)
         {
             closeConnection(ConnectionError.FRAMING_ERROR,
-                            "BEGIN received on channel "
+                    "BEGIN received on channel "
                             + receivingChannelId
                             + " with given remote-channel "
                             + begin.getRemoteChannel()
@@ -718,17 +720,17 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                 if (sendingChannelId == -1)
                 {
                     closeConnection(ConnectionError.FRAMING_ERROR,
-                                    "BEGIN received on channel "
+                            "BEGIN received on channel "
                                     + receivingChannelId
                                     + ". There are no free channels for the 
broker to respond on.");
                 }
                 else
                 {
                     Session_1_0 session = new Session_1_0(this,
-                                                          begin,
-                                                          sendingChannelId,
-                                                          receivingChannelId,
-                                                          
getContextValue(Long.class, 
AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
+                            begin,
+                            sendingChannelId,
+                            receivingChannelId,
+                            getContextValue(Long.class, 
AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
                     session.create();
 
                     _receivingSessions[receivingChannelId] = session;
@@ -754,7 +756,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
             else
             {
                 closeConnection(ConnectionError.FRAMING_ERROR,
-                                "BEGIN received on channel " + 
receivingChannelId + " which is already in use.");
+                        "BEGIN received on channel " + receivingChannelId + " 
which is already in use.");
             }
 
         }
@@ -821,15 +823,15 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         int channelMax = getPort().getSessionCountLimit() - 1;
         _channelMax = open.getChannelMax() == null ? channelMax
                 : open.getChannelMax().intValue() < channelMax
-                        ? open.getChannelMax().intValue()
-                        : channelMax;
+                ? open.getChannelMax().intValue()
+                : channelMax;
         if (_receivingSessions == null)
         {
             _receivingSessions = new Session_1_0[_channelMax + 1];
             _sendingSessions = new Session_1_0[_channelMax + 1];
         }
         _maxFrameSize = open.getMaxFrameSize() == null
-                        || open.getMaxFrameSize().longValue() > 
getBroker().getNetworkBufferSize()
+                || open.getMaxFrameSize().longValue() > 
getBroker().getNetworkBufferSize()
                 ? getBroker().getNetworkBufferSize()
                 : open.getMaxFrameSize().intValue();
         _remoteContainerId = open.getContainerId();
@@ -884,7 +886,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < 
MINIMUM_SUPPORTED_IDLE_TIMEOUT)
         {
             closeConnection(ConnectionError.CONNECTION_FORCED,
-                            "Requested idle timeout of "
+                    "Requested idle timeout of "
                             + _outgoingIdleTimeout
                             + " is too low. The minimum supported timeout is"
                             + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
@@ -929,14 +931,14 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                         {
                             List<ListenableFuture<Void>> rescheduleFutures = 
new ArrayList<>();
                             for (AMQPConnection<?> existingConnection : 
StreamSupport.stream(existingConnections.spliterator(), false)
-                                                                               
      .filter(con -> con instanceof AMQPConnection_1_0)
-                                                                               
      .filter(con -> !con.isClosing())
-                                                                               
      .filter(con -> 
con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
-                                                                               
      .collect(Collectors.toList()))
+                                    .filter(con -> con instanceof 
AMQPConnection_1_0)
+                                    .filter(con -> !con.isClosing())
+                                    .filter(con -> 
con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
+                                    .collect(Collectors.toList()))
                             {
                                 SoleConnectionEnforcementPolicy 
soleConnectionEnforcementPolicy = null;
                                 if (((AMQPConnection_1_0Impl) 
existingConnection)._soleConnectionEnforcementPolicy
-                                    != null)
+                                        != null)
                                 {
                                     soleConnectionEnforcementPolicy =
                                             ((AMQPConnection_1_0Impl) 
existingConnection)._soleConnectionEnforcementPolicy;
@@ -950,9 +952,9 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                                 {
                                     
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
                                     Error error = new 
Error(AmqpError.INVALID_FIELD,
-                                                            String.format(
-                                                                    
"Connection closed due to sole-connection-enforcement-policy '%s'",
-                                                                    
soleConnectionEnforcementPolicy.toString()));
+                                            String.format(
+                                                    "Connection closed due to 
sole-connection-enforcement-policy '%s'",
+                                                    
soleConnectionEnforcementPolicy.toString()));
                                     
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), 
Symbol.valueOf("container-id")));
                                     newConnection.doOnIOThreadAsync(() -> 
((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
                                     proceedWithRegistration = false;
@@ -961,9 +963,9 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                                 else if 
(SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
                                 {
                                     final Error error = new 
Error(AmqpError.RESOURCE_LOCKED,
-                                                                  
String.format(
-                                                                          
"Connection closed due to sole-connection-enforcement-policy '%s'",
-                                                                          
soleConnectionEnforcementPolicy.toString()));
+                                            String.format(
+                                                    "Connection closed due to 
sole-connection-enforcement-policy '%s'",
+                                                    
soleConnectionEnforcementPolicy.toString()));
                                     
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"),
 true));
                                     
rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
                                             () -> ((AMQPConnection_1_0Impl) 
existingConnection).closeConnection(error)));
@@ -1461,9 +1463,9 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     {
         updateLastWriteTime();
         FRAME_LOGGER.debug("SEND[{}|{}] : {}",
-                           getNetwork().getRemoteAddress(),
-                           amqFrame.getChannel(),
-                           amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : 
amqFrame.getFrameBody());
+                getNetwork().getRemoteAddress(),
+                amqFrame.getChannel(),
+                amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : 
amqFrame.getFrameBody());
 
         int size = _frameWriter.send(amqFrame);
         if (size > getMaxFrameSize())
@@ -1726,10 +1728,10 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         }
 
         if (_remoteDesiredCapabilities != null
-            && 
_remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
+                && 
_remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER))
         {
             
_properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY,
-                            SoleConnectionDetectionPolicy.STRONG);
+                    SoleConnectionDetectionPolicy.STRONG);
         }
 
         if (_soleConnectionEnforcementPolicy == 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
@@ -1844,46 +1846,11 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     @Override
     public Iterator<ServerTransaction> getOpenTransactions()
     {
-        return new Iterator<ServerTransaction>()
-        {
-            int _index = 0;
-
-            @Override
-            public boolean hasNext()
-            {
-                for(int i = _index; i < _openTransactions.length; i++)
-                {
-                    if(_openTransactions[i] != null)
-                    {
-                        return true;
-                    }
-                }
-                return false;
-            }
-
-            @Override
-            public ServerTransaction next()
-            {
-                IdentifiedTransaction txn;
-                for( ; _index < _openTransactions.length; _index++)
-                {
-                    if(_openTransactions[_index] != null)
-                    {
-                        txn = new IdentifiedTransaction(_index, 
_openTransactions[_index]);
-                        _index++;
-                        return txn.getServerTransaction();
-                    }
-                }
-
-                throw new NoSuchElementException();
-            }
-
-            @Override
-            public void remove()
-            {
-                _openTransactions[_index] = null;
-            }
-        };
+        final AtomicInteger counter = new AtomicInteger(0);
+        return Arrays.stream(_openTransactions)
+                .filter(Objects::nonNull)
+                .map(transaction -> new 
IdentifiedTransaction(counter.getAndIncrement(), 
transaction).getServerTransaction())
+                .collect(Collectors.toList()).iterator();
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to