This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 660acde6b6 QPID-8653: [Broker-J] Code cleanup: collection type
arguments, collection factory methods, lambdas (#211)
660acde6b6 is described below
commit 660acde6b63105cc3f7ac484b15650f1ef3e2349
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Mon Aug 7 12:58:20 2023 +0200
QPID-8653: [Broker-J] Code cleanup: collection type arguments, collection
factory methods, lambdas (#211)
---
.../qpid/server/protocol/ProtocolVersion.java | 7 +-
.../qpid/server/protocol/v0_8/AMQChannel.java | 122 +++++++------------
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 131 +++++++--------------
.../qpid/server/protocol/v0_8/AMQTypeMap.java | 2 +-
.../qpid/server/protocol/v0_8/BrokerDecoder.java | 25 ++--
.../server/protocol/v0_8/ConsumerTarget_0_8.java | 34 +++---
.../qpid/server/protocol/v0_8/IncomingMessage.java | 2 +-
.../v0_8/MessageConverter_Internal_to_v0_8.java | 4 +-
.../v0_8/UnacknowledgedMessageMapImpl.java | 7 +-
.../org/apache/qpid/server/url/BindingURL.java | 31 ++---
10 files changed, 134 insertions(+), 231 deletions(-)
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/ProtocolVersion.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/ProtocolVersion.java
index e5935e4a4f..4923f87829 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/ProtocolVersion.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/ProtocolVersion.java
@@ -143,8 +143,7 @@ public class ProtocolVersion implements Comparable
}
private static final SortedSet<ProtocolVersion> _supportedVersions;
- private static final Map<String, ProtocolVersion> _nameToVersionMap =
- new HashMap<String, ProtocolVersion>();
+ private static final Map<String, ProtocolVersion> _nameToVersionMap = new
HashMap<>();
private static final ProtocolVersion _defaultVersion;
public static final ProtocolVersion v0_10 = new
ProtocolVersion((byte)0,(byte)10);
@@ -153,11 +152,11 @@ public class ProtocolVersion implements Comparable
public static final ProtocolVersion v0_91 = new
ProtocolVersion((byte)0,(byte)91);
public static final ProtocolVersion v0_8 = new
ProtocolVersion((byte)8,(byte)0);
- private static final Map<ProtocolVersion, ProtocolVersion> INSTANCES = new
ConcurrentHashMap<ProtocolVersion,ProtocolVersion>();
+ private static final Map<ProtocolVersion, ProtocolVersion> INSTANCES = new
ConcurrentHashMap<>();
static
{
- SortedSet<ProtocolVersion> versions = new TreeSet<ProtocolVersion>();
+ SortedSet<ProtocolVersion> versions = new TreeSet<>();
versions.add(v0_10);
_nameToVersionMap.put("0-10", v0_10);
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index d575b43765..688f685b79 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -118,14 +118,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private static final Logger LOGGER =
LoggerFactory.getLogger(AMQChannel.class);
private static final InfiniteCreditCreditManager
INFINITE_CREDIT_CREDIT_MANAGER = new InfiniteCreditCreditManager();
private static final Function<MessageConsumerAssociation, MessageInstance>
- MESSAGE_INSTANCE_FUNCTION = new
Function<MessageConsumerAssociation, MessageInstance>()
- {
- @Override
- public MessageInstance apply(final MessageConsumerAssociation input)
- {
- return input.getMessageInstance();
- }
- };
+ MESSAGE_INSTANCE_FUNCTION =
MessageConsumerAssociation::getMessageInstance;
private static final String ALTERNATE_EXCHANGE = "alternateExchange";
private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT =
@@ -140,7 +133,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private final boolean _forceMessageValidation;
/** Maps from consumer tag to subscription instance. Allows us to
unsubscribe from a queue. */
- private final Map<AMQShortString, ConsumerTarget_0_8>
_tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
+ private final Map<AMQShortString, ConsumerTarget_0_8>
_tag2SubscriptionTargetMap = new HashMap<>();
private final MessageStore _messageStore;
@@ -153,7 +146,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
private final AMQPConnection_0_8 _connection;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private final Set<Object> _blockingEntities =
Collections.synchronizedSet(new HashSet<Object>());
+ private final Set<Object> _blockingEntities =
Collections.synchronizedSet(new HashSet<>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
@@ -229,16 +222,11 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
_clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
- AccessController.doPrivileged((new PrivilegedAction<Object>()
+ AccessController.doPrivileged(((PrivilegedAction<Object>) () ->
{
- @Override
- public Object run()
- {
- message(ChannelMessages.CREATE());
-
- return null;
- }
- }),_accessControllerContext);
+ message(ChannelMessages.CREATE());
+ return null;
+ }), _accessControllerContext);
_forceMessageValidation = connection.getContextValue(Boolean.class,
AMQPConnection_0_8.FORCE_MESSAGE_VALIDATION);
@@ -303,14 +291,10 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
public final void receivedComplete()
{
- AccessController.doPrivileged(new PrivilegedAction<Void>()
+ AccessController.doPrivileged((PrivilegedAction<Void>) () ->
{
- @Override
- public Void run()
- {
- sync();
- return null;
- }
+ sync();
+ return null;
}, getAccessControllerContext());
}
@@ -385,28 +369,23 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
_currentMessage = null;
- final InstanceProperties instanceProperties =
- new InstanceProperties()
- {
- @Override
- public Object getProperty(final Property
prop)
- {
- switch (prop)
- {
- case EXPIRATION:
- return
amqMessage.getExpiration();
- case IMMEDIATE:
- return
amqMessage.isImmediate();
- case PERSISTENT:
- return
amqMessage.isPersistent();
- case MANDATORY:
- return
amqMessage.isMandatory();
- case REDELIVERED:
- return false;
- }
- return null;
- }
- };
+ final InstanceProperties instanceProperties = prop ->
+ {
+ switch (prop)
+ {
+ case EXPIRATION:
+ return amqMessage.getExpiration();
+ case IMMEDIATE:
+ return amqMessage.isImmediate();
+ case PERSISTENT:
+ return amqMessage.isPersistent();
+ case MANDATORY:
+ return amqMessage.isMandatory();
+ case REDELIVERED:
+ return false;
+ }
+ return null;
+ };
final RoutingResult<AMQMessage> result =
destination.route(amqMessage,
@@ -1090,19 +1069,15 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
if(async && _transaction instanceof LocalTransaction)
{
- ((LocalTransaction)_transaction).commitAsync(new Runnable()
+ ((LocalTransaction)_transaction).commitAsync(() ->
{
- @Override
- public void run()
+ try
{
- try
- {
- immediateAction.run();
- }
- finally
- {
- _connection.incrementTransactionBeginCounter();
- }
+ immediateAction.run();
+ }
+ finally
+ {
+ _connection.incrementTransactionBeginCounter();
}
});
}
@@ -1371,7 +1346,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
finally
{
- _ackedMessages = Collections.emptySet();
+ _ackedMessages = Set.of();
}
}
@@ -1400,7 +1375,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
finally
{
- _ackedMessages = Collections.emptySet();
+ _ackedMessages = Set.of();
}
}
@@ -1566,16 +1541,8 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
int requeues = 0;
if
(messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
{
- requeues = messageInstance.routeToAlternate(new
Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance
requeueEntry)
- {
-
messageWithSubject(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-
requeueEntry.getOwningResource()
-
.getName()));
- }
- }, null, null);
+ requeues = messageInstance.routeToAlternate(requeueEntry ->
+
messageWithSubject(ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
requeueEntry.getOwningResource().getName())), null, null);
}
if(requeues == 0)
@@ -2626,7 +2593,7 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
try
{
- Map<String, Object> attributes = new HashMap<String,
Object>();
+ Map<String, Object> attributes = new HashMap<>();
if (arguments != null)
{
attributes.putAll(FieldTable.convertToMap(arguments));
@@ -3404,17 +3371,8 @@ public class AMQChannel extends
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
else
{
- commit(new Runnable()
- {
-
- @Override
- public void run()
- {
- _connection.writeFrame(_txCommitOkFrame);
- }
- }, true);
+ commit(() -> _connection.writeFrame(_txCommitOkFrame), true);
}
-
}
@Override
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index bc04386e94..e28a5129d6 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -23,8 +23,6 @@ package org.apache.qpid.server.protocol.v0_8;
import static java.nio.charset.StandardCharsets.US_ASCII;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.BufferUnderflowException;
import java.security.AccessControlException;
@@ -114,7 +112,7 @@ public class AMQPConnection_0_8Impl
* Used so we know which channels we need to call {@link
AMQChannel#receivedComplete()}
* on after handling the frames.
*/
- private final Set<AMQChannel> _channelsForCurrentMessage =
Collections.newSetFromMap(new ConcurrentHashMap<AMQChannel, Boolean>());
+ private final Set<AMQChannel> _channelsForCurrentMessage =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ServerDecoder _decoder;
@@ -160,7 +158,7 @@ public class AMQPConnection_0_8Impl
private volatile SubjectAuthenticationResult
_successfulAuthenticationResult;
private final Set<AMQPSession<?,?>> _sessionsWithWork =
- Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>,
Boolean>());
+ Collections.newSetFromMap(new ConcurrentHashMap<>());
private volatile int _heartBeatDelay;
private volatile String _closeCause;
@@ -314,7 +312,7 @@ public class AMQPConnection_0_8Impl
String locales = "en_US";
- Map<String,Object> props = Collections.emptyMap();
+ Map<String,Object> props = Map.of();
for(ConnectionPropertyEnricher enricher :
getPort().getConnectionPropertyEnrichers())
{
props = enricher.addConnectionProperties(this, props);
@@ -322,8 +320,8 @@ public class AMQPConnection_0_8Impl
FieldTable serverProperties =
FieldTable.convertToFieldTable(props);
- AMQMethodBody responseBody =
getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
-
(short) pv.getActualMinorVersion(),
+ AMQMethodBody responseBody =
getMethodRegistry().createConnectionStartBody(getProtocolMajorVersion(),
+
pv.getActualMinorVersion(),
serverProperties,
mechanisms.getBytes(US_ASCII),
locales.getBytes(US_ASCII));
@@ -706,15 +704,11 @@ public class AMQPConnection_0_8Impl
@Override
public final void readerIdle()
{
- AccessController.doPrivileged(new PrivilegedAction<Object>()
+ AccessController.doPrivileged((PrivilegedAction<Object>) () ->
{
- @Override
- public Object run()
- {
-
getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current connection
state: " + _state, true));
- getNetwork().close();
- return null;
- }
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Current
connection state: " + _state, true));
+ getNetwork().close();
+ return null;
}, getAccessControllerContext());
}
@@ -756,24 +750,19 @@ public class AMQPConnection_0_8Impl
default:
cause = ErrorCodes.INTERNAL_ERROR;
}
- addAsyncTask(new Action<AMQPConnection_0_8Impl>()
+ addAsyncTask(object ->
{
+ int channelId = session.getChannelId();
+ closeChannel(channelId, cause, message);
- @Override
- public void performAction(final AMQPConnection_0_8Impl object)
- {
- int channelId = session.getChannelId();
- closeChannel(channelId, cause, message);
-
- MethodRegistry methodRegistry = getMethodRegistry();
- ChannelCloseBody responseBody =
- methodRegistry.createChannelCloseBody(
- cause,
- AMQShortString.validValueOf(message),
- 0, 0);
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ChannelCloseBody responseBody =
+ methodRegistry.createChannelCloseBody(
+ cause,
+ AMQShortString.validValueOf(message),
+ 0, 0);
- writeFrame(responseBody.generateFrame(channelId));
- }
+ writeFrame(responseBody.generateFrame(channelId));
});
}
@@ -796,16 +785,10 @@ public class AMQPConnection_0_8Impl
}
_closeCauseCode = cause;
_closeCause = description;
- Action<AMQPConnection_0_8Impl> action = new
Action<AMQPConnection_0_8Impl>()
+ Action<AMQPConnection_0_8Impl> action = object ->
{
- @Override
- public void performAction(final AMQPConnection_0_8Impl object)
- {
- AMQConnectionException e = new AMQConnectionException(cause,
description, 0, 0,
-
getMethodRegistry(),
- null);
- sendConnectionClose(0, e.getCloseFrame());
- }
+ AMQConnectionException e = new AMQConnectionException(cause,
description, 0, 0, getMethodRegistry(), null);
+ sendConnectionClose(0, e.getCloseFrame());
};
addAsyncTask(action);
}
@@ -1295,30 +1278,24 @@ public class AMQPConnection_0_8Impl
assertState(ConnectionState.OPEN);
ServerChannelMethodProcessor channelMethodProcessor =
getChannel(channelId);
- if(channelMethodProcessor == null)
+ if (channelMethodProcessor == null)
{
channelMethodProcessor = (ServerChannelMethodProcessor)
Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(),
- new Class[] {
ServerChannelMethodProcessor.class }, new InvocationHandler()
+ new Class[] { ServerChannelMethodProcessor.class },
(proxy, method, args) ->
{
- @Override
- public Object invoke(final Object proxy, final Method
method, final Object[] args)
- throws Throwable
+ if (method.getName().equals("receiveChannelCloseOk")
&& channelAwaitingClosure(channelId))
+ {
+ closeChannelOk(channelId);
+ }
+ else if(method.getName().startsWith("receive"))
+ {
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
"Unknown channel id: " + channelId, channelId);
+ }
+ else if(method.getName().equals("ignoreAllButCloseOk"))
{
- if
(method.getName().equals("receiveChannelCloseOk") &&
channelAwaitingClosure(channelId))
- {
- closeChannelOk(channelId);
- }
- else if(method.getName().startsWith("receive"))
- {
- sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
- "Unknown channel id: " + channelId,
channelId);
- }
- else
if(method.getName().equals("ignoreAllButCloseOk"))
- {
- return channelAwaitingClosure(channelId);
- }
- return null;
+ return channelAwaitingClosure(channelId);
}
+ return null;
});
}
return channelMethodProcessor;
@@ -1450,26 +1427,15 @@ public class AMQPConnection_0_8Impl
final Action<? super AMQPConnection_0_8Impl> asyncAction =
_asyncTaskList.poll();
if(asyncAction != null)
{
- return new Runnable()
- {
- @Override
- public void run()
- {
-
asyncAction.performAction(AMQPConnection_0_8Impl.this);
- }
- };
+ return () ->
asyncAction.performAction(AMQPConnection_0_8Impl.this);
}
else
{
// in case the connection was marked as closing
between a call to hasNext() and
// a subsequent call to next()
- return new Runnable()
+ return () ->
{
- @Override
- public void run()
- {
- }
};
}
}
@@ -1480,17 +1446,13 @@ public class AMQPConnection_0_8Impl
_sessionIterator = _sessionsWithWork.iterator();
}
final AMQPSession<?,?> session = _sessionIterator.next();
- return new Runnable()
+ return () ->
{
- @Override
- public void run()
- {
- _sessionIterator.remove();
+ _sessionIterator.remove();
- if (session.processPending())
- {
- _sessionsWithWork.add(session);
- }
+ if (session.processPending())
+ {
+ _sessionsWithWork.add(session);
}
};
}
@@ -1498,14 +1460,7 @@ public class AMQPConnection_0_8Impl
else if(!_asyncTaskList.isEmpty())
{
final Action<? super AMQPConnection_0_8Impl> asyncAction =
_asyncTaskList.poll();
- return new Runnable()
- {
- @Override
- public void run()
- {
- asyncAction.performAction(AMQPConnection_0_8Impl.this);
- }
- };
+ return () ->
asyncAction.performAction(AMQPConnection_0_8Impl.this);
}
else
{
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQTypeMap.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQTypeMap.java
index 1d7c348760..0f8c1cf10b 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQTypeMap.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQTypeMap.java
@@ -25,7 +25,7 @@ import java.util.Map;
public class AMQTypeMap
{
- public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<Byte,
AMQType>();
+ public static final Map<Byte, AMQType> _reverseTypeMap = new HashMap<>();
private AMQTypeMap()
{
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
index ba0196d007..864e467cd1 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
@@ -93,25 +92,21 @@ public class BrokerDecoder extends ServerDecoder
{
try
{
- return AccessController.doPrivileged(new
PrivilegedExceptionAction<Integer>()
+ return
AccessController.doPrivileged((PrivilegedExceptionAction<Integer>) () ->
{
- @Override
- public Integer run() throws IOException,
AMQFrameDecodingException
+ int required1;
+ while (true)
{
- int required;
- while (true)
- {
- processInput(buf);
+ processInput(buf);
- required = decodable(buf);
- if (required != 0 ||
buf.getUnsignedShort(buf.position() + 1) != channelId)
- {
- break;
- }
+ required1 = decodable(buf);
+ if (required1 != 0 ||
buf.getUnsignedShort(buf.position() + 1) != channelId)
+ {
+ break;
}
-
- return required;
}
+
+ return required1;
}, channel.getAccessControllerContext());
}
catch (PrivilegedActionException e)
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 092de1ccb9..ab6895239c 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -449,22 +449,24 @@ public abstract class ConsumerTarget_0_8 extends
AbstractConsumerTarget<Consumer
_unacknowledgedCount.decrementAndGet();
}
- private final StateChangeListener<MessageInstance, EntryState>
_unacknowledgedMessageListener = new StateChangeListener<MessageInstance,
EntryState>()
- {
- @Override
- public void stateChanged(MessageInstance entry, EntryState oldState,
EntryState newState)
- {
- if (isConsumerAcquiredStateForThis(oldState) &&
!isConsumerAcquiredStateForThis(newState))
+ private final StateChangeListener<MessageInstance, EntryState>
_unacknowledgedMessageListener =
+ new StateChangeListener<>()
{
- removeUnacknowledgedMessage(entry);
- entry.removeStateChangeListener(this);
- }
- }
+ @Override
+ public void stateChanged(MessageInstance entry, EntryState
oldState, EntryState newState)
+ {
+ if (isConsumerAcquiredStateForThis(oldState) &&
!isConsumerAcquiredStateForThis(newState))
+ {
+ removeUnacknowledgedMessage(entry);
+ entry.removeStateChangeListener(this);
+ }
+ }
- private boolean isConsumerAcquiredStateForThis(EntryState state)
- {
- return state instanceof MessageInstance.ConsumerAcquiredState
- && ((MessageInstance.ConsumerAcquiredState)
state).getConsumer().getTarget() == ConsumerTarget_0_8.this;
- }
- };
+ private boolean isConsumerAcquiredStateForThis(EntryState
state)
+ {
+ return state instanceof
MessageInstance.ConsumerAcquiredState
+ && ((MessageInstance.ConsumerAcquiredState)
state).getConsumer().getTarget()
+ == ConsumerTarget_0_8.this;
+ }
+ };
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
index e85a2e4a41..fc6d831bf9 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
@@ -32,7 +32,7 @@ public class IncomingMessage
{
private final MessagePublishInfo _messagePublishInfo;
- private final List<ContentBody> _contentChunks = new
ArrayList<ContentBody>();
+ private final List<ContentBody> _contentChunks = new ArrayList<>();
private ContentHeaderBody _contentHeaderBody;
private MessageDestination _messageDestination;
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index 1196d38506..2d8845fc53 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -93,7 +93,7 @@ public class MessageConverter_Internal_to_v0_8 implements
MessageConverter<Inter
convertMetaData(serverMsg, addressSpace, mimeType,
messageContent.length);
final int metadataSize = messageMetaData_0_8.getStorableSize();
- return new StoredMessage<MessageMetaData>()
+ return new StoredMessage<>()
{
@Override
public MessageMetaData getMetaData()
@@ -196,7 +196,7 @@ public class MessageConverter_Internal_to_v0_8 implements
MessageConverter<Inter
props.setEncoding(convertToShortStringForProperty("encoding",
serverMsg.getMessageHeader().getEncoding()));
- Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
+ Map<String,Object> headerProps = new LinkedHashMap<>();
for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
{
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index a8c57093ee..168610a29e 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -22,14 +22,13 @@ package org.apache.qpid.server.protocol.v0_8;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
-import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -186,10 +185,10 @@ class UnacknowledgedMessageMapImpl implements
UnacknowledgedMessageMap
final MessageInstance messageInstance =
association.getMessageInstance();
if (messageInstance != null &&
messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
{
- return Collections.singleton(association);
+ return Set.of(association);
}
}
- return Collections.emptySet();
+ return Set.of();
}
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/url/BindingURL.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/url/BindingURL.java
index df8df42f22..cceb19a70a 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/url/BindingURL.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/url/BindingURL.java
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.url;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -56,21 +53,19 @@ public interface BindingURL
*/
String OPTION_REJECT_BEHAVIOUR = "rejectbehaviour";
- Set<String> NON_CONSUMER_OPTIONS =
- Collections.unmodifiableSet(new
HashSet<String>(Arrays.asList(OPTION_EXCLUSIVE,
-
OPTION_AUTODELETE,
-
OPTION_DURABLE,
-
OPTION_BROWSE,
-
OPTION_ROUTING_KEY,
-
OPTION_BINDING_KEY,
-
OPTION_EXCHANGE_AUTODELETE,
-
OPTION_EXCHANGE_DURABLE,
-
OPTION_EXCHANGE_DURABLE,
-
OPTION_REJECT_BEHAVIOUR,
-
OPTION_SEND_ENCRYPTED,
-
OPTION_ENCRYPTED_RECIPIENTS,
-
OPTION_DELIVERY_DELAY,
-
OPTION_LOCAL_ADDRESS)));
+ Set<String> NON_CONSUMER_OPTIONS = Set.of(OPTION_EXCLUSIVE,
+ OPTION_AUTODELETE,
+ OPTION_DURABLE,
+ OPTION_BROWSE,
+ OPTION_ROUTING_KEY,
+ OPTION_BINDING_KEY,
+ OPTION_EXCHANGE_AUTODELETE,
+ OPTION_EXCHANGE_DURABLE,
+ OPTION_REJECT_BEHAVIOUR,
+ OPTION_SEND_ENCRYPTED,
+ OPTION_ENCRYPTED_RECIPIENTS,
+ OPTION_DELIVERY_DELAY,
+ OPTION_LOCAL_ADDRESS);
String getURL();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]