http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java index 70fda67..75543fd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.core.client.impl; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; @@ -23,9 +26,6 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - public class ClientProducerCreditsImpl implements ClientProducerCredits { private final Semaphore semaphore; @@ -94,12 +94,10 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { // better getting a "null" string than a NPE ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address); } - } - catch (InterruptedException interrupted) { + } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); throw new ActiveMQInterruptedException(interrupted); - } - finally { + } finally { this.blocked = false; } } @@ -210,4 +208,4 @@ public class ClientProducerCreditsImpl implements ClientProducerCredits { private void requestCredits(final int credits) { session.sendProducerCreditsMessage(credits, address); } -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index baa59f5..fddd4de 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -30,8 +30,8 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.BodyEncoder; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; -import org.apache.activemq.artemis.utils.DeflaterReader; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; +import org.apache.activemq.artemis.utils.DeflaterReader; import org.apache.activemq.artemis.utils.TokenBucketLimiter; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; @@ -92,8 +92,7 @@ public class ClientProducerImpl implements ClientProducerInternal { if (autoGroup) { this.groupID = UUIDGenerator.getInstance().generateSimpleStringUUID(); - } - else { + } else { this.groupID = groupID; } @@ -101,8 +100,7 @@ public class ClientProducerImpl implements ClientProducerInternal { if (address != null) { producerCredits = session.getCredits(address, false); - } - else { + } else { producerCredits = null; } } @@ -141,8 +139,7 @@ public class ClientProducerImpl implements ClientProducerInternal { boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled(); if (confirmationWindowEnabled) { doSend(address1, message, handler, true); - } - else { + } else { doSend(address1, message, null, true); if (handler != null) { session.scheduleConfirmation(handler, message); @@ -231,15 +228,13 @@ public class ClientProducerImpl implements ClientProducerInternal { if (sessionContext.supportsLargeMessage() && (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize && !msgI.isServerMessage())) { isLarge = true; - } - else { + } else { isLarge = false; } if (!isLarge) { session.setAddress(msg, sendingAddress); - } - else { + } else { msg.setAddress(sendingAddress); } @@ -264,12 +259,10 @@ public class ClientProducerImpl implements ClientProducerInternal { if (isLarge) { largeMessageSend(sendBlocking, msgI, theCredits, handler); - } - else { + } else { sendRegularMessage(sendingAddress, msgI, sendBlocking, theCredits, handler); } - } - finally { + } finally { session.endCall(); } } @@ -330,11 +323,9 @@ public class ClientProducerImpl implements ClientProducerInternal { if (msgI.isServerMessage()) { largeMessageSendServer(sendBlocking, msgI, credits, handler); - } - else if ((input = msgI.getBodyInputStream()) != null) { + } else if ((input = msgI.getBodyInputStream()) != null) { largeMessageSendStreamed(sendBlocking, msgI, input, credits, handler); - } - else { + } else { largeMessageSendBuffered(sendBlocking, msgI, credits, handler); } } @@ -391,8 +382,7 @@ public class ClientProducerImpl implements ClientProducerInternal { credits.acquireCredits(creditsUsed); } - } - finally { + } finally { context.close(); } } @@ -443,7 +433,6 @@ public class ClientProducerImpl implements ClientProducerInternal { boolean headerSent = false; - int reconnectID = sessionContext.getReconnectID(); while (!lastPacket) { byte[] buff = new byte[minLargeMessageSize]; @@ -457,8 +446,7 @@ public class ClientProducerImpl implements ClientProducerInternal { try { numberOfBytesRead = input.read(buff, pos, wanted); - } - catch (IOException e) { + } catch (IOException e) { throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e); } @@ -494,8 +482,7 @@ public class ClientProducerImpl implements ClientProducerInternal { msgI.getBodyBuffer().writeBytes(buff, 0, pos); sendRegularMessage(msgI.getAddress(), msgI, sendBlocking, credits, handler); return; - } - else { + } else { if (!headerSent) { headerSent = true; sendInitialLargeMessageHeader(msgI, credits); @@ -503,8 +490,7 @@ public class ClientProducerImpl implements ClientProducerInternal { int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler); credits.acquireCredits(creditsSent); } - } - else { + } else { if (!headerSent) { headerSent = true; sendInitialLargeMessageHeader(msgI, credits); @@ -517,8 +503,7 @@ public class ClientProducerImpl implements ClientProducerInternal { try { input.close(); - } - catch (IOException e) { + } catch (IOException e) { throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 88a20e1..d781fff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -189,8 +189,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C connectionTTL == ActiveMQClient.DEFAULT_CONNECTION_TTL) { this.clientFailureCheckPeriod = ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD_INVM; this.connectionTTL = ActiveMQClient.DEFAULT_CONNECTION_TTL_INVM; - } - else { + } else { this.clientFailureCheckPeriod = clientFailureCheckPeriod; this.connectionTTL = connectionTTL; @@ -269,14 +268,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C logger.debug("Setting up backup config = " + backUp + " for live = " + live); } backupConfig = backUp; - } - else { + } else { if (logger.isDebugEnabled()) { logger.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + - " / " + - backUp + - " but it didn't belong to " + - connectorConfig); + " / " + + backUp + + " but it didn't belong to " + + connectorConfig); } } } @@ -444,8 +442,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C session.close(); else session.cleanUp(false); - } - catch (Exception e1) { + } catch (Exception e1) { ActiveMQClientLogger.LOGGER.unableToCloseSession(e1); } } @@ -475,8 +472,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public boolean waitForTopology(long timeout, TimeUnit unit) { try { return latchFinalTopology.await(timeout, unit); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); ActiveMQClientLogger.LOGGER.warn(e.getMessage(), e); return false; @@ -506,12 +502,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C String scaleDownTargetNodeID) { try { failoverOrReconnect(connectionID, me, scaleDownTargetNodeID); - } - catch (ActiveMQInterruptedException e1) { + } catch (ActiveMQInterruptedException e1) { // this is just a debug, since an interrupt is an expected event (in case of a shutdown) logger.debug(e1.getMessage(), e1); - } - catch (Throwable t) { + } catch (Throwable t) { //for anything else just close so clients are un blocked close(); throw t; @@ -532,7 +526,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C for (ClientSessionInternal session : sessions) { SessionContext context = session.getSessionContext(); if (context instanceof ActiveMQSessionContext) { - ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)context; + ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context; if (sessionContext.isKilled()) { setReconnectAttempts(0); } @@ -607,8 +601,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (localConnector != null) { try { localConnector.close(); - } - catch (Exception ignore) { + } catch (Exception ignore) { // no-op } } @@ -627,8 +620,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED); } } - } - else { + } else { RemotingConnection connectionToDestory = connection; if (connectionToDestory != null) { connectionToDestory.destroy(); @@ -643,8 +635,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C callFailoverListeners(FailoverEventType.FAILOVER_FAILED); callSessionFailureListeners(me, true, false, scaleDownTargetNodeID); } - } - finally { + } finally { localFailoverLock.unlock(); } @@ -659,8 +650,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C for (ClientSessionInternal session : sessionsToClose) { try { session.cleanUp(true); - } - catch (Exception cause) { + } catch (Exception cause) { ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause); } } @@ -708,12 +698,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C try { if (afterReconnect) { listener.connectionFailed(me, failedOver, scaleDownTargetNodeID); - } - else { + } else { listener.beforeReconnect(me); } - } - catch (final Throwable t) { + } catch (final Throwable t) { // Failure of one listener to execute shouldn't prevent others // from // executing @@ -728,8 +716,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C for (final FailoverEventListener listener : listenersClone) { try { listener.failoverEvent(type); - } - catch (final Throwable t) { + } catch (final Throwable t) { // Failure of one listener to execute shouldn't prevent others // from // executing @@ -789,10 +776,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return; if (logger.isTraceEnabled()) { logger.trace("getConnectionWithRetry::" + reconnectAttempts + - " with retryInterval = " + - retryInterval + - " multiplier = " + - retryIntervalMultiplier, new Exception("trace")); + " with retryInterval = " + + retryInterval + + " multiplier = " + + retryIntervalMultiplier, new Exception("trace")); } long interval = retryInterval; @@ -809,8 +796,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C logger.debug("Reconnection successful"); } return; - } - else { + } else { // Failed to get connection if (reconnectAttempts != 0) { @@ -832,8 +818,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (clientProtocolManager.waitOnLatch(interval)) { return; } - } - catch (InterruptedException ignore) { + } catch (InterruptedException ignore) { throw new ActiveMQInterruptedException(createTrace); } @@ -845,8 +830,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } interval = newInterval; - } - else { + } else { logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); return; } @@ -876,8 +860,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C try { connectionInUse.destroy(); - } - catch (Throwable ignore) { + } catch (Throwable ignore) { } connection = null; @@ -886,8 +869,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (connectorInUse != null) { connectorInUse.close(); } - } - catch (Throwable ignore) { + } catch (Throwable ignore) { } connector = null; @@ -904,8 +886,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (connection != null) { // a connection already exists, so returning the same one return connection; - } - else { + } else { RemotingConnection connection = establishNewConnection(); this.connection = connection; @@ -920,8 +901,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C this.connection = null; return null; } - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { connection.destroy(); this.connection = null; return null; @@ -939,8 +919,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection()); } - } - else { + } else { logger.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology")); } @@ -959,10 +938,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // To make sure the first ping will be sent pingRunnable.send(); - } - // send a ping every time we create a new remoting connection - // to set up its TTL on the server side - else { + } else { + // send a ping every time we create a new remoting connection + // to set up its TTL on the server side pingRunnable.run(); } } @@ -1015,12 +993,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C CLOSE_RUNNABLES.add(this); if (scaleDownTargetNodeID == null) { conn.fail(ActiveMQClientMessageBundle.BUNDLE.disconnected()); - } - else { + } else { conn.fail(ActiveMQClientMessageBundle.BUNDLE.disconnected(), scaleDownTargetNodeID); } - } - finally { + } finally { CLOSE_RUNNABLES.remove(this); } @@ -1065,8 +1041,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C try { connector.close(); - } - catch (Throwable t) { + } catch (Throwable t) { } } @@ -1092,10 +1067,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C try { if (logger.isDebugEnabled()) { logger.debug("Trying to connect with connector = " + connectorFactory + - ", parameters = " + - connectorConfig.getParams() + - " connector = " + - connector); + ", parameters = " + + connectorConfig.getParams() + + " connector = " + + connector); } Connector liveConnector = createConnector(connectorFactory, connectorConfig); @@ -1103,8 +1078,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if ((transportConnection = openTransportConnection(liveConnector)) != null) { // if we can't connect the connect method will return null, hence we have to try the backup connector = liveConnector; - } - else if (backupConfig != null) { + } else if (backupConfig != null) { if (logger.isDebugEnabled()) { logger.debug("Trying backup config = " + backupConfig); } @@ -1127,16 +1101,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C connectorConfig = backupConfig; backupConfig = null; connectorFactory = backupConnectorFactory; - } - else { + } else { if (logger.isDebugEnabled()) { logger.debug("Backup is not active."); } } } - } - catch (Exception cause) { + } catch (Exception cause) { // Sanity catch for badly behaved remoting plugins ActiveMQClientLogger.LOGGER.createConnectorException(cause); @@ -1144,16 +1116,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (transportConnection != null) { try { transportConnection.close(); - } - catch (Throwable t) { + } catch (Throwable t) { } } if (connector != null) { try { connector.close(); - } - catch (Throwable t) { + } catch (Throwable t) { } } @@ -1173,8 +1143,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (theConn != null && connectionID.equals(theConn.getID())) { theConn.bufferReceived(connectionID, buffer); - } - else { + } else { logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); } } @@ -1262,8 +1231,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C }); return; - } - else { + } else { lastCheck = now; } } @@ -1331,11 +1299,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C if (logger.isTraceEnabled()) { logger.trace("Disconnect being called on client:" + - " server locator = " + - serverLocator + - " notifying node " + - nodeID + - " as down", new Exception("trace")); + " server locator = " + + serverLocator + + " notifying node " + + nodeID + + " as down", new Exception("trace")); } serverLocator.notifyNodeDown(System.currentTimeMillis(), nodeID); @@ -1359,8 +1327,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } serverLocator.notifyNodeUp(uniqueEventID, nodeID, backupGroupName, scaleDownGroupName, connectorPair, isLast); - } - finally { + } finally { if (isLast) { latchFinalTopology.countDown(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 8b0fc8e..de45066 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -274,8 +274,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { sessionContext.createSharedQueue(address, queueName, filterString, durable); - } - finally { + } finally { endCall(); } @@ -328,8 +327,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { sessionContext.deleteQueue(queueName); - } - finally { + } finally { endCall(); } } @@ -346,8 +344,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { return sessionContext.queueQuery(queueName); - } - finally { + } finally { endCall(); } @@ -411,13 +408,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly); } - @Override public boolean isWritable(ReadyListener callback) { return sessionContext.isWritable(callback); } - /** * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on * the remoting thread). @@ -510,16 +505,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } try { sessionContext.simpleCommit(); - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED || rollbackOnly) { // The call to commit was unlocked on failover, we therefore rollback the tx, // and throw a transaction rolled back exception instead //or //if we have been set to rollbackonly then we have probably failed over and don't know if the tx has committed rollbackOnFailover(false); - } - else { + } else { throw e; } } @@ -753,8 +746,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { sessionContext.sendACK(false, blockOnAcknowledge, consumer, message); - } - finally { + } finally { endCall(); } } @@ -772,8 +764,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { sessionContext.sendACK(true, blockOnAcknowledge, consumer, message); - } - finally { + } finally { endCall(); } } @@ -859,8 +850,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi public void run() { try { consumer.close(); - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { ActiveMQClientLogger.LOGGER.unableToCloseConsumer(e); } } @@ -887,8 +877,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } inClose = true; sessionContext.sessionClose(); - } - catch (Throwable e) { + } catch (Throwable e) { // Session close should always return without exception // Note - we only log at trace @@ -1002,11 +991,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.returnBlocking(cause); } - } - catch (Throwable t) { + } catch (Throwable t) { ActiveMQClientLogger.LOGGER.failedToHandleFailover(t); - } - finally { + } finally { sessionContext.releaseCommunications(); } @@ -1053,13 +1040,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi logger.tracef("setAddress() Setting default address as %s", address); message.setAddress(address); - } - else { + } else { if (!address.equals(defaultAddress)) { logger.tracef("setAddress() setting non default address %s on message", address); message.setAddress(address); - } - else { + } else { logger.trace("setAddress() being set as null"); message.setAddress(null); } @@ -1074,7 +1059,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } } - @Override public void setPacketSize(final int packetSize) { if (packetSize > this.initialMessagePacketSize) { @@ -1149,8 +1133,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if (rollbackOnly) { if (onePhase) { throw new XAException(XAException.XAER_RMFAIL); - } - else { + } else { ActiveMQClientLogger.LOGGER.commitAfterFailover(); } } @@ -1162,19 +1145,16 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { sessionContext.xaCommit(xid, onePhase); workDone = false; - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (Throwable t) { + } catch (Throwable t) { ActiveMQClientLogger.LOGGER.failoverDuringCommit(); XAException xaException = null; if (onePhase) { //we must return XA_RMFAIL xaException = new XAException(XAException.XAER_RMFAIL); - } - else { + } else { // Any error on commit -> RETRY // We can't rollback a Prepared TX for definition xaException = new XAException(XAException.XA_RETRY); @@ -1182,8 +1162,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi xaException.initCause(t); throw xaException; - } - finally { + } finally { endCall(); } } @@ -1200,8 +1179,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if (rollbackOnly) { try { rollback(false, false); - } - catch (Throwable ignored) { + } catch (Throwable ignored) { logger.debug("Error on rollback during end call!", ignored); } throw new XAException(XAException.XAER_RMFAIL); @@ -1213,23 +1191,19 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { sessionContext.xaEnd(xid, flags); - } - finally { + } finally { endCall(); } - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (Throwable t) { + } catch (Throwable t) { ActiveMQClientLogger.LOGGER.errorCallingEnd(t); // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; } - } - finally { + } finally { currentXID = null; } } @@ -1240,17 +1214,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { sessionContext.xaForget(xid); - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; - } - finally { + } finally { endCall(); } } @@ -1261,8 +1232,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { return sessionContext.recoverSessionTimeout(); - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); @@ -1276,8 +1246,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { return sessionContext.configureTransactionTimeout(seconds); - } - catch (Throwable t) { + } catch (Throwable t) { markRollbackOnly(); // The TM will ignore any errors from here, if things are this screwed up we mark rollbackonly // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); @@ -1317,8 +1286,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } if (xares instanceof ClientSessionInternal) { return (ClientSessionInternal) xares; - } - else if (xares instanceof ActiveMQXAResource) { + } else if (xares instanceof ActiveMQXAResource) { return getSessionInternalFromXAResource(((ActiveMQXAResource) xares).getResource()); } return null; @@ -1341,25 +1309,21 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { return sessionContext.xaPrepare(xid); - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { // Unblocked on failover try { // will retry once after failover & unblock return sessionContext.xaPrepare(xid); - } - catch (Throwable t) { + } catch (Throwable t) { // ignore and rollback } ActiveMQClientLogger.LOGGER.failoverDuringPrepareRollingBack(); try { rollback(false); - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); @@ -1377,16 +1341,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; - } - catch (Throwable t) { + } catch (Throwable t) { ActiveMQClientLogger.LOGGER.errorDuringPrepare(t); // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); throw xaException; - } - finally { + } finally { endCall(); } } @@ -1398,8 +1360,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi if ((flags & XAResource.TMSTARTRSCAN) == XAResource.TMSTARTRSCAN) { try { return sessionContext.xaScan(); - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); @@ -1434,19 +1395,16 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { sessionContext.xaRollback(xid, wasStarted); - } - finally { + } finally { if (wasStarted) { start(); } } workDone = false; - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { // Unblocked on failover throw new XAException(XAException.XA_RETRY); @@ -1456,8 +1414,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); @@ -1477,20 +1434,16 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi sessionContext.xaStart(xid, flags); this.currentXID = xid; - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { // we can retry this only because we know for sure that no work would have been done if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { try { sessionContext.xaStart(xid, flags); - } - catch (XAException xae) { + } catch (XAException xae) { throw xae; - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); @@ -1502,8 +1455,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(e); throw xaException; - } - catch (Throwable t) { + } catch (Throwable t) { // This could occur if the TM interrupts the thread XAException xaException = new XAException(XAException.XAER_RMFAIL); xaException.initCause(t); @@ -1517,8 +1469,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi public void connectionFailed(final ActiveMQException me, boolean failedOver) { try { cleanUp(false); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.failedToCleanupSession(e); } } @@ -1618,8 +1569,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi startCall(); try { sessionContext.createQueue(address, queueName, filterString, durable, temp); - } - finally { + } finally { endCall(); } } @@ -1739,24 +1689,18 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi private String convertTXFlag(final int flags) { if (flags == XAResource.TMSUSPEND) { return "SESS_XA_SUSPEND"; - } - else if (flags == XAResource.TMSUCCESS) { + } else if (flags == XAResource.TMSUCCESS) { return "TMSUCCESS"; - } - else if (flags == XAResource.TMFAIL) { + } else if (flags == XAResource.TMFAIL) { return "TMFAIL"; - } - else if (flags == XAResource.TMJOIN) { + } else if (flags == XAResource.TMJOIN) { return "TMJOIN"; - } - else if (flags == XAResource.TMRESUME) { + } else if (flags == XAResource.TMRESUME) { return "TMRESUME"; - } - else if (flags == XAResource.TMNOFLAGS) { + } else if (flags == XAResource.TMNOFLAGS) { // Don't need to flush since the previous end will have done this return "TMNOFLAGS"; - } - else { + } else { return "XAER_INVAL(" + flags + ")"; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index b082786..ed636bd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -88,7 +88,9 @@ public interface ClientSessionInternal extends ClientSession { ClientProducerCreditManager getProducerCreditManager(); - /** This will set the address at the message */ + /** + * This will set the address at the message + */ void setAddress(Message message, SimpleString address); void checkDefaultAddress(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java index 0e826b5..24b7f1e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java @@ -102,8 +102,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll InputStream input = new ActiveMQBufferInputStream(bufferDelegate); dataInput = new DataInputStream(new InflaterReader(input)); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } @@ -119,8 +118,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll public byte readByte() { try { return getStream().readByte(); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } @@ -340,8 +338,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll public int readUnsignedByte() { try { return getStream().readUnsignedByte(); - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @@ -350,8 +347,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll public short readShort() { try { return getStream().readShort(); - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @@ -360,8 +356,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll public int readUnsignedShort() { try { return getStream().readUnsignedShort(); - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @@ -370,8 +365,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll public int readInt() { try { return getStream().readInt(); - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @@ -385,8 +379,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll public long readLong() { try { return getStream().readLong(); - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @@ -398,8 +391,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll if (nReadBytes < length) { ActiveMQClientLogger.LOGGER.compressedLargeMessageError(length, nReadBytes); } - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } @@ -445,33 +437,36 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll getStream().read(); } return length; - } - catch (Exception e) { + } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } - - /** from {@link java.io.DataInput} interface */ + /** + * from {@link java.io.DataInput} interface + */ @Override public void readFully(byte[] b) throws IOException { readBytes(b); } - /** from {@link java.io.DataInput} interface */ + /** + * from {@link java.io.DataInput} interface + */ @Override public void readFully(byte[] b, int off, int len) throws IOException { readBytes(b, off, len); } - /** from {@link java.io.DataInput} interface */ + /** + * from {@link java.io.DataInput} interface + */ @Override @SuppressWarnings("deprecation") public String readLine() throws IOException { return getStream().readLine(); } - @Override public void writeByte(final byte value) { throw new IllegalAccessError(OPERATION_NOT_SUPPORTED); @@ -568,8 +563,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll int b = readByte(); if (b == DataConstants.NULL) { return null; - } - else { + } else { return readSimpleString(); } } @@ -579,8 +573,7 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll int b = readByte(); if (b == DataConstants.NULL) { return null; - } - else { + } else { return readString(); } } @@ -603,11 +596,9 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll chars[i] = (char) readShort(); } return new String(chars); - } - else if (len < 0xfff) { + } else if (len < 0xfff) { return readUTF(); - } - else { + } else { return readSimpleString().toString(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java index da4d6bd..5a27f24 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java @@ -122,8 +122,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { this.totalSize = totalSize; if (cachedFile == null) { fileCache = null; - } - else { + } else { fileCache = new FileCache(cachedFile); } this.bufferSize = bufferSize; @@ -142,8 +141,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { return; try { checkForPacket(totalSize - 1); - } - catch (Throwable ignored) { + } catch (Throwable ignored) { } } } @@ -177,18 +175,15 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (streamEnded) { outStream.close(); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } - } - else { + } else { if (fileCache != null) { try { fileCache.cachePackage(chunk); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } @@ -201,8 +196,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (flowControlCredit != 0) { try { consumerInternal.flowControl(flowControlCredit, !isContinues); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } @@ -222,8 +216,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { try { consumerInternal.flowControl(totalSize, false); - } - catch (Exception ignored) { + } catch (Exception ignored) { // what else can we do here? ActiveMQClientLogger.LOGGER.errorCallingCancel(ignored); } @@ -300,24 +293,21 @@ public class LargeMessageControllerImpl implements LargeMessageController { // And we will check if no packets have arrived within readTimeout milliseconds if (timeWait != 0) { timeOut = System.currentTimeMillis() + timeWait; - } - else { + } else { timeOut = System.currentTimeMillis() + readTimeout; } while (!streamEnded && handledException == null) { try { this.wait(timeWait == 0 ? readTimeout : timeWait); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } if (!streamEnded && handledException == null) { if (timeWait != 0 && System.currentTimeMillis() > timeOut) { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); - } - else if (System.currentTimeMillis() > timeOut && !packetAdded) { + } else if (System.currentTimeMillis() > timeOut && !packetAdded) { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); } } @@ -343,15 +333,13 @@ public class LargeMessageControllerImpl implements LargeMessageController { // instead to just where it was canceled. if (handledException instanceof ActiveMQLargeMessageInterruptedException) { nestedException = new ActiveMQLargeMessageInterruptedException(handledException.getMessage()); - } - else { + } else { nestedException = new ActiveMQException(((ActiveMQException) handledException).getType(), handledException.getMessage()); } nestedException.initCause(handledException); throw nestedException; - } - else { + } else { throw new ActiveMQException(ActiveMQExceptionType.LARGE_MESSAGE_ERROR_BODY, "Error on saving LargeMessageBufferImpl", handledException); } } @@ -379,8 +367,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (fileCache != null && index < packetPosition) { return fileCache.getByteFromCache(index); - } - else { + } else { return currentPacket.getChunk()[(int) (index - packetPosition)]; } } @@ -546,8 +533,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { public void readerIndex(final int readerIndex) { try { checkForPacket(readerIndex); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorReadingIndex(e); throw new RuntimeException(e.getMessage(), e); } @@ -573,8 +559,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { public void setIndex(final int readerIndex, final int writerIndex) { try { checkForPacket(readerIndex); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorSettingIndex(e); throw new RuntimeException(e.getMessage(), e); } @@ -601,8 +586,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (readableBytes > Integer.MAX_VALUE) { return Integer.MAX_VALUE; - } - else { + } else { return (int) (totalSize - readerIndex); } } @@ -621,8 +605,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { public void resetReaderIndex() { try { checkForPacket(0); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorReSettingIndex(e); throw new RuntimeException(e.getMessage(), e); } @@ -956,8 +939,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { int b = readByte(); if (b == DataConstants.NULL) { return null; - } - else { + } else { return readSimpleString(); } } @@ -967,8 +949,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { int b = readByte(); if (b == DataConstants.NULL) { return null; - } - else { + } else { return readString(); } } @@ -991,11 +972,9 @@ public class LargeMessageControllerImpl implements LargeMessageController { chars[i] = (char) readShort(); } return new String(chars); - } - else if (len < 0xfff) { + } else if (len < 0xfff) { return readUTF(); - } - else { + } else { return readSimpleString().toString(); } } @@ -1074,8 +1053,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { streamEnded = true; output.close(); } - } - catch (IOException e) { + } catch (IOException e) { throw ActiveMQClientMessageBundle.BUNDLE.errorWritingLargeMessage(e); } } @@ -1105,11 +1083,9 @@ public class LargeMessageControllerImpl implements LargeMessageController { packetPosition += sizeToAdd; packetLastPosition = packetPosition + currentPacket.getChunk().length; - } - catch (IndexOutOfBoundsException e) { + } catch (IndexOutOfBoundsException e) { throw e; - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -1180,12 +1156,10 @@ public class LargeMessageControllerImpl implements LargeMessageController { readCachePositionEnd = readCachePositionStart + cachedChannel.read(readCache) - 1; } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorReadingCache(e); throw new RuntimeException(e.getMessage(), e); - } - finally { + } finally { close(); } } @@ -1221,8 +1195,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (cachedChannel != null && cachedChannel.isOpen()) { try { cachedChannel.close(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorClosingCache(e); } cachedChannel = null; @@ -1231,8 +1204,7 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (cachedRAFile != null) { try { cachedRAFile.close(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorClosingCache(e); } cachedRAFile = null; @@ -1246,27 +1218,32 @@ public class LargeMessageControllerImpl implements LargeMessageController { if (cachedFile != null && cachedFile.exists()) { try { cachedFile.delete(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorFinalisingCache(e); } } } } - /** from {@link java.io.DataInput} interface */ + /** + * from {@link java.io.DataInput} interface + */ @Override public void readFully(byte[] b) throws IOException { readBytes(b); } - /** from {@link java.io.DataInput} interface */ + /** + * from {@link java.io.DataInput} interface + */ @Override public void readFully(byte[] b, int off, int len) throws IOException { readBytes(b, off, len); } - /** from {@link java.io.DataInput} interface */ + /** + * from {@link java.io.DataInput} interface + */ @Override public String readLine() throws IOException { return ByteUtil.readLine(this); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 76ffdb9..beb1b13 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -220,13 +220,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private synchronized void setThreadPools() { if (threadPool != null) { return; - } - else if (useGlobalPools) { + } else if (useGlobalPools) { threadPool = ActiveMQClient.getGlobalThreadPool(); scheduledThreadPool = ActiveMQClient.getGlobalScheduledThreadPool(); - } - else { + } else { this.shutdownPool = true; ThreadFactory factory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() { @@ -238,8 +236,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (threadPoolMaxSize == -1) { threadPool = Executors.newCachedThreadPool(factory); - } - else { + } else { threadPool = new ActiveMQThreadPoolExecutor(0, threadPoolMaxSize, 60L, TimeUnit.SECONDS, factory); } @@ -255,9 +252,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } @Override - public synchronized boolean setThreadPools(ExecutorService threadPool, ScheduledExecutorService scheduledThreadPool) { + public synchronized boolean setThreadPools(ExecutorService threadPool, + ScheduledExecutorService scheduledThreadPool) { - if (threadPool == null || scheduledThreadPool == null) return false; + if (threadPool == null || scheduledThreadPool == null) + return false; if (this.threadPool == null && this.scheduledThreadPool == null) { useGlobalPools = false; @@ -265,8 +264,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery this.threadPool = threadPool; this.scheduledThreadPool = scheduledThreadPool; return true; - } - else { + } else { return false; } } @@ -305,8 +303,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery discoveryGroup.start(); } - } - catch (Exception e) { + } catch (Exception e) { state = null; throw ActiveMQClientMessageBundle.BUNDLE.failedToInitialiseSessionFactory(e); } @@ -401,8 +398,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public static ServerLocator newLocator(String uri) { try { return newLocator(new URI(uri)); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -411,8 +407,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { ServerLocatorParser parser = new ServerLocatorParser(); return parser.newObject(uri, null); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -546,8 +541,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos]; return pair.getA(); - } - else { + } else { // Get from initialconnectors if (logger.isTraceEnabled()) { logger.trace("Selecting connector from initial connectors."); @@ -572,8 +566,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public void run() { try { connect(); - } - catch (Exception e) { + } catch (Exception e) { if (!isClosed()) { ActiveMQClientLogger.LOGGER.errorConnectingToNodes(e); } @@ -627,8 +620,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (returnFactory != null) { addFactory(returnFactory); return returnFactory; - } - else { + } else { // wait for discovery group to get the list of initial connectors return (ClientSessionFactoryInternal) createSessionFactory(); } @@ -689,16 +681,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { try { factory.connect(reconnectAttempts, failoverOnInitialConnection); - } - catch (ActiveMQException e1) { + } catch (ActiveMQException e1) { //we need to make sure is closed just for garbage collection factory.close(); throw e1; } addFactory(factory); return factory; - } - finally { + } finally { removeFromConnecting(factory); } } @@ -717,16 +707,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { try { factory.connect(reconnectAttempts, failoverOnInitialConnection); - } - catch (ActiveMQException e1) { + } catch (ActiveMQException e1) { //we need to make sure is closed just for garbage collection factory.close(); throw e1; } addFactory(factory); return factory; - } - finally { + } finally { removeFromConnecting(factory); } } @@ -780,12 +768,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { addToConnecting(factory); factory.connect(initialConnectAttempts, failoverOnInitialConnection); - } - finally { + } finally { removeFromConnecting(factory); } - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { factory.close(); if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) { attempts++; @@ -800,8 +786,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } retry = true; - } - else { + } else { throw e; } } @@ -1344,13 +1329,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery synchronized (this) { try { discoveryGroup.stop(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.failedToStopDiscovery(e); } } - } - else { + } else { staticConnector.disconnect(); } @@ -1377,8 +1360,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery for (ClientSessionFactory factory : clonedFactory) { if (sendClose) { factory.close(); - } - else { + } else { factory.cleanup(); } } @@ -1391,8 +1373,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { ActiveMQClientLogger.LOGGER.timedOutWaitingForTermination(); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } } @@ -1404,8 +1385,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS)) { ActiveMQClientLogger.LOGGER.timedOutWaitingForScheduledPoolTermination(); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } } @@ -1436,14 +1416,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (clusterConnection) { updateArraysAndPairs(); - } - else { + } else { if (topology.isEmpty()) { // Resetting the topology to its original condition as it was brand new receivedTopology = false; topologyArray = null; - } - else { + } else { updateArraysAndPairs(); if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null) { @@ -1551,16 +1529,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery public void run() { try { connect(); - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { ActiveMQClientLogger.LOGGER.errorConnectingToNodes(e); } } }; if (startExecutor != null) { startExecutor.execute(connectRunnable); - } - else { + } else { connectRunnable.run(); } } @@ -1667,8 +1643,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (clusterConnection && exception.getType() == ActiveMQExceptionType.DISCONNECTED) { try { ServerLocatorImpl.this.start(startExecutor); - } - catch (Exception e) { + } catch (Exception e) { // There isn't much to be done if this happens here ActiveMQClientLogger.LOGGER.errorStartingLocator(e); } @@ -1690,10 +1665,10 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (logger.isDebugEnabled()) { logger.debug("Returning " + csf + - " after " + - retryNumber + - " retries on StaticConnector " + - ServerLocatorImpl.this); + " after " + + retryNumber + + " retries on StaticConnector " + + ServerLocatorImpl.this); } return csf; @@ -1708,14 +1683,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return null; } - } - catch (RejectedExecutionException e) { + } catch (RejectedExecutionException e) { if (isClosed() || skipWarnings) return null; logger.debug("Rejected execution", e); throw e; - } - catch (Exception e) { + } catch (Exception e) { if (isClosed() || skipWarnings) return null; ActiveMQClientLogger.LOGGER.errorConnectingToNodes(e); @@ -1795,14 +1768,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { factoryToUse.connect(1, false); - } - finally { + } finally { removeFromConnecting(factoryToUse); } } return factoryToUse; - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { logger.debug(this + "::Exception on establish connector initial connection", e); return null; } @@ -1883,5 +1854,4 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java index 4e9230b..096fd66 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.jboss.logging.Logger; public final class Topology { + private static final Logger logger = Logger.getLogger(Topology.class); private final Set<ClusterTopologyListener> topologyListeners; @@ -181,11 +182,11 @@ public final class Topology { Long deleteTme = getMapDelete().get(nodeId); if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme) { logger.debug("Update uniqueEvent=" + uniqueEventID + - ", nodeId=" + - nodeId + - ", memberInput=" + - memberInput + - " being rejected as there was a delete done after that"); + ", nodeId=" + + nodeId + + ", memberInput=" + + memberInput + + " being rejected as there was a delete done after that"); return false; } @@ -214,8 +215,8 @@ public final class Topology { if (logger.isTraceEnabled()) { logger.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" + - currentMember + ", memberInput=" + memberInput + "newMember=" + - newMember, new Exception("trace")); + currentMember + ", memberInput=" + memberInput + "newMember=" + + newMember, new Exception("trace")); } newMember.setUniqueEventID(uniqueEventID); @@ -254,17 +255,16 @@ public final class Topology { for (ClusterTopologyListener listener : copy) { if (logger.isTraceEnabled()) { logger.trace(Topology.this + " informing " + - listener + - " about node up = " + - nodeId + - " connector = " + - memberToSend.getConnector()); + listener + + " about node up = " + + nodeId + + " connector = " + + memberToSend.getConnector()); } try { listener.nodeUP(memberToSend, false); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQClientLogger.LOGGER.errorSendingTopology(e); } } @@ -293,8 +293,7 @@ public final class Topology { if (member.getUniqueEventID() > uniqueEventID) { logger.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call"); member = null; - } - else { + } else { getMapDelete().put(nodeId, uniqueEventID); member = topology.remove(nodeId); } @@ -303,12 +302,12 @@ public final class Topology { if (logger.isTraceEnabled()) { logger.trace("removeMember " + this + - " removing nodeID=" + - nodeId + - ", result=" + - member + - ", size = " + - topology.size(), new Exception("trace")); + " removing nodeID=" + + nodeId + + ", result=" + + member + + ", size = " + + topology.size(), new Exception("trace")); } if (member != null) { @@ -323,8 +322,7 @@ public final class Topology { } try { listener.nodeDown(uniqueEventID, nodeId); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorSendingTopologyNodedown(e); } } @@ -353,11 +351,11 @@ public final class Topology { for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet()) { if (logger.isDebugEnabled()) { logger.debug(Topology.this + " sending " + - entry.getKey() + - " / " + - entry.getValue().getConnector() + - " to " + - listener); + entry.getKey() + + " / " + + entry.getValue().getConnector() + + " to " + + listener); } listener.nodeUP(entry.getValue(), ++count == copy.size()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index c646059..cf62e17 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.client.impl; +import java.util.Map; + import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; @@ -23,8 +25,6 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.ConfigurationHelper; -import java.util.Map; - public final class TopologyMemberImpl implements TopologyMember { private static final long serialVersionUID = 1123652191795626133L; @@ -115,8 +115,7 @@ public final class TopologyMemberImpl implements TopologyMember { public boolean isMember(TransportConfiguration configuration) { if (getConnector().getA() != null && getConnector().getA().isSameParams(configuration) || getConnector().getB() != null && getConnector().getB().isSameParams(configuration)) { return true; - } - else { + } else { return false; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java index 4e9fce5..7c40602 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java @@ -150,8 +150,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { try { endpoint.close(false); - } - catch (Exception e1) { + } catch (Exception e1) { ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1); } @@ -163,8 +162,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery(); } } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } @@ -176,8 +174,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props); try { notificationService.sendNotification(notification); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e); } } @@ -207,8 +204,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { while (started && !received && (toWait > 0 || timeout == 0)) { try { waitLock.wait(toWait); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } @@ -238,8 +234,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { if (currentUniqueID == null) { uniqueIDMap.put(originatingNodeID, uniqueID); - } - else { + } else { if (!currentUniqueID.equals(uniqueID)) { ActiveMQClientLogger.LOGGER.multipleServersBroadcastingSameNode(originatingNodeID); uniqueIDMap.put(originatingNodeID, uniqueID); @@ -266,12 +261,10 @@ public final class DiscoveryGroup implements ActiveMQComponent { } break; } - } - catch (Exception e) { + } catch (Exception e) { if (!started) { return; - } - else { + } else { ActiveMQClientLogger.LOGGER.errorReceivingPacketInDiscovery(e); } } @@ -332,8 +325,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { waitLock.notifyAll(); } - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); } } @@ -357,8 +349,7 @@ public final class DiscoveryGroup implements ActiveMQComponent { for (DiscoveryListener listener : listeners) { try { listener.connectorsChanged(getDiscoveryEntries()); - } - catch (Throwable t) { + } catch (Throwable t) { // Catch it so exception doesn't prevent other listeners from running ActiveMQClientLogger.LOGGER.failedToCallListenerInDiscovery(t); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java index 10818fa..921f97d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageImpl.java @@ -200,8 +200,7 @@ public abstract class MessageImpl implements MessageInternal { buffer.writeNullableSimpleString(address); if (userID == null) { buffer.writeByte(DataConstants.NULL); - } - else { + } else { buffer.writeByte(DataConstants.NOT_NULL); buffer.writeBytes(userID.asBytes()); } @@ -221,8 +220,7 @@ public abstract class MessageImpl implements MessageInternal { byte[] bytes = new byte[16]; buffer.readBytes(bytes); userID = new UUID(UUID.TYPE_TIME_BASED, bytes); - } - else { + } else { userID = null; } type = buffer.readByte(); @@ -449,7 +447,6 @@ public abstract class MessageImpl implements MessageInternal { tmpbodyBuffer.setMessage(this); this.bodyBuffer = tmpbodyBuffer; - } @Override @@ -804,8 +801,7 @@ public abstract class MessageImpl implements MessageInternal { if (str == null) { return null; - } - else { + } else { return str.toString(); } } @@ -916,8 +912,7 @@ public abstract class MessageImpl implements MessageInternal { bodyBuffer.readBytes(buffer2); bodyBuffer.readerIndex(readerIndex2); return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1); - } - else { + } else { return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1); } @@ -955,8 +950,7 @@ public abstract class MessageImpl implements MessageInternal { if ((bodySize + 4) > buffer.capacity()) { buffer.setIndex(0, bodySize); buffer.writeInt(0); - } - else { + } else { buffer.setIndex(0, bodySize + DataConstants.SIZE_INT); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java index a7930cb..206796d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/ClientPacketDecoder.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.core.protocol; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; - import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl; import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl; @@ -27,6 +24,9 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; + public class ClientPacketDecoder extends PacketDecoder { private static final long serialVersionUID = 6952614096979334582L;
