http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f449788..d60dc56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -254,8 +254,7 @@ public class QueueImpl implements Queue { }); try { flush.await(10, TimeUnit.SECONDS); - } - catch (Exception ignored) { + } catch (Exception ignored) { } synchronized (this) { @@ -361,16 +360,14 @@ public class QueueImpl implements Queue { if (addressSettingsRepository != null) { addressSettingsRepositoryListener = new AddressSettingsRepositoryListener(); addressSettingsRepository.registerListener(addressSettingsRepositoryListener); - } - else { + } else { expiryAddress = null; } if (pageSubscription != null) { pageSubscription.setQueue(this); this.pageIterator = pageSubscription.iterator(); - } - else { + } else { this.pageIterator = null; } @@ -477,8 +474,7 @@ public class QueueImpl implements Queue { synchronized (QueueImpl.this) { if (groups.remove(groupIDToRemove) != null) { logger.debug("Removing group after unproposal " + groupID + " from queue " + QueueImpl.this); - } - else { + } else { logger.debug("Couldn't remove Removing group " + groupIDToRemove + " after unproposal on queue " + QueueImpl.this); } } @@ -593,13 +589,11 @@ public class QueueImpl implements Queue { if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) { return true; - } - else { + } else { ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString()); return false; } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return false; } @@ -627,8 +621,7 @@ public class QueueImpl implements Queue { scheduledRunners.incrementAndGet(); try { getExecutor().execute(deliverRunner); - } - catch (RejectedExecutionException ignored) { + } catch (RejectedExecutionException ignored) { // no-op scheduledRunners.decrementAndGet(); } @@ -649,8 +642,7 @@ public class QueueImpl implements Queue { public void run() { try { cancelRedistributor(); - } - catch (Exception e) { + } catch (Exception e) { // nothing that could be done anyway.. just logging ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } @@ -667,8 +659,7 @@ public class QueueImpl implements Queue { if (pageSubscription != null && pageSubscription.isPaging()) { // When in page mode, we don't want to have concurrent IO on the same PageStore return pageSubscription.getExecutor(); - } - else { + } else { return executor; } } @@ -796,8 +787,7 @@ public class QueueImpl implements Queue { futures.add(redistributorFuture); } - } - else { + } else { internalAddRedistributor(executor); } } @@ -853,8 +843,7 @@ public class QueueImpl implements Queue { if (filter1 == null) { return true; - } - else { + } else { if (filter1.match(message)) { return true; } @@ -926,8 +915,7 @@ public class QueueImpl implements Queue { return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount(); - } - else { + } else { return messageReferences.size() + getScheduledCount() + deliveringCount.get(); } } @@ -975,8 +963,7 @@ public class QueueImpl implements Queue { if (ref.isPaged()) { pageSubscription.ack((PagedReference) ref); postAcknowledge(ref); - } - else { + } else { ServerMessage message = ref.getMessage(); boolean durableRef = message.isDurable() && durable; @@ -989,11 +976,9 @@ public class QueueImpl implements Queue { if (reason == AckReason.EXPIRED) { messagesExpired++; - } - else if (reason == AckReason.KILLED) { + } else if (reason == AckReason.KILLED) { messagesKilled++; - } - else { + } else { messagesAcknowledged++; } @@ -1010,8 +995,7 @@ public class QueueImpl implements Queue { pageSubscription.ackTx(tx, (PagedReference) ref); getRefsOperation(tx).addAck(ref); - } - else { + } else { ServerMessage message = ref.getMessage(); boolean durableRef = message.isDurable() && durable; @@ -1027,11 +1011,9 @@ public class QueueImpl implements Queue { if (reason == AckReason.EXPIRED) { messagesExpired++; - } - else if (reason == AckReason.KILLED) { + } else if (reason == AckReason.KILLED) { messagesKilled++; - } - else { + } else { messagesAcknowledged++; } } @@ -1094,8 +1076,7 @@ public class QueueImpl implements Queue { } resetAllIterators(); - } - else { + } else { decDelivering(); } } @@ -1112,8 +1093,7 @@ public class QueueImpl implements Queue { logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName()); } move(null, messageExpiryAddress, ref, false, AckReason.EXPIRED); - } - else { + } else { if (logger.isTraceEnabled()) { logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); } @@ -1148,13 +1128,11 @@ public class QueueImpl implements Queue { private SimpleString extractAddress(ServerMessage message) { if (message.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { return message.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS); - } - else { + } else { return message.getAddress(); } } - @Override public SimpleString getExpiryAddress() { return this.expiryAddress; @@ -1186,8 +1164,7 @@ public class QueueImpl implements Queue { public long getMessagesAdded() { if (pageSubscription != null) { return messagesAdded + pageSubscription.getCounter().getValue() - pagedReferences.get(); - } - else { + } else { return messagesAdded; } } @@ -1302,8 +1279,7 @@ public class QueueImpl implements Queue { count++; txCount++; messageAction.actMessage(tx, reference); - } - else { + } else { addTail(reference, false); } @@ -1404,8 +1380,7 @@ public class QueueImpl implements Queue { } tx.commit(); - } - catch (Exception e) { + } catch (Exception e) { tx.rollback(); throw e; } @@ -1514,8 +1489,7 @@ public class QueueImpl implements Queue { iter.remove(); refRemoved(ref); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref); } @@ -1525,12 +1499,10 @@ public class QueueImpl implements Queue { if ((!hasElements || expired) && pageIterator != null && pageIterator.hasNext()) { scheduleDepage(true); } - } - finally { + } finally { try { iter.close(); - } - catch (Throwable ignored) { + } catch (Throwable ignored) { } scannerRunning.decrementAndGet(); } @@ -1592,8 +1564,7 @@ public class QueueImpl implements Queue { incDelivering(); try { move(null, toAddress, ref, rejectDuplicate, AckReason.NORMAL); - } - catch (Exception e) { + } catch (Exception e) { decDelivering(); throw e; } @@ -1681,8 +1652,7 @@ public class QueueImpl implements Queue { if (targetQueue != null) { move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue()); - } - else { + } else { move(originalMessageAddress, tx, ref, false, false); } @@ -1744,8 +1714,7 @@ public class QueueImpl implements Queue { public synchronized void pause() { try { this.flushDeliveriesInTransit(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } paused = true; @@ -1834,8 +1803,7 @@ public class QueueImpl implements Queue { private int getPriority(MessageReference ref) { try { return ref.getMessage().getPriority(); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return 4; // the default one in case of failure } @@ -1939,14 +1907,12 @@ public class QueueImpl implements Queue { if (holder.iter.hasNext()) { ref = holder.iter.next(); - } - else { + } else { ref = null; } if (ref == null) { noDelivery++; - } - else { + } else { if (checkExpired(ref)) { if (logger.isTraceEnabled()) { logger.trace("Reference " + ref + " being expired"); @@ -1993,13 +1959,11 @@ public class QueueImpl implements Queue { } handled++; - } - else if (status == HandleStatus.BUSY) { + } else if (status == HandleStatus.BUSY) { holder.iter.repeat(); noDelivery++; - } - else if (status == HandleStatus.NO_MATCH) { + } else if (status == HandleStatus.NO_MATCH) { // nothing to be done on this case, the iterators will just jump next } } @@ -2012,8 +1976,7 @@ public class QueueImpl implements Queue { // this shouldn't really happen, // however I'm keeping this as an assertion case future developers ever change the logic here on this class ActiveMQServerLogger.LOGGER.nonDeliveryHandled(); - } - else { + } else { if (logger.isDebugEnabled()) { logger.debug(this + "::All the consumers were busy, giving up now"); } @@ -2064,13 +2027,11 @@ public class QueueImpl implements Queue { private SimpleString extractGroupID(MessageReference ref) { if (internalQueue) { return null; - } - else { + } else { try { // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return null; } @@ -2137,14 +2098,14 @@ public class QueueImpl implements Queue { if (logger.isDebugEnabled()) { logger.debug("Queue Memory Size after depage on queue=" + this.getName() + - " is " + - queueMemorySize.get() + - " with maxSize = " + - maxSize + - ". Depaged " + - depaged + - " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + - ", queueDelivering=" + deliveringCount.get()); + " is " + + queueMemorySize.get() + + " with maxSize = " + + maxSize + + ". Depaged " + + depaged + + " messages, pendingDelivery=" + messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + + ", queueDelivering=" + deliveringCount.get()); } } @@ -2207,8 +2168,7 @@ public class QueueImpl implements Queue { sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress()); return false; - } - else { + } else { // Second check Redelivery Delay if (!ignoreRedeliveryDelay && redeliveryDelay > 0) { redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount); @@ -2299,8 +2259,7 @@ public class QueueImpl implements Queue { if (targetBinding == null) { ActiveMQServerLogger.LOGGER.unableToFindTargetQueue(targetNodeID); - } - else { + } else { logger.debug("Routing on binding: " + targetBinding); targetBinding.route(copyMessage, routingContext); } @@ -2364,8 +2323,7 @@ public class QueueImpl implements Queue { logger.debug("Message now destined for " + remoteQueueBinding.getRoutingName() + " with ID: " + remoteQueueBinding.getRemoteQueueID() + " on address " + copyMessage.getAddress() + " on node " + targetNodeID); } break; - } - else { + } else { logger.debug("Failed to match: " + remoteQueueBinding); } } @@ -2408,12 +2366,10 @@ public class QueueImpl implements Queue { if (bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress); - } - else { + } else { move(expiryAddress, tx, ref, true, true); } - } - else { + } else { ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name); acknowledge(tx, ref); @@ -2425,7 +2381,8 @@ public class QueueImpl implements Queue { sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress()); } - private void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref, + private void sendToDeadLetterAddress(final Transaction tx, + final MessageReference ref, final SimpleString deadLetterAddress) throws Exception { if (deadLetterAddress != null) { Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress); @@ -2433,13 +2390,11 @@ public class QueueImpl implements Queue { if (bindingList.getBindings().isEmpty()) { ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress); ref.acknowledge(tx, AckReason.KILLED); - } - else { + } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); move(tx, deadLetterAddress, ref, false, AckReason.KILLED); } - } - else { + } else { ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(name); ref.acknowledge(tx, AckReason.KILLED); @@ -2455,8 +2410,7 @@ public class QueueImpl implements Queue { if (originalTX != null) { tx = originalTX; - } - else { + } else { // if no TX we create a new one to commit at the end tx = new TransactionImpl(storageManager); } @@ -2546,8 +2500,7 @@ public class QueueImpl implements Queue { try { consumer.proceedDeliver(reference); deliveriesInTransit.countDown(); - } - catch (Throwable t) { + } catch (Throwable t) { deliveriesInTransit.countDown(); ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); @@ -2555,8 +2508,7 @@ public class QueueImpl implements Queue { // If the consumer throws an exception we remove the consumer try { removeConsumer(consumer); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e); } @@ -2576,18 +2528,15 @@ public class QueueImpl implements Queue { try { expire(reference); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorExpiringRef(e); } return true; - } - else { + } else { return false; } - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return false; } @@ -2597,15 +2546,13 @@ public class QueueImpl implements Queue { HandleStatus status; try { status = consumer.handle(reference); - } - catch (Throwable t) { + } catch (Throwable t) { ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); // If the consumer throws an exception we remove the consumer try { removeConsumer(consumer); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e); } return HandleStatus.BUSY; @@ -2642,20 +2589,19 @@ public class QueueImpl implements Queue { try { message = ref.getMessage(); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); message = null; } - if (message == null) return; + if (message == null) + return; boolean durableRef = message.isDurable() && queue.durable; try { message.decrementRefCount(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorDecrementingRefCount(e); } @@ -2679,8 +2625,7 @@ public class QueueImpl implements Queue { // There is a startup check to remove non referenced messages case these deletes fail try { storageManager.deleteMessage(message.getMessageID()); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, message.getMessageID()); } } @@ -2787,11 +2732,9 @@ public class QueueImpl implements Queue { synchronized (QueueImpl.this.deliverRunner) { deliver(); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorDelivering(e); - } - finally { + } finally { scheduledRunners.decrementAndGet(); } } @@ -2809,8 +2752,7 @@ public class QueueImpl implements Queue { public void run() { try { depage(scheduleExpiry); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorDelivering(e); } } @@ -2966,12 +2908,10 @@ public class QueueImpl implements Queue { logger.debug("Cancelled slow-consumer-reaper thread for queue \"" + getName() + "\""); } } - } - else { + } else { if (slowConsumerReaperRunnable == null) { scheduleSlowConsumerReaper(settings); - } - else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || + } else if (slowConsumerReaperRunnable.checkPeriod != settings.getSlowConsumerCheckPeriod() || slowConsumerReaperRunnable.threshold != settings.getSlowConsumerThreshold() || !slowConsumerReaperRunnable.policy.equals(settings.getSlowConsumerPolicy())) { slowConsumerReaperFuture.cancel(false); @@ -2987,9 +2927,9 @@ public class QueueImpl implements Queue { if (logger.isDebugEnabled()) { logger.debug("Scheduled slow-consumer-reaper thread for queue \"" + getName() + - "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + - ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + - ", slow-consumer-policy=" + settings.getSlowConsumerPolicy()); + "\"; slow-consumer-check-period=" + settings.getSlowConsumerCheckPeriod() + + ", slow-consumer-threshold=" + settings.getSlowConsumerThreshold() + + ", slow-consumer-policy=" + settings.getSlowConsumerPolicy()); } } @@ -3029,8 +2969,7 @@ public class QueueImpl implements Queue { if (logger.isDebugEnabled()) { logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); } - } - else if (consumerRate < threshold) { + } else if (consumerRate < threshold) { RemotingConnection connection = null; ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); RemotingService remotingService = server.getRemotingService(); @@ -3049,8 +2988,7 @@ public class QueueImpl implements Queue { connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); - } - else if (policy.equals(SlowConsumerPolicy.NOTIFY)) { + } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) { TypedProperties props = new TypedProperties(); props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount()); @@ -3072,8 +3010,7 @@ public class QueueImpl implements Queue { ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService(); try { managementService.sendNotification(notification); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e); } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index b90a30e..8e3a94b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -101,8 +101,7 @@ public class RefsOperation extends TransactionOperationAbstract { toCancel.addFirst(ref); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorCheckingDLQ(e); } } @@ -140,8 +139,7 @@ public class RefsOperation extends TransactionOperationAbstract { message.incrementRefCount(); } ackedTX.commit(true); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } } @@ -165,12 +163,10 @@ public class RefsOperation extends TransactionOperationAbstract { private void decrementRefCount(MessageReference refmsg) { try { refmsg.getMessage().decrementRefCount(); - } - catch (NonExistentPage e) { + } catch (NonExistentPage e) { // This could happen on after commit, since the page could be deleted on file earlier by another thread logger.debug(e); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java index 55c5ada..7c333a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java @@ -22,8 +22,8 @@ import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java index 6bbeaf4..6e0aa95 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java @@ -56,8 +56,7 @@ public final class RoutingContextImpl implements RoutingContext { if (queue.isDurable()) { listing.getDurableQueues().add(queue); - } - else { + } else { listing.getNonDurableQueues().add(queue); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index c6d3f70..3e6f005 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -102,7 +102,10 @@ public class ScaleDownHandler { return num; } - public long scaleDownMessages(ClientSessionFactory sessionFactory, SimpleString nodeId, String user, String password) throws Exception { + public long scaleDownMessages(ClientSessionFactory sessionFactory, + SimpleString nodeId, + String user, + String password) throws Exception { long messageCount = 0; targetNodeId = nodeId != null ? nodeId.toString() : getTargetNodeId(sessionFactory); @@ -127,8 +130,7 @@ public class ScaleDownHandler { if (address.toString().startsWith("sf.")) { messageCount += scaleDownSNF(address, queues, producer); - } - else { + } else { messageCount += scaleDownRegularMessages(address, queues, session, producer); } @@ -176,8 +178,7 @@ public class ScaleDownHandler { if (controlEntry.getKey() == loopQueue) { // no need to lookup on itself, we just add it queuesFound.add(controlEntry.getValue()); - } - else if (controlEntry.getValue().lookup(messageReference)) { + } else if (controlEntry.getValue().lookup(messageReference)) { logger.debug("Message existed on queue " + controlEntry.getKey().getID() + " removeID=" + controlEntry.getValue().getQueueID()); queuesFound.add(controlEntry.getValue()); } @@ -196,8 +197,7 @@ public class ScaleDownHandler { if (logger.isDebugEnabled()) { if (messageReference.isPaged()) { logger.debug("*********************<<<<< Scaling down pdgmessage " + message); - } - else { + } else { logger.debug("*********************<<<<< Scaling down message " + message); } } @@ -223,8 +223,7 @@ public class ScaleDownHandler { } return messageCount; - } - finally { + } finally { pageStore.enableCleanup(); pageStore.getCursorProvider().scheduleCleanup(); } @@ -242,8 +241,7 @@ public class ScaleDownHandler { if (queueOnTarget) { propertyEnd = targetNodeId; - } - else { + } else { propertyEnd = address.toString().substring(address.toString().lastIndexOf(".")); } @@ -283,8 +281,7 @@ public class ScaleDownHandler { if (queueOnTarget) { message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, oldRouteToIDs); - } - else { + } else { message.putBytesProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS, oldRouteToIDs); } @@ -337,8 +334,7 @@ public class ScaleDownHandler { if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); - } - else { + } else { queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } @@ -349,8 +345,7 @@ public class ScaleDownHandler { } queueIds.getA().add(queueID); } - } - else if (operation instanceof RefsOperation) { + } else if (operation instanceof RefsOperation) { RefsOperation refsOperation = (RefsOperation) operation; List<MessageReference> refs = refsOperation.getReferencesToAcknowledge(); for (MessageReference ref : refs) { @@ -361,8 +356,7 @@ public class ScaleDownHandler { if (queueIDs.containsKey(queueName)) { queueID = queueIDs.get(queueName); - } - else { + } else { queueID = createQueueIfNecessaryAndGetID(queueCreateSession, queue, message.getAddress()); queueIDs.put(queueName, queueID); // store it so we don't have to look it up every time } @@ -409,7 +403,7 @@ public class ScaleDownHandler { try (ClientSession session = sessionFactory.createSession(user, password, true, false, false, false, 0); ClientProducer producer = session.createProducer(managementAddress)) { //todo - https://issues.jboss.org/browse/HORNETQ-1336 - for (Map.Entry<SimpleString,List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) { + for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) { ClientMessage message = session.createMessage(false); List<Pair<byte[], Long>> list = entry.getValue(); String[] array = new String[list.size()]; @@ -545,8 +539,7 @@ public class ScaleDownHandler { if (subscription.contains((PagedReference) reference)) { return true; } - } - else { + } else { if (lastRef != null && lastRef.getMessage().equals(reference.getMessage())) { lastRef = null; @@ -579,8 +572,7 @@ public class ScaleDownHandler { if (initialRef == null) { initialRef = lastRef; - } - else { + } else { if (initialRef.equals(lastRef)) { if (!memoryIterator.hasNext()) { // if by coincidence we are at the end of the iterator, we just reset the iterator http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index b70c1b1..6eaba4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -143,8 +143,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { // if delay == 0 we will avoid races between adding the scheduler and finishing it ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(deliveryTime); scheduledExecutor.schedule(runnable, 0, TimeUnit.MILLISECONDS); - } - else if (!runnables.containsKey(deliveryTime)) { + } else if (!runnables.containsKey(deliveryTime)) { ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(deliveryTime); if (logger.isTraceEnabled()) { @@ -153,8 +152,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { runnables.put(deliveryTime, runnable); scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS); - } - else { + } else { if (logger.isTraceEnabled()) { logger.trace("Couldn't make another scheduler as " + deliveryTime + " is already set, now is " + now); } @@ -186,8 +184,8 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { // this is basically a hack to work around an OS or JDK bug! if (logger.isTraceEnabled()) { logger.trace("Scheduler is working around OS imprecisions on " + - "timing and re-scheduling an executor. now=" + now + - " and deliveryTime=" + deliveryTime); + "timing and re-scheduling an executor. now=" + now + + " and deliveryTime=" + deliveryTime); } ScheduledDeliveryHandlerImpl.this.scheduleDelivery(deliveryTime); } @@ -281,19 +279,16 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { // Even if ref1 and ref2 have the same delivery time, we only want to return 0 if they are identical if (ref1 == ref2) { return 0; - } - else { + } else { if (ref1.isTail() && !ref2.isTail()) { return 1; - } - else if (!ref1.isTail() && ref2.isTail()) { + } else if (!ref1.isTail() && ref2.isTail()) { return -1; } if (!ref1.isTail() && !ref2.isTail()) { return -1; - } - else { + } else { return 1; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 7de9fc5..24eacf5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -207,8 +207,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (browseOnly) { browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator()); - } - else { + } else { messageQueue.addConsumer(this); } this.supportLargeMessage = supportLargeMessage; @@ -216,8 +215,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (credits != null) { if (credits == -1) { availableCredits = null; - } - else { + } else { availableCredits.set(credits); } } @@ -314,9 +312,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (callback != null && !callback.hasCredits(this) || availableCredits != null && availableCredits.get() <= 0) { if (logger.isDebugEnabled()) { logger.debug(this + " is busy for the lack of credits. Current credits = " + - availableCredits + - " Can't receive reference " + - ref); + availableCredits + + " Can't receive reference " + + ref); } return HandleStatus.BUSY; @@ -336,9 +334,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (largeMessageDeliverer != null) { if (logger.isDebugEnabled()) { logger.debug(this + " is busy delivering large message " + - largeMessageDeliverer + - ", can't deliver reference " + - ref); + largeMessageDeliverer + + ", can't deliver reference " + + ref); } return HandleStatus.BUSY; } @@ -412,12 +410,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // The deliverer was prepared during handle, as we can't have more than one pending large message // as it would return busy if there is anything pending largeMessageDeliverer.deliver(); - } - else { + } else { deliverStandardMessage(reference, message); } - } - finally { + } finally { lockDelivery.readLock().unlock(); callback.afterDelivery(); } @@ -495,8 +491,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { public void removeItself() throws Exception { if (browseOnly) { browserDeliverer.close(); - } - else { + } else { messageQueue.removeConsumer(this); } @@ -544,13 +539,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forceDelivery(sequence, r); } }); - } - else { + } else { r.run(); } } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e); } } @@ -567,11 +560,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (largeMessageDeliverer != null) { largeMessageDeliverer.finish(); } - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.errorResttingLargeMessage(e, largeMessageDeliverer); - } - finally { + } finally { largeMessageDeliverer = null; } @@ -584,8 +575,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.acknowledge(tx); performACK = false; - } - else { + } else { refs.add(ref); updateDeliveryCountForCanceledRef(ref, failed); } @@ -626,8 +616,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // you restart the server try { this.started = browseOnly || started; - } - finally { + } finally { if (locked) { lockDelivery.writeLock().unlock(); } @@ -650,8 +639,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return false; } return true; - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); return false; } @@ -665,8 +653,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { boolean locked = lockDelivery(); try { this.transferring = transferring; - } - finally { + } finally { if (locked) { lockDelivery.writeLock().unlock(); } @@ -706,22 +693,20 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // There may be messages already in the queue promptDelivery(); - } - else if (credits == 0) { + } else if (credits == 0) { // reset, used on slow consumers logger.debug(this + ":: FlowControl::Received reset flow control message"); availableCredits.set(0); - } - else { + } else { int previous = availableCredits.getAndAdd(credits); if (logger.isDebugEnabled()) { logger.debug(this + "::FlowControl::Received " + - credits + - " credits, previous value = " + - previous + - " currentValue = " + - availableCredits.get()); + credits + + " credits, previous value = " + + previous + + " currentValue = " + + availableCredits.get()); } if (previous <= 0 && previous + credits > 0) { @@ -738,14 +723,17 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return messageQueue; } - - /** Remove references based on the protocolData. - * there will be an interval defined between protocolDataStart and protocolDataEnd. - * This method will fetch the delivering references, remove them from the delivering list and return a list. + /** + * Remove references based on the protocolData. + * there will be an interval defined between protocolDataStart and protocolDataEnd. + * This method will fetch the delivering references, remove them from the delivering list and return a list. * - * This will be useful for other protocols that will need this such as openWire or MQTT. */ + * This will be useful for other protocols that will need this such as openWire or MQTT. + */ @Override - public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove, Object protocolDataStart, Object protocolDataEnd) { + public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove, + Object protocolDataStart, + Object protocolDataEnd) { LinkedList<MessageReference> retReferences = new LinkedList<>(); boolean hit = false; synchronized (lock) { @@ -822,23 +810,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (startedTransaction) { tx.commit(); } - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { if (startedTransaction) { tx.rollback(); - } - else { + } else { tx.markAsRollbackOnly(e); } throw e; - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e); ActiveMQException activeMQIllegalStateException = new ActiveMQIllegalStateException(e.getMessage()); if (startedTransaction) { tx.rollback(); - } - else { + } else { tx.markAsRollbackOnly(activeMQIllegalStateException); } throw activeMQIllegalStateException; @@ -846,15 +830,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } @Override - public void individualAcknowledge(Transaction tx, - final long messageID) throws Exception { + public void individualAcknowledge(Transaction tx, final long messageID) throws Exception { if (browseOnly) { return; } boolean startedTransaction = false; - if (logger.isTraceEnabled()) { logger.trace("individualACK messageID=" + messageID); } @@ -887,23 +869,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (startedTransaction) { tx.commit(); } - } - catch (ActiveMQException e) { + } catch (ActiveMQException e) { if (startedTransaction) { tx.rollback(); - } - else { + } else { tx.markAsRollbackOnly(e); } throw e; - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e); ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage()); if (startedTransaction) { tx.rollback(); - } - else { + } else { tx.markAsRollbackOnly(hqex); } throw hqex; @@ -930,7 +908,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ref.getQueue().cancel(ref, System.currentTimeMillis()); } - @Override public void backToDelivering(MessageReference reference) { deliveringRefs.addFirst(reference); @@ -1016,8 +993,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // if we don't acquire a lock, we will have NPE eventually if (largeMessageDeliverer != null) { resumeLargeMessage(); - } - else { + } else { forceDelivery(); } } @@ -1025,8 +1001,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private void forceDelivery() { if (browseOnly) { messageQueue.getExecutor().execute(browserDeliverer); - } - else { + } else { messageQueue.deliverAsync(); } } @@ -1047,9 +1022,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::delivery standard taking " + - packetSize + - " from credits, available now is " + - availableCredits); + packetSize + + " from credits, available now is " + + availableCredits); } } } @@ -1065,8 +1040,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (largeMessageDeliverer == null || largeMessageDeliverer.deliver()) { forceDelivery(); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRunningLargeMessageDeliverer(e); } } @@ -1117,7 +1091,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (availableCredits != null && availableCredits.get() <= 0) { if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + - availableCredits); + availableCredits); } return false; @@ -1139,10 +1113,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::" + - " deliver initialpackage with " + - packetSize + - " delivered, available now = " + - availableCredits); + " deliver initialpackage with " + + packetSize + + " delivered, available now = " + + availableCredits); } } @@ -1152,12 +1126,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { resumeLargeMessage(); return false; - } - else { + } else { if (availableCredits != null && availableCredits.get() <= 0) { if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + - availableCredits); + availableCredits); } return false; @@ -1175,8 +1148,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (bodyBuffer.toByteBuffer().hasArray()) { body = bodyBuffer.toByteBuffer().array(); - } - else { + } else { body = new byte[0]; } @@ -1189,9 +1161,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (logger.isTraceEnabled()) { logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + - packetSize + - " available now=" + - availableCredits); + packetSize + + " available now=" + + availableCredits); } } @@ -1211,8 +1183,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { finish(); return true; - } - finally { + } finally { lockDelivery.readLock().unlock(); } } @@ -1275,8 +1246,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } current = null; - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, current); return; } @@ -1301,15 +1271,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { if (status == HandleStatus.HANDLED) { proceedDeliver(ref); - } - else if (status == HandleStatus.BUSY) { + } else if (status == HandleStatus.BUSY) { // keep a reference on the current message reference // to handle it next time the browser deliverer is executed current = ref; break; } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorBrowserHandlingMessage(e, ref); break; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerInfo.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerInfo.java index fce327e..84ce874 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerInfo.java @@ -68,8 +68,7 @@ public class ServerInfo { try { pageStore = pagingManager.getPageStore(storeName); info.append(String.format("\t%s: %s%n", storeName, SizeFormatterUtil.sizeof(pageStore.getPageSizeBytes() * pageStore.getNumberOfPages()))); - } - catch (Exception e) { + } catch (Exception e) { info.append(String.format("\t%s: %s%n", storeName, e.getMessage())); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java index 26cbde7..984b682 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerMessageImpl.java @@ -50,8 +50,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { if (MemorySize.is64bitArch()) { memoryOffset = 352; - } - else { + } else { memoryOffset = 232; } } @@ -115,8 +114,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { if (pagingStore != null) { if (count == 1) { pagingStore.addSize(getMemoryEstimate() + MessageReferenceImpl.getMemoryEstimate()); - } - else { + } else { pagingStore.addSize(MessageReferenceImpl.getMemoryEstimate()); } } @@ -136,8 +134,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { // release the buffer now buffer.byteBuf().release(); } - } - else { + } else { pagingStore.addSize(-MessageReferenceImpl.getMemoryEstimate()); } } @@ -228,8 +225,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { if (originalQueue != null) { putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); - } - else if (originalReference != null) { + } else if (originalReference != null) { putStringProperty(Message.HDR_ORIGINAL_QUEUE, originalReference.getQueue().getName()); } @@ -237,8 +233,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS)); putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getLongProperty(Message.HDR_ORIG_MESSAGE_ID)); - } - else { + } else { putStringProperty(Message.HDR_ORIGINAL_ADDRESS, other.getAddress()); putLongProperty(Message.HDR_ORIG_MESSAGE_ID, other.getMessageID()); @@ -280,8 +275,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { public boolean storeIsPaging() { if (pagingStore != null) { return pagingStore.isPaging(); - } - else { + } else { return false; } } @@ -292,8 +286,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { return "ServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", bodySize=" + this.getBodyBufferDuplicate().capacity() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); - } - catch (Throwable e) { + } catch (Throwable e) { return "ServerMessage[messageID=" + messageID + "]"; } } @@ -301,8 +294,7 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { private static String toDate(long timestamp) { if (timestamp == 0) { return "0"; - } - else { + } else { return new java.util.Date(timestamp).toString(); } @@ -328,12 +320,10 @@ public class ServerMessageImpl extends MessageImpl implements ServerMessage { if (duplicateID == null) { return null; - } - else { + } else { if (duplicateID instanceof SimpleString) { return ((SimpleString) duplicateID).getData(); - } - else { + } else { return (byte[]) duplicateID; } }
