This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch 8.0.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit ad1ff1e265bca10cf10add025ad0fd042a76f0a9 Author: aw924 <[email protected]> AuthorDate: Fri Mar 12 11:41:07 2021 +0100 QPID-8509 - java.util.NoSuchElementException in AMQPConnection_1_0Impl.next() This closes #82 --- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 129 ++++++++------------- 1 file changed, 48 insertions(+), 81 deletions(-) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 8fcd9e0..d46810f 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -38,6 +38,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; @@ -45,6 +46,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -132,9 +134,9 @@ import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler> implements DescribedTypeConstructorRegistry.Source, - ValueWriter.Registry.Source, - SASLEndpoint, - AMQPConnection_1_0<AMQPConnection_1_0Impl> + ValueWriter.Registry.Source, + SASLEndpoint, + AMQPConnection_1_0<AMQPConnection_1_0Impl> { private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0Impl.class); @@ -200,11 +202,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio private volatile ConnectionState _connectionState = ConnectionState.AWAIT_AMQP_OR_SASL_HEADER; private final AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer() - .registerExtensionSoleconnLayer(); + .registerTransportLayer() + .registerMessagingLayer() + .registerTransactionLayer() + .registerSecurityLayer() + .registerExtensionSoleconnLayer(); private final Map<Symbol, Object> _properties = new LinkedHashMap<>(); private volatile boolean _saslComplete; @@ -504,7 +506,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio if (channel > getChannelMax()) { Error error = new Error(ConnectionError.FRAMING_ERROR, - String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax())); + String.format("specified channel %d larger than maximum channel %d", channel, getChannelMax())); handleError(error); return; } @@ -534,7 +536,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio case AWAIT_OPEN: closeReceived(); closeConnection(ConnectionError.CONNECTION_FORCED, - "Connection close sent before connection was opened"); + "Connection close sent before connection was opened"); break; case OPENED: _connectionState = ConnectionState.CLOSE_RECEIVED; @@ -545,7 +547,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio ErrorCondition condition = error.getCondition(); Symbol errorCondition = condition == null ? null : condition.getValue(); LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(), - errorCondition, close.getError().getDescription()); + errorCondition, close.getError().getDescription()); } sendClose(new Close()); _connectionState = ConnectionState.CLOSED; @@ -596,15 +598,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio public boolean isClosed() { return _connectionState == ConnectionState.CLOSED - || _connectionState == ConnectionState.CLOSE_RECEIVED; + || _connectionState == ConnectionState.CLOSE_RECEIVED; } @Override public boolean isClosing() { return _connectionState == ConnectionState.CLOSED - || _connectionState == ConnectionState.CLOSE_RECEIVED - || _connectionState == ConnectionState.CLOSE_SENT; + || _connectionState == ConnectionState.CLOSE_RECEIVED + || _connectionState == ConnectionState.CLOSE_SENT; } @Override @@ -702,7 +704,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio if (begin.getRemoteChannel() != null) { closeConnection(ConnectionError.FRAMING_ERROR, - "BEGIN received on channel " + "BEGIN received on channel " + receivingChannelId + " with given remote-channel " + begin.getRemoteChannel() @@ -718,17 +720,17 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio if (sendingChannelId == -1) { closeConnection(ConnectionError.FRAMING_ERROR, - "BEGIN received on channel " + "BEGIN received on channel " + receivingChannelId + ". There are no free channels for the broker to respond on."); } else { Session_1_0 session = new Session_1_0(this, - begin, - sendingChannelId, - receivingChannelId, - getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE)); + begin, + sendingChannelId, + receivingChannelId, + getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE)); session.create(); _receivingSessions[receivingChannelId] = session; @@ -754,7 +756,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio else { closeConnection(ConnectionError.FRAMING_ERROR, - "BEGIN received on channel " + receivingChannelId + " which is already in use."); + "BEGIN received on channel " + receivingChannelId + " which is already in use."); } } @@ -821,15 +823,15 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio int channelMax = getPort().getSessionCountLimit() - 1; _channelMax = open.getChannelMax() == null ? channelMax : open.getChannelMax().intValue() < channelMax - ? open.getChannelMax().intValue() - : channelMax; + ? open.getChannelMax().intValue() + : channelMax; if (_receivingSessions == null) { _receivingSessions = new Session_1_0[_channelMax + 1]; _sendingSessions = new Session_1_0[_channelMax + 1]; } _maxFrameSize = open.getMaxFrameSize() == null - || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize() + || open.getMaxFrameSize().longValue() > getBroker().getNetworkBufferSize() ? getBroker().getNetworkBufferSize() : open.getMaxFrameSize().intValue(); _remoteContainerId = open.getContainerId(); @@ -884,7 +886,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio if (_outgoingIdleTimeout != 0L && _outgoingIdleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT) { closeConnection(ConnectionError.CONNECTION_FORCED, - "Requested idle timeout of " + "Requested idle timeout of " + _outgoingIdleTimeout + " is too low. The minimum supported timeout is" + MINIMUM_SUPPORTED_IDLE_TIMEOUT); @@ -929,14 +931,14 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio { List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>(); for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false) - .filter(con -> con instanceof AMQPConnection_1_0) - .filter(con -> !con.isClosing()) - .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName())) - .collect(Collectors.toList())) + .filter(con -> con instanceof AMQPConnection_1_0) + .filter(con -> !con.isClosing()) + .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName())) + .collect(Collectors.toList())) { SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null; if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy - != null) + != null) { soleConnectionEnforcementPolicy = ((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy; @@ -950,9 +952,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio { _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); Error error = new Error(AmqpError.INVALID_FIELD, - String.format( - "Connection closed due to sole-connection-enforcement-policy '%s'", - soleConnectionEnforcementPolicy.toString())); + String.format( + "Connection closed due to sole-connection-enforcement-policy '%s'", + soleConnectionEnforcementPolicy.toString())); error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id"))); newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error)); proceedWithRegistration = false; @@ -961,9 +963,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy)) { final Error error = new Error(AmqpError.RESOURCE_LOCKED, - String.format( - "Connection closed due to sole-connection-enforcement-policy '%s'", - soleConnectionEnforcementPolicy.toString())); + String.format( + "Connection closed due to sole-connection-enforcement-policy '%s'", + soleConnectionEnforcementPolicy.toString())); error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)); rescheduleFutures.add(existingConnection.doOnIOThreadAsync( () -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error))); @@ -1461,9 +1463,9 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio { updateLastWriteTime(); FRAME_LOGGER.debug("SEND[{}|{}] : {}", - getNetwork().getRemoteAddress(), - amqFrame.getChannel(), - amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody()); + getNetwork().getRemoteAddress(), + amqFrame.getChannel(), + amqFrame.getFrameBody() == null ? "<<HEARTBEAT>>" : amqFrame.getFrameBody()); int size = _frameWriter.send(amqFrame); if (size > getMaxFrameSize()) @@ -1726,10 +1728,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } if (_remoteDesiredCapabilities != null - && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER)) + && _remoteDesiredCapabilities.contains(SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER)) { _properties.put(SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY, - SoleConnectionDetectionPolicy.STRONG); + SoleConnectionDetectionPolicy.STRONG); } if (_soleConnectionEnforcementPolicy == SoleConnectionEnforcementPolicy.CLOSE_EXISTING) @@ -1844,46 +1846,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio @Override public Iterator<ServerTransaction> getOpenTransactions() { - return new Iterator<ServerTransaction>() - { - int _index = 0; - - @Override - public boolean hasNext() - { - for(int i = _index; i < _openTransactions.length; i++) - { - if(_openTransactions[i] != null) - { - return true; - } - } - return false; - } - - @Override - public ServerTransaction next() - { - IdentifiedTransaction txn; - for( ; _index < _openTransactions.length; _index++) - { - if(_openTransactions[_index] != null) - { - txn = new IdentifiedTransaction(_index, _openTransactions[_index]); - _index++; - return txn.getServerTransaction(); - } - } - - throw new NoSuchElementException(); - } - - @Override - public void remove() - { - _openTransactions[_index] = null; - } - }; + final AtomicInteger counter = new AtomicInteger(0); + return Arrays.stream(_openTransactions) + .filter(Objects::nonNull) + .map(transaction -> new IdentifiedTransaction(counter.getAndIncrement(), transaction).getServerTransaction()) + .collect(Collectors.toList()).iterator(); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
