http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index d5e8df4..d932274 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -48,7 +48,6 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { super(name, remotingConnection, sessionChannel, serverVersion, confirmationWindow); } - @Override public ClientSession.QueueQuery queueQuery(final SimpleString queueName) throws ActiveMQException { SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName); @@ -69,7 +68,6 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { return new CreateSessionMessage(getName(), getSessionChannel().getID(), 123, username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge, getConfirmationWindow(), defaultAddress == null ? null : defaultAddress.toString()); } - @Override public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException { SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP); @@ -101,5 +99,4 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } - }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java index ee71162..cfb8d2f 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/util/HQPropertiesConverter.java @@ -51,7 +51,7 @@ public class HQPropertiesConverter { d = new HashMap<>(); // inverting the direction - for (Map.Entry<SimpleString, SimpleString> entry: hqAmqDictionary.entrySet()) { + for (Map.Entry<SimpleString, SimpleString> entry : hqAmqDictionary.entrySet()) { d.put(entry.getValue(), entry.getKey()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml index ae6bf62..2260d51 100644 --- a/artemis-protocols/artemis-mqtt-protocol/pom.xml +++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>artemis-protocols</artifactId> <groupId>org.apache.activemq</groupId> @@ -48,27 +49,27 @@ <groupId>io.netty</groupId> <artifactId>netty-codec-mqtt</artifactId> </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>3.0.0</version> - <extensions>true</extensions> - <configuration> - <instructions> - <Embed-Dependency>netty-codec-mqtt</Embed-Dependency> - <Export-Package>!*</Export-Package> - <Import-Package>io.netty.*;version="[4,6)", *</Import-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.0</version> + <extensions>true</extensions> + <configuration> + <instructions> + <Embed-Dependency>netty-codec-mqtt</Embed-Dependency> + <Export-Package>!*</Export-Package> + <Import-Package>io.netty.*;version="[4,6)", *</Import-Package> + </instructions> + </configuration> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 31486ff..446e362 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index a05ecb5..3a1f447 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,6 +17,9 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.Set; +import java.util.UUID; + import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -26,9 +29,6 @@ import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.UUIDGenerator; -import java.util.Set; -import java.util.UUID; - /** * MQTTConnectionMananager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these * events. @@ -96,12 +96,7 @@ public class MQTTConnectionManager { String id = UUIDGenerator.getInstance().generateStringUUID(); ActiveMQServer server = session.getServer(); - ServerSession serverSession = server.createSession(id, username, password, - ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, - MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, - MQTTUtil.SESSION_XA, null, session.getSessionCallback(), - MQTTUtil.SESSION_AUTO_CREATE_QUEUE); + ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE); return (ServerSessionImpl) serverSession; } @@ -122,8 +117,7 @@ public class MQTTConnectionManager { session.stop(); session.getConnection().disconnect(false); session.getConnection().destroy(); - } - catch (Exception e) { + } catch (Exception e) { /* FIXME Failure during disconnect would leave the session state in an unrecoverable state. We should handle errors more gracefully. */ @@ -144,8 +138,7 @@ public class MQTTConnectionManager { if (cleanSession) { MQTTSession.SESSIONS.remove(clientId); return new MQTTSessionState(clientId); - } - else { + } else { /* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create a new one. */ MQTTSessionState state = MQTTSession.SESSIONS.get(clientId); @@ -155,8 +148,7 @@ public class MQTTConnectionManager { Thread.sleep(1000); } return state; - } - else { + } else { state = new MQTTSessionState(clientId); MQTTSession.SESSIONS.put(clientId, state); return state; @@ -170,14 +162,14 @@ public class MQTTConnectionManager { // [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean session is set to 1 create it. if (cleanSession) { clientId = UUID.randomUUID().toString(); - } - else { + } else { // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null return null; } - } - // If the client ID is not unique (i.e. it has already registered) then do not accept it. - else if (!CONNECTED_CLIENTS.add(clientId)) { + } else if (!CONNECTED_CLIENTS.add(clientId)) { + // ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it. + + // [MQTT-3.1.3-9] Return ID Rejected if server rejects the client ID return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java index ee03bba..7bd9fad 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTFailureListener.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java index ba22f25..9034655 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java @@ -23,4 +23,5 @@ import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.activemq.artemis.api.core.BaseInterceptor; public interface MQTTInterceptor extends BaseInterceptor<MqttMessage> { + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java index 08f552a..0a8f24b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java index ffa6202..53785fa 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTMessageInfo.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 17fc978..68648cd 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -144,8 +144,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { default: disconnect(); } - } - catch (Exception e) { + } catch (Exception e) { log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage()); disconnect(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 17f7e33..f8bdf2a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -41,8 +41,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; /** * MQTTProtocolManager */ -class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection> - implements NotificationListener { +class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection> implements NotificationListener { private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1"); @@ -52,7 +51,9 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>(); private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>(); - MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) { + MQTTProtocolManager(ActiveMQServer server, + List<BaseInterceptor> incomingInterceptors, + List<BaseInterceptor> outgoingInterceptors) { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); } @@ -86,8 +87,7 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc MQTTProtocolHandler protocolHandler = nettyConnection.getChannel().pipeline().get(MQTTProtocolHandler.class); protocolHandler.setConnection(mqttConnection, entry); return entry; - } - catch (Exception e) { + } catch (Exception e) { log.error(e); return null; } @@ -98,7 +98,6 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc return false; } - @Override public void removeHandler(String name) { // TODO add support for handlers http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 58826f6..553f9ad 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index c985c0f..96c6bf6 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -104,14 +104,12 @@ public class MQTTPublishManager { // This is to allow retries of PubRel. if (isManagementConsumer(consumer)) { sendPubRelMessage(message); - } - else { + } else { int qos = decideQoS(message, consumer); if (qos == 0) { sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); - } - else { + } else { String consumerAddress = consumer.getQueue().getAddress().toString(); Integer mqttid = generateMqttId(qos); @@ -160,8 +158,7 @@ public class MQTTPublishManager { public void done() { if (qos == 1) { session.getProtocolHandler().sendPubAck(messageId); - } - else if (qos == 2) { + } else if (qos == 2) { session.getProtocolHandler().sendPubRec(messageId); } } @@ -220,8 +217,7 @@ public class MQTTPublishManager { payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload.writeBytes(stringPayload); break; - } - catch (UnsupportedEncodingException e) { + } catch (UnsupportedEncodingException e) { e.printStackTrace(); // Do nothing default to sending raw bytes. } @@ -244,4 +240,4 @@ public class MQTTPublishManager { is less than the subscription QoS then use it, otherwise use the subscription qos). */ return subscriptionQoS < qos ? subscriptionQoS : qos; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index dd6bbc0..c48f6aa 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index d9c819c..059948f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,14 @@ package org.apache.activemq.artemis.core.protocol.mqtt; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; -import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; - import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; + public class MQTTSession { static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index ea3fa0f..fa282d4 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -42,11 +42,13 @@ public class MQTTSessionCallback implements SessionCallback { } @Override - public int sendMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference reference, + ServerMessage message, + ServerConsumer consumer, + int deliveryCount) { try { session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); } @@ -58,7 +60,6 @@ public class MQTTSessionCallback implements SessionCallback { return false; } - @Override public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, @@ -69,7 +70,11 @@ public class MQTTSessionCallback implements SessionCallback { } @Override - public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference reference, + ServerMessage message, + ServerConsumer consumer, + long bodySize, + int deliveryCount) { return sendMessage(reference, message, consumer, deliveryCount); } @@ -77,8 +82,7 @@ public class MQTTSessionCallback implements SessionCallback { public void disconnect(ServerConsumer consumer, String queueName) { try { consumer.removeItself(); - } - catch (Exception e) { + } catch (Exception e) { log.error(e.getMessage()); } } @@ -88,7 +92,6 @@ public class MQTTSessionCallback implements SessionCallback { } - @Override public void browserFinished(ServerConsumer consumer) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 9bc3074..dd7a360 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -84,8 +84,7 @@ public class MQTTSessionState { if (qos == 2) { if (reverseOutboundReferenceStore.containsKey(address)) { reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId); - } - else { + } else { ConcurrentHashMap<Long, Integer> serverToMqttId = new ConcurrentHashMap<>(); serverToMqttId.put(serverMessageId, mqttId); reverseOutboundReferenceStore.put(address, serverToMqttId); @@ -159,8 +158,7 @@ public class MQTTSessionState { subscriptions.put(subscription.topicName(), subscription); return true; } - } - else { + } else { subscriptions.put(subscription.topicName(), subscription); return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 9c18e45..ea3ab19 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -121,8 +121,7 @@ public class MQTTSubscriptionManager { if (s == null) { createConsumerForSubscriptionQueue(q, topic, qos); - } - else { + } else { consumerQoSLevels.put(consumers.get(topic).getID(), qos); } session.getRetainMessageManager().addRetainedMessagesToQueue(q, topic); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 2313248..e6affc1 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -156,8 +156,7 @@ public class MQTTUtil { if (message.variableHeader() instanceof MqttPublishVariableHeader) { log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel()); - } - else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) { + } else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) { log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/pom.xml b/artemis-protocols/artemis-openwire-protocol/pom.xml index 9ad0db3..423638c 100644 --- a/artemis-protocols/artemis-openwire-protocol/pom.xml +++ b/artemis-protocols/artemis-openwire-protocol/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>artemis-protocols</artifactId> <groupId>org.apache.activemq</groupId> @@ -49,8 +50,8 @@ <artifactId>activemq-client</artifactId> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index a29b3ee..c6582bd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -251,15 +251,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { setLastCommand(command); response = command.visit(commandProcessorInstance); - } - catch (Exception e) { + } catch (Exception e) { // TODO: logging e.printStackTrace(); if (responseRequired) { response = convertException(e); } - } - finally { + } finally { setLastCommand(null); } @@ -294,8 +292,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se dispatchSync(response); } } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.debug(e); sendException(e); @@ -306,8 +303,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Response resp = convertException(e); try { dispatch(resp); - } - catch (IOException e2) { + } catch (IOException e2) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2); } } @@ -316,11 +312,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Response resp; if (e instanceof ActiveMQSecurityException) { resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else if (e instanceof ActiveMQNonExistentQueueException) { + } else if (e instanceof ActiveMQNonExistentQueueException) { resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage())); - } - else { + } else { resp = new ExceptionResponse(e); } return resp; @@ -374,8 +368,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se for (final FailureListener listener : listenersClone) { try { listener.connectionFailed(me, false); - } - catch (final Throwable t) { + } catch (final Throwable t) { // Failure of one listener to execute shouldn't prevent others // from // executing @@ -402,11 +395,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se getTransportConnection().write(buffer, false, false); } bufferSent(); - } - catch (IOException e) { + } catch (IOException e) { throw e; - } - catch (Throwable t) { + } catch (Throwable t) { ActiveMQServerLogger.LOGGER.error("error sending", t); } @@ -439,8 +430,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } dispatch(command); } - } - catch (IOException e) { + } catch (IOException e) { if (messageDispatch != null) { TransmitCallback sub = messageDispatch.getTransmitCallback(); protocolManager.postProcessDispatch(messageDispatch); @@ -450,8 +440,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se messageDispatch = null; throw e; } - } - finally { + } finally { if (messageDispatch != null) { TransmitCallback sub = messageDispatch.getTransmitCallback(); protocolManager.postProcessDispatch(messageDispatch); @@ -467,8 +456,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (result == null) { if (consumerList.size() == 1) { result = new AMQSingleConsumerBrokerExchange(amqSession, consumerList.get(0)); - } - else { + } else { result = new AMQCompositeConsumerBrokerExchange(amqSession, consumerList); } synchronized (consumerExchanges) { @@ -523,8 +511,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (fail) { transportConnection.forceClose(); - } - else { + } else { transportConnection.close(); } } @@ -558,8 +545,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se lastResponse.setCorrelationId(command.getCommandId()); try { dispatchSync(lastResponse); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } } @@ -578,8 +564,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } try { protocolManager.removeConnection(this.getConnectionInfo(), me); - } - catch (InvalidClientIDException e) { + } catch (InvalidClientIDException e) { ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e); } shutdown(true); @@ -682,11 +667,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { physicalSend(command); - } - catch (Exception e) { + } catch (Exception e) { return false; - } - catch (Throwable t) { + } catch (Throwable t) { return false; } return true; @@ -700,8 +683,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (binding == null) { if (dest.isTemporary()) { internalSession.createQueue(qName, qName, null, dest.isTemporary(), false); - } - else { + } else { ConnectionInfo connInfo = getState().getInfo(); CheckType checkType = dest.isTemporary() ? CheckType.CREATE_NON_DURABLE_QUEUE : CheckType.CREATE_DURABLE_QUEUE; server.getSecurityStore().check(qName, checkType, this); @@ -761,7 +743,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.connectionEntry = connectionEntry; } - public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay, final boolean useKeepAlive) { + public void setUpTtl(final long inactivityDuration, + final long inactivityDurationInitialDelay, + final boolean useKeepAlive) { this.useKeepAlive = useKeepAlive; this.maxInactivityDuration = inactivityDuration; @@ -787,8 +771,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString()); protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId()); - } - catch (Exception e) { + } catch (Exception e) { // TODO-NOW: LOGGING e.printStackTrace(); } @@ -833,8 +816,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void removeDestination(ActiveMQDestination dest) throws Exception { if (dest.isQueue()) { server.destroyQueue(OpenWireUtil.toCoreAddress(dest)); - } - else { + } else { Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest)); for (Binding binding : bindings.getBindings()) { @@ -886,8 +868,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processAddConnection(ConnectionInfo info) throws Exception { try { protocolManager.addConnection(OpenWireConnection.this, info); - } - catch (Exception e) { + } catch (Exception e) { Response resp = new ExceptionResponse(e); return resp; } @@ -977,8 +958,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se for (ProducerId producerId : session.getProducerIds()) { try { processRemoveProducer(producerId); - } - catch (Throwable e) { + } catch (Throwable e) { // LOG.warn("Failed to remove producer: {}", producerId, e); } } @@ -1000,8 +980,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Transaction tx = lookupTX(info.getTransactionId(), null); if (info.getTransactionId().isXATransaction() && tx == null) { throw newXAException("Transaction '" + info.getTransactionId() + "' has not been started.", XAException.XAER_NOTA); - } - else if (tx != null) { + } else if (tx != null) { AMQSession amqSession = (AMQSession) tx.getProtocolData(); @@ -1010,8 +989,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { returnReferences(tx, amqSession); - } - finally { + } finally { amqSession.getCoreSession().resetTX(null); } } @@ -1074,12 +1052,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Response resp = null; try { addDestination(dest); - } - catch (Exception e) { + } catch (Exception e) { if (e instanceof ActiveMQSecurityException) { resp = new ExceptionResponse(new JMSSecurityException(e.getMessage())); - } - else { + } else { resp = new ExceptionResponse(e); } } @@ -1106,8 +1082,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (txID.isXATransaction()) { Xid xid = OpenWireUtil.toXID(txID); internalSession.xaStart(xid); - } - else { + } else { Transaction transaction = internalSession.newTransaction(); txMap.put(txID, transaction); transaction.addOperation(new TransactionOperationAbstract() { @@ -1117,8 +1092,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } }); } - } - finally { + } finally { internalSession.resetTX(null); clearOpeartionContext(); } @@ -1140,8 +1114,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se setOperationContext(session); try { tx.commit(onePhase); - } - finally { + } finally { clearOpeartionContext(); } @@ -1163,17 +1136,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { Xid xid = OpenWireUtil.toXID(info.getTransactionId()); internalSession.xaForget(xid); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); throw e; } - } - else { + } else { txMap.remove(txID); } - } - finally { + } finally { clearOpeartionContext(); } @@ -1190,18 +1160,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { Xid xid = OpenWireUtil.toXID(info.getTransactionId()); internalSession.xaPrepare(xid); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); throw e; } - } - else { + } else { Transaction tx = lookupTX(txID, null); tx.prepare(); } - } - finally { + } finally { internalSession.resetTX(null); clearOpeartionContext(); } @@ -1221,17 +1188,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { Xid xid = OpenWireUtil.toXID(info.getTransactionId()); internalSession.xaEnd(xid); - } - finally { + } finally { internalSession.resetTX(null); } - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); throw e; } - } - else { + } else { txMap.remove(txID); clearOpeartionContext(); } @@ -1263,8 +1227,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se //after successful reconnect try { updateConsumer(consumerControl); - } - catch (Exception e) { + } catch (Exception e) { //log error } return null; @@ -1301,13 +1264,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se session.getCoreSession().resetTX(tx); try { session.send(producerInfo, messageSend, sendProducerAck); - } - finally { + } finally { session.getCoreSession().resetTX(null); clearOpeartionContext(); } - return null; } @@ -1321,8 +1282,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); consumerBrokerExchange.acknowledge(ack); - } - finally { + } finally { session.getCoreSession().resetTX(null); clearOpeartionContext(); } @@ -1371,8 +1331,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se //we let protocol manager to handle connection add/remove try { protocolManager.removeConnection(state.getInfo(), null); - } - catch (Throwable e) { + } catch (Throwable e) { // log } return null; @@ -1405,14 +1364,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se OperationContext ctx; if (session == null) { ctx = this.internalSession.getSessionContext(); - } - else { + } else { ctx = session.getCoreSession().getSessionContext(); } server.getStorageManager().setContext(ctx); } - private void clearOpeartionContext() { server.getStorageManager().clearContext(); } @@ -1427,8 +1384,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se if (txID.isXATransaction()) { xid = OpenWireUtil.toXID(txID); transaction = server.getResourceManager().getTransaction(xid); - } - else { + } else { transaction = txMap.get(txID); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index fd6aef2..131cfd1 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -94,7 +94,6 @@ public class OpenWireMessageConverter implements MessageConverter { private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; - private final WireFormat marshaller; public OpenWireMessageConverter(WireFormat marshaller) { @@ -107,11 +106,10 @@ public class OpenWireMessageConverter implements MessageConverter { return null; } - @Override public ServerMessage inbound(Object message) throws Exception { - Message messageSend = (Message)message; + Message messageSend = (Message) message; ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize()); String type = messageSend.getType(); @@ -131,8 +129,7 @@ public class OpenWireMessageConverter implements MessageConverter { ByteSequence contents = messageSend.getContent(); if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) { body.writeNullableString(null); - } - else if (contents != null) { + } else if (contents != null) { boolean messageCompressed = messageSend.isCompressed(); if (messageCompressed) { coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed); @@ -265,11 +262,9 @@ public class OpenWireMessageConverter implements MessageConverter { int count = inflater.inflate(buffer); decompressed.write(buffer, 0, count); contents = decompressed.toByteSequence(); - } - catch (Exception e) { + } catch (Exception e) { throw new IOException(e); - } - finally { + } finally { inflater.end(); } } @@ -281,8 +276,7 @@ public class OpenWireMessageConverter implements MessageConverter { OutputStream os = new InflaterOutputStream(decompressed)) { os.write(contents.data, contents.offset, contents.getLength()); contents = decompressed.toByteSequence(); - } - catch (Exception e) { + } catch (Exception e) { throw new IOException(e); } } @@ -356,8 +350,7 @@ public class OpenWireMessageConverter implements MessageConverter { Object value = ent.getValue(); try { coreMessage.putObjectProperty(ent.getKey(), value); - } - catch (ActiveMQPropertyConversionException e) { + } catch (ActiveMQPropertyConversionException e) { coreMessage.putStringProperty(ent.getKey(), value.toString()); } } @@ -372,7 +365,6 @@ public class OpenWireMessageConverter implements MessageConverter { ConsumerId consumerId = messageSend.getTargetConsumerId(); - String userId = messageSend.getUserID(); if (userId != null) { coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); @@ -421,7 +413,8 @@ public class OpenWireMessageConverter implements MessageConverter { } } - public static MessageDispatch createMessageDispatch(MessageReference reference, ServerMessage message, + public static MessageDispatch createMessageDispatch(MessageReference reference, + ServerMessage message, AMQConsumer consumer) throws IOException, JMSException { ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination()); @@ -438,7 +431,10 @@ public class OpenWireMessageConverter implements MessageConverter { return md; } - private static ActiveMQMessage toAMQMessage(MessageReference reference, ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { + private static ActiveMQMessage toAMQMessage(MessageReference reference, + ServerMessage coreMessage, + WireFormat marshaller, + ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; byte coreType = coreMessage.getType(); switch (coreType) { @@ -480,7 +476,7 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setBrokerInTime(brokerInTime); ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate(); - Boolean compressProp = (Boolean)coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); + Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); amqMsg.setCompressed(isCompressed); @@ -501,8 +497,7 @@ public class OpenWireMessageConverter implements MessageConverter { bytes = bytesOut.toByteArray(); } } - } - else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { + } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) { TypedProperties mapData = new TypedProperties(); mapData.decode(buffer); @@ -516,8 +511,7 @@ public class OpenWireMessageConverter implements MessageConverter { MarshallingSupport.marshalPrimitiveMap(map, dataOut); } bytes = out.toByteArray(); - } - else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { + } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) { int len = buffer.readInt(); bytes = new byte[len]; buffer.readBytes(bytes); @@ -528,8 +522,7 @@ public class OpenWireMessageConverter implements MessageConverter { } bytes = bytesOut.toByteArray(); } - } - else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { + } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) { org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream(); OutputStream out = bytesOut; if (isCompressed) { @@ -578,8 +571,7 @@ public class OpenWireMessageConverter implements MessageConverter { String string = buffer.readNullableString(); if (string == null) { MarshallingSupport.marshalNull(dataOut); - } - else { + } else { MarshallingSupport.marshalString(dataOut, string); } break; @@ -591,8 +583,7 @@ public class OpenWireMessageConverter implements MessageConverter { } } bytes = bytesOut.toByteArray(); - } - else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { + } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) { int n = buffer.readableBytes(); bytes = new byte[n]; buffer.readBytes(bytes); @@ -611,13 +602,11 @@ public class OpenWireMessageConverter implements MessageConverter { ByteSequence byteSeq = compressed.toByteSequence(); ByteSequenceData.writeIntBig(byteSeq, length); bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length); - } - finally { + } finally { deflater.end(); } } - } - else { + } else { int n = buffer.readableBytes(); bytes = new byte[n]; buffer.readBytes(bytes); @@ -706,8 +695,7 @@ public class OpenWireMessageConverter implements MessageConverter { if (midBytes != null) { ByteSequence midSeq = new ByteSequence(midBytes); mid = (MessageId) marshaller.unmarshal(midSeq); - } - else { + } else { mid = new MessageId(UUIDGenerator.getInstance().generateStringUUID() + ":-1"); } @@ -744,7 +732,6 @@ public class OpenWireMessageConverter implements MessageConverter { amqMsg.setReplyTo(replyTo); } - String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID); if (userId != null) { amqMsg.setUserID(userId); @@ -759,8 +746,7 @@ public class OpenWireMessageConverter implements MessageConverter { if (dlqCause != null) { try { amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString()); - } - catch (JMSException e) { + } catch (JMSException e) { throw new IOException("failure to set dlq property " + dlqCause, e); } } @@ -776,12 +762,10 @@ public class OpenWireMessageConverter implements MessageConverter { try { if (prop instanceof SimpleString) { amqMsg.setObjectProperty(s.toString(), prop.toString()); - } - else { + } else { amqMsg.setObjectProperty(s.toString(), prop); } - } - catch (JMSException e) { + } catch (JMSException e) { throw new IOException("exception setting property " + s + " : " + prop, e); } } @@ -789,8 +773,7 @@ public class OpenWireMessageConverter implements MessageConverter { try { amqMsg.onSend(); amqMsg.setCompressed(isCompressed); - } - catch (JMSException e) { + } catch (JMSException e) { throw new IOException("Failed to covert to Openwire message", e); } return amqMsg; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 8497970..0ee1711 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -167,8 +167,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl this.connections.remove(context.getConnection()); this.clientIdSet.remove(clientId); } - } - else { + } else { throw new InvalidClientIDException("No clientID specified for connection disconnect request"); } } @@ -193,8 +192,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl ConnectionControl control = newConnectionControl(); try { c.updateClient(control); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); c.sendException(e); } @@ -308,12 +306,10 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl oldConnection.disconnect(true); connections.remove(oldConnection); connection.reconnect(context, info); - } - else { + } else { throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress()); } - } - else { + } else { //new connection context = connection.initContext(info); clientIdSet.put(clientId, context); @@ -379,8 +375,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl if (sess != null) { sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false); } - } - finally { + } finally { context.setProducerFlowControl(originalFlowControl); } } @@ -389,8 +384,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl if (brokerName == null) { try { brokerName = InetAddressUtil.getLocalHostName().toLowerCase(Locale.ENGLISH); - } - catch (Exception e) { + } catch (Exception e) { brokerName = server.getNodeID().toString(); } } @@ -462,8 +456,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl if (sm != null && server.getConfiguration().isSecurityEnabled()) { if (sm instanceof ActiveMQSecurityManager3) { validated = ((ActiveMQSecurityManager3) sm).validateUser(login, passcode, null) != null; - } - else { + } else { validated = sm.validateUser(login, passcode); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java index 5b9d72e..69d1ccf 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQCompositeConsumerBrokerExchange.java @@ -16,14 +16,14 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessagePull; - import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessagePull; + public class AMQCompositeConsumerBrokerExchange extends AMQConsumerBrokerExchange { private final Map<ActiveMQDestination, AMQConsumer> consumerMap; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 17f5b79..e53b962 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -47,6 +47,7 @@ import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.wireformat.WireFormat; public class AMQConsumer { + private AMQSession session; private org.apache.activemq.command.ActiveMQDestination openwireDestination; private ConsumerInfo info; @@ -84,8 +85,7 @@ public class AMQConsumer { if (openwireDestination.isTopic()) { if (openwireDestination.isTemporary()) { address = new SimpleString("jms.temptopic." + physicalName); - } - else { + } else { address = new SimpleString("jms.topic." + physicalName); } @@ -93,8 +93,7 @@ public class AMQConsumer { serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1); serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener); - } - else { + } else { SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination); session.getCoreServer().getJMSDestinationCreator().create(queueName); serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); @@ -149,12 +148,10 @@ public class AMQConsumer { // Create the new one session.getCoreSession().createQueue(address, queueName, selector, false, true); } - } - else { + } else { session.getCoreSession().createQueue(address, queueName, selector, false, true); } - } - else { + } else { queueName = new SimpleString(UUID.randomUUID().toString()); session.getCoreSession().createQueue(address, queueName, selector, true, false); @@ -200,12 +197,10 @@ public class AMQConsumer { session.deliverMessage(dispatch); currentWindow.decrementAndGet(); return size; - } - catch (IOException e) { + } catch (IOException e) { e.printStackTrace(); return 0; - } - catch (Throwable t) { + } catch (Throwable t) { t.printStackTrace(); return 0; } @@ -218,10 +213,12 @@ public class AMQConsumer { session.deliverMessage(md); } - /** The acknowledgement in openwire is done based on intervals. - * We will iterate through the list of delivering messages at {@link ServerConsumer#getDeliveringReferencesBasedOnProtocol(boolean, Object, Object)} - * and add those to the Transaction. - * Notice that we will start a new transaction on the cases where there is no transaction. */ + /** + * The acknowledgement in openwire is done based on intervals. + * We will iterate through the list of delivering messages at {@link ServerConsumer#getDeliveringReferencesBasedOnProtocol(boolean, Object, Object)} + * and add those to the Transaction. + * Notice that we will start a new transaction on the cases where there is no transaction. + */ public void acknowledge(MessageAck ack) throws Exception { MessageId first = ack.getFirstMessageId(); @@ -248,8 +245,7 @@ public class AMQConsumer { if (originalTX == null) { transaction = session.getCoreSession().newTransaction(); - } - else { + } else { transaction = originalTX; } @@ -257,8 +253,7 @@ public class AMQConsumer { for (MessageReference ref : ackList) { ref.acknowledge(transaction); } - } - else if (ack.isPoisonAck()) { + } else if (ack.isPoisonAck()) { for (MessageReference ref : ackList) { Throwable poisonCause = ack.getPoisonCause(); if (poisonCause != null) { @@ -340,7 +335,7 @@ public class AMQConsumer { /** * The MessagePullHandler is used with slow consumer policies. - * */ + */ private class MessagePullHandler { private long next = -1; @@ -368,8 +363,7 @@ public class AMQConsumer { if (next >= 0) { if (timeout <= 0) { latch.countDown(); - } - else { + } else { messagePullFuture = scheduledPool.schedule(new Runnable() { @Override public void run() { @@ -381,8 +375,7 @@ public class AMQConsumer { } } return false; - } - else { + } else { next = -1; if (messagePullFuture != null) { messagePullFuture.cancel(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 4e1e8c4..714a29a 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -113,8 +113,7 @@ public class AMQSession implements SessionCallback { if (sessionId == -1) { this.connection.setAdvisorySession(this); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.error("error init session", e); } @@ -124,8 +123,7 @@ public class AMQSession implements SessionCallback { public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { if (consumer.getProtocolData() != null) { return ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref); - } - else { + } else { return false; } @@ -138,8 +136,7 @@ public class AMQSession implements SessionCallback { ActiveMQDestination[] dests = null; if (dest.isComposite()) { dests = dest.getCompositeDestinations(); - } - else { + } else { dests = new ActiveMQDestination[]{dest}; } @@ -260,8 +257,7 @@ public class AMQSession implements SessionCallback { if (destination.isComposite()) { actualDestinations = destination.getCompositeDestinations(); messageSend.setOriginalDestination(destination); - } - else { + } else { actualDestinations = new ActiveMQDestination[]{destination}; } @@ -284,16 +280,14 @@ public class AMQSession implements SessionCallback { try { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); connection.dispatchSync(ack); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); connection.sendException(e); } } }; - } - else { + } else { final Connection transportConnection = connection.getTransportConnection(); // new Exception("Setting to false").printStackTrace(); @@ -301,8 +295,7 @@ public class AMQSession implements SessionCallback { if (transportConnection == null) { // I don't think this could happen, but just in case, avoiding races runnable = null; - } - else { + } else { runnable = new Runnable() { @Override public void run() { @@ -324,8 +317,7 @@ public class AMQSession implements SessionCallback { if (actualDestinations.length <= 1 || onComplete == null) { // if onComplete is null, this will be null ;) runToUse = onComplete; - } - else { + } else { final AtomicInteger count = new AtomicInteger(actualDestinations.length); runToUse = new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 3e7410c..05e1e34 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -46,16 +46,13 @@ public class OpenWireUtil { if (dest.isQueue()) { if (dest.isTemporary()) { return new SimpleString(JMS_TEMP_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName()); - } - else { + } else { return new SimpleString(JMS_QUEUE_ADDRESS_PREFIX + dest.getPhysicalName()); } - } - else { + } else { if (dest.isTemporary()) { return new SimpleString(JMS_TEMP_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName()); - } - else { + } else { return new SimpleString(JMS_TOPIC_ADDRESS_PREFIX + dest.getPhysicalName()); } } @@ -72,8 +69,7 @@ public class OpenWireUtil { String strippedAddress = address.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, ""); if (actualDestination.isQueue()) { return new ActiveMQQueue(strippedAddress); - } - else { + } else { return new ActiveMQTopic(strippedAddress); } } @@ -89,7 +85,7 @@ public class OpenWireUtil { } public static XidImpl toXID(TransactionId xaXid) { - return toXID((XATransactionId)xaXid); + return toXID((XATransactionId) xaXid); } public static XidImpl toXID(XATransactionId xaXid) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-stomp-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/pom.xml b/artemis-protocols/artemis-stomp-protocol/pom.xml index 38eec7e..7fff55f 100644 --- a/artemis-protocols/artemis-stomp-protocol/pom.xml +++ b/artemis-protocols/artemis-stomp-protocol/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>artemis-protocols</artifactId> <groupId>org.apache.activemq</groupId> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java index 7966037..118f7f8 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java @@ -70,8 +70,7 @@ public class ActiveMQStompException extends Exception { StompFrame frame = null; if (handler == null) { frame = new StompFrame(Stomp.Responses.ERROR); - } - else { + } else { frame = handler.createStompFrame(Stomp.Responses.ERROR); } frame.addHeader(Stomp.Headers.Error.MESSAGE, this.getMessage()); @@ -82,8 +81,7 @@ public class ActiveMQStompException extends Exception { if (body != null) { frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain"); frame.setByteBody(body.getBytes(StandardCharsets.UTF_8)); - } - else { + } else { frame.setByteBody(new byte[0]); } if (disconnect != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java index b119054..e535725 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java @@ -17,10 +17,10 @@ package org.apache.activemq.artemis.core.protocol.stomp; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.jboss.logging.Messages; import org.jboss.logging.annotations.Cause; import org.jboss.logging.annotations.Message; import org.jboss.logging.annotations.MessageBundle; -import org.jboss.logging.Messages; /** * Logger Code 33
