http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 6b75e74..cd86191 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -184,7 +184,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager { protected final Set<Long> largeMessagesToDelete = new HashSet<>(); - public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) { + public AbstractJournalStorageManager(final Configuration config, + final ExecutorFactory executorFactory, + final ScheduledExecutorService scheduledExecutorService) { this(config, executorFactory, scheduledExecutorService, null); } @@ -212,6 +214,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { /** * Called during initialization. Used by implementations to setup Journals, Stores etc... + * * @param config * @param criticalErrorListener */ @@ -241,8 +244,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { digest = md.digest(); } return Base64.encodeBytes(digest); - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -315,15 +317,13 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // Non transactional operations - @Override public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long recordID) throws Exception { readLock(); try { installLargeMessageConfirmationOnTX(tx, recordID); messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new DeleteEncoding(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, messageID)); - } - finally { + } finally { readUnLock(); } } @@ -336,8 +336,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecord(recordID, true, getContext()); - } - finally { + } finally { readUnLock(); } } @@ -356,12 +355,10 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (message.isLargeMessage()) { messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding((LargeServerMessage) message), false, getContext(false)); - } - else { + } else { messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message, false, getContext(false)); } - } - finally { + } finally { readUnLock(); } } @@ -371,8 +368,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(last && syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -392,8 +388,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -405,8 +400,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { long ackID = idGenerator.generateID(); position.setRecordID(ackID); messageJournal.appendAddRecord(ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position), syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -420,8 +414,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // increasing chances of losing deletes. // The StorageManager should verify messages without references messageJournal.appendDeleteRecord(messageID, false, getContext(false)); - } - finally { + } finally { readUnLock(); } } @@ -432,8 +425,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -445,8 +437,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID); messageJournal.appendAddRecord(recordID, JournalRecordIds.DUPLICATE_ID, encoding, syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -456,8 +447,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -474,13 +464,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager { try { if (message.isLargeMessage()) { messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding(((LargeServerMessage) message))); - } - else { + } else { messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message); } - } - finally { + } finally { readUnLock(); } } @@ -491,8 +479,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { try { pageTransaction.setRecordID(generateID()); messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, pageTransaction); - } - finally { + } finally { readUnLock(); } } @@ -504,8 +491,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages)); - } - finally { + } finally { readUnLock(); } } @@ -515,8 +501,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages), syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -526,8 +511,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID)); - } - finally { + } finally { readUnLock(); } } @@ -539,8 +523,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID)); - } - finally { + } finally { readUnLock(); } } @@ -552,8 +535,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { long ackID = idGenerator.generateID(); position.setRecordID(ackID); messageJournal.appendAddRecordTransactional(txID, ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position)); - } - finally { + } finally { readUnLock(); } } @@ -575,8 +557,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecordTransactional(txID, ackID); - } - finally { + } finally { readUnLock(); } } @@ -594,8 +575,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { messageJournal.appendAddRecord(id, JournalRecordIds.HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext(true)); return id; - } - finally { + } finally { readUnLock(); } } @@ -606,8 +586,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { try { messageJournal.appendDeleteRecord(id, true, getContext(true)); - } - finally { + } finally { readUnLock(); } } @@ -617,8 +596,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecord(recordID, false); - } - finally { + } finally { readUnLock(); } } @@ -630,8 +608,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { try { messageJournal.appendUpdateRecordTransactional(txID, ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding); - } - finally { + } finally { readUnLock(); } } @@ -641,8 +618,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -678,8 +654,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { */ getContext(true).done(); } - } - finally { + } finally { readUnLock(); } } @@ -689,8 +664,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -705,8 +679,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.DUPLICATE_ID, encoding); - } - finally { + } finally { readUnLock(); } } @@ -721,8 +694,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecordTransactional(txID, recordID, JournalRecordIds.DUPLICATE_ID, encoding); - } - finally { + } finally { readUnLock(); } } @@ -732,8 +704,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } - finally { + } finally { readUnLock(); } } @@ -754,8 +725,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional)); - } - finally { + } finally { readUnLock(); } } @@ -769,8 +739,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { addressSetting.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.ADDRESS_SETTING_RECORD, addressSetting, true); mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting); - } - finally { + } finally { readUnLock(); } } @@ -795,8 +764,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { persistedRoles.setStoreId(id); bindingsJournal.appendAddRecord(id, JournalRecordIds.SECURITY_RECORD, persistedRoles, true); mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles); - } - finally { + } finally { readUnLock(); } } @@ -806,8 +774,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendAddRecord(journalID, JournalRecordIds.ID_COUNTER_RECORD, BatchingIDGenerator.createIDEncodingSupport(id), true); - } - finally { + } finally { readUnLock(); } } @@ -817,8 +784,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendDeleteRecord(journalD, false); - } - finally { + } finally { readUnLock(); } } @@ -830,8 +796,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false); - } - finally { + } finally { readUnLock(); } } @@ -844,8 +809,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false); - } - finally { + } finally { readUnLock(); } } @@ -942,8 +906,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (message == null) { ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id); - } - else { + } else { queueMessages.put(messageID, new AddMessageRecord(message)); } @@ -960,8 +923,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (queueMessages == null) { ActiveMQServerLogger.LOGGER.journalCannotFindQueue(encoding.queueID, messageID); - } - else { + } else { AddMessageRecord rec = queueMessages.remove(messageID); if (rec == null) { @@ -982,14 +944,12 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (queueMessages == null) { ActiveMQServerLogger.LOGGER.journalCannotFindQueueDelCount(encoding.queueID); - } - else { + } else { AddMessageRecord rec = queueMessages.get(messageID); if (rec == null) { ActiveMQServerLogger.LOGGER.journalCannotFindMessageDelCount(messageID); - } - else { + } else { rec.setDeliveryCount(encoding.count); } } @@ -1005,8 +965,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX); pageTX.onUpdate(pageUpdate.recods, null, null); - } - else { + } else { PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl(); pageTransactionInfo.decode(buff); @@ -1029,15 +988,13 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (queueMessages == null) { ActiveMQServerLogger.LOGGER.journalCannotFindQueueScheduled(encoding.queueID, messageID); - } - else { + } else { AddMessageRecord rec = queueMessages.get(messageID); if (rec == null) { ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID); - } - else { + } else { rec.setScheduledDeliveryTime(encoding.scheduledDeliveryTime); } } @@ -1077,8 +1034,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (sub != null) { sub.reloadACK(encoding.position); - } - else { + } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID); messageJournal.appendDeleteRecord(record.id, false); @@ -1095,8 +1051,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (sub != null) { sub.getCounter().loadValue(record.id, encoding.getValue()); - } - else { + } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID()); messageJournal.appendDeleteRecord(record.id, false); } @@ -1113,8 +1068,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (sub != null) { sub.getCounter().loadInc(record.id, encoding.getValue()); - } - else { + } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID()); messageJournal.appendDeleteRecord(record.id, false); } @@ -1132,8 +1086,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (sub != null) { sub.reloadPageCompletion(encoding.position); - } - else { + } else { ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID); messageJournal.appendDeleteRecord(record.id, false); } @@ -1200,8 +1153,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap); journalLoaded = true; return info; - } - finally { + } finally { readUnLock(); } } @@ -1239,8 +1191,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendAddRecord(groupBinding.getId(), JournalRecordIds.GROUP_RECORD, groupingEncoding, true); - } - finally { + } finally { readUnLock(); } } @@ -1250,8 +1201,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendDeleteRecordTransactional(tx, groupBinding.getId()); - } - finally { + } finally { readUnLock(); } } @@ -1271,8 +1221,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); - } - finally { + } finally { readUnLock(); } } @@ -1282,8 +1231,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { bindingsJournal.appendDeleteRecordTransactional(tx, queueBindingID); - } - finally { + } finally { readUnLock(); } } @@ -1295,8 +1243,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { long recordID = idGenerator.generateID(); messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value)); return recordID; - } - finally { + } finally { readUnLock(); } } @@ -1308,8 +1255,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { final long recordID = idGenerator.generateID(); messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value), true, getContext()); return recordID; - } - finally { + } finally { readUnLock(); } } @@ -1321,8 +1267,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { final long recordID = idGenerator.generateID(); messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value)); return recordID; - } - finally { + } finally { readUnLock(); } } @@ -1337,8 +1282,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // on the counter messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER, pendingInc, true); return recordID; - } - finally { + } finally { readUnLock(); } } @@ -1348,8 +1292,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } - finally { + } finally { readUnLock(); } } @@ -1359,8 +1302,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } - finally { + } finally { readUnLock(); } } @@ -1370,8 +1312,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.appendDeleteRecordTransactional(txID, recordID); - } - finally { + } finally { readUnLock(); } } @@ -1396,23 +1337,18 @@ public abstract class AbstractJournalStorageManager implements StorageManager { PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer); queueBindingInfos.add(bindingEncoding); - } - else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { + } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { idGenerator.loadState(record.id, buffer); - } - else if (rec == JournalRecordIds.GROUP_RECORD) { + } else if (rec == JournalRecordIds.GROUP_RECORD) { GroupingEncoding encoding = newGroupEncoding(id, buffer); groupingInfos.add(encoding); - } - else if (rec == JournalRecordIds.ADDRESS_SETTING_RECORD) { + } else if (rec == JournalRecordIds.ADDRESS_SETTING_RECORD) { PersistedAddressSetting setting = newAddressEncoding(id, buffer); mapPersistedAddressSettings.put(setting.getAddressMatch(), setting); - } - else if (rec == JournalRecordIds.SECURITY_RECORD) { + } else if (rec == JournalRecordIds.SECURITY_RECORD) { PersistedRoles roles = newSecurityRecord(id, buffer); mapPersistedRoles.put(roles.getAddressMatch(), roles); - } - else { + } else { throw new IllegalStateException("Invalid record type " + rec); } } @@ -1428,8 +1364,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { messageJournal.lineUpContext(getContext()); - } - finally { + } finally { readUnLock(); } } @@ -1533,8 +1468,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { info[1] = messageJournal.loadInternalOnly(); return info; - } - finally { + } finally { readUnLock(); } } @@ -1572,8 +1506,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { try { confirmPendingLargeMessage(largeServerMessage.getPendingRecordID()); largeServerMessage.setPendingRecordID(-1); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } } @@ -1666,8 +1599,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (record.isUpdate) { PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID()); pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); - } - else { + } else { pageTransactionInfo.setCommitted(false); tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); @@ -1709,8 +1641,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (sub != null) { sub.reloadPreparedACK(tx, encoding.position); referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub)); - } - else { + } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID); } break; @@ -1731,8 +1662,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { if (sub != null) { sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue()); sub.notEmpty(); - } - else { + } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); } @@ -1775,8 +1705,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { OperationContext getContext(final boolean sync) { if (sync) { return getContext(); - } - else { + } else { return DummyOperationContext.getInstance(); } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java index cdcb4fa..c61bf64 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java @@ -128,8 +128,7 @@ public final class BatchingIDGenerator implements IDGenerator { // If the ID is intended to the journal you would know soon enough // so we just ignore this for now logger.debug("The journalStorageManager is not loaded. " + "This is probably ok as long as it's a notification being sent after shutdown"); - } - else { + } else { storeID(counter.getAndIncrement(), nextID); } } @@ -146,8 +145,7 @@ public final class BatchingIDGenerator implements IDGenerator { private void storeID(final long journalID, final long id) { try { storageManager.storeID(journalID, id); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.batchingIdError(e); } } @@ -155,8 +153,7 @@ public final class BatchingIDGenerator implements IDGenerator { private void deleteID(final long journalID) { try { storageManager.deleteID(journalID); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.batchingIdError(e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 741053f..34a47bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -212,13 +212,11 @@ public final class DescribeJournal { out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue()); if (subsCounter.getValue() < 0) { out.println(" #NegativeCounter!!!!"); - } - else { + } else { out.println(); } out.println(); - } - else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { + } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { PageCountRecordInc encoding = (PageCountRecordInc) newObjectEncoding(info); long queueIDForCounter = encoding.getQueueID(); @@ -229,8 +227,7 @@ public final class DescribeJournal { out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue()); if (subsCounter.getValue() < 0) { out.println(" #NegativeCounter!!!!"); - } - else { + } else { out.println(); } out.println(); @@ -286,29 +283,24 @@ public final class DescribeJournal { Object o = newObjectEncoding(info); if (info.getUserRecordType() == JournalRecordIds.ADD_MESSAGE) { messageCount++; - } - else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) { + } else if (info.getUserRecordType() == JournalRecordIds.ADD_REF) { ReferenceDescribe ref = (ReferenceDescribe) o; Integer count = messageRefCounts.get(ref.refEncoding.queueID); if (count == null) { count = 1; messageRefCounts.put(ref.refEncoding.queueID, count); - } - else { + } else { messageRefCounts.put(ref.refEncoding.queueID, count + 1); } - } - else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) { + } else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) { AckDescribe ref = (AckDescribe) o; Integer count = messageRefCounts.get(ref.refEncoding.queueID); if (count == null) { messageRefCounts.put(ref.refEncoding.queueID, 0); - } - else { + } else { messageRefCounts.put(ref.refEncoding.queueID, count - 1); } - } - else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) { + } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) { PageCountRecord encoding = (PageCountRecord) o; queueIDForCounter = encoding.getQueueID(); @@ -316,8 +308,7 @@ public final class DescribeJournal { subsCounter.loadValue(info.id, encoding.getValue()); subsCounter.processReload(); - } - else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { + } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { PageCountRecordInc encoding = (PageCountRecordInc) o; queueIDForCounter = encoding.getQueueID(); @@ -350,15 +341,13 @@ public final class DescribeJournal { out.println("- " + describeRecord(info, o)); if (info.getUserRecordType() == 31) { preparedMessageCount++; - } - else if (info.getUserRecordType() == 32) { + } else if (info.getUserRecordType() == 32) { ReferenceDescribe ref = (ReferenceDescribe) o; Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID); if (count == null) { count = 1; preparedMessageRefCount.put(ref.refEncoding.queueID, count); - } - else { + } else { preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1); } } @@ -428,8 +417,7 @@ public final class DescribeJournal { private static Xid toXid(final byte[] data) { try { return XidCodecSupport.decodeXid(ActiveMQBuffers.wrappedBuffer(data)); - } - catch (Exception e) { + } catch (Exception e) { return null; } } @@ -492,8 +480,7 @@ public final class DescribeJournal { pageUpdate.decode(buffer); return pageUpdate; - } - else { + } else { PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl(); pageTransactionInfo.decode(buffer); @@ -655,8 +642,7 @@ public final class DescribeJournal { if (refEncoding == null) { if (other.refEncoding != null) return false; - } - else if (!refEncoding.equals(other.refEncoding)) + } else if (!refEncoding.equals(other.refEncoding)) return false; return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 3c313fe..5d30b48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -57,41 +57,17 @@ public class JDBCJournalStorageManager extends JournalStorageManager { if (sqlProviderFactory == null) { sqlProviderFactory = new GenericSQLProvider.Factory(); } - bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), - sqlProviderFactory.create(dbConf.getBindingsTableName()), - dbConf.getBindingsTableName(), - scheduledExecutorService, - executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), - sqlProviderFactory.create(dbConf.getMessageTableName()), - dbConf.getMessageTableName(), - scheduledExecutorService, - executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), - sqlProviderFactory.create(dbConf.getLargeMessageTableName()), - dbConf.getLargeMessageTableName(), - executor); - } - else { + bindingsJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getBindingsTableName()), dbConf.getBindingsTableName(), scheduledExecutorService, executorFactory.getExecutor()); + messageJournal = new JDBCJournalImpl(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getMessageTableName()), dbConf.getMessageTableName(), scheduledExecutorService, executorFactory.getExecutor()); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getDataSource(), sqlProviderFactory.create(dbConf.getLargeMessageTableName()), dbConf.getLargeMessageTableName(), executor); + } else { String driverClassName = dbConf.getJdbcDriverClassName(); - bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), - driverClassName, - JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()), - scheduledExecutorService, - executorFactory.getExecutor()); - messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), - driverClassName, - JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName()), - scheduledExecutorService, - executorFactory.getExecutor()); - largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), - driverClassName, - JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName()), - executor); + bindingsJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getBindingsTableName()), scheduledExecutorService, executorFactory.getExecutor()); + messageJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getMessageTableName()), scheduledExecutorService, executorFactory.getExecutor()); + largeMessagesFactory = new JDBCSequentialFileFactory(dbConf.getJdbcConnectionUrl(), driverClassName, JDBCUtils.getSQLProvider(driverClassName, dbConf.getLargeMessageTableName()), executor); } largeMessagesFactory.start(); - } - catch (Exception e) { + } catch (Exception e) { criticalErrorListener.onIOException(e, e.getMessage(), null); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 2aefbef..9eaa203 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -66,6 +66,7 @@ import org.apache.activemq.artemis.utils.ExecutorFactory; import org.jboss.logging.Logger; public class JournalStorageManager extends AbstractJournalStorageManager { + private static final Logger logger = Logger.getLogger(JournalStorageManager.class); private SequentialFileFactory journalFF; @@ -82,7 +83,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { private ReplicationManager replicator; - public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService) { + public JournalStorageManager(final Configuration config, + final ExecutorFactory executorFactory, + final ScheduledExecutorService scheduledExecutorService) { this(config, executorFactory, scheduledExecutorService, null); } @@ -121,12 +124,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager { ActiveMQServerLogger.LOGGER.journalUseAIO(); journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); - } - else if (config.getJournalType() == JournalType.NIO) { + } else if (config.getJournalType() == JournalType.NIO) { ActiveMQServerLogger.LOGGER.journalUseNIO(); journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); - } - else { + } else { throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } @@ -142,8 +143,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (config.getPageMaxConcurrentIO() != 1) { pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO()); - } - else { + } else { pageMaxConcurrentIO = null; } } @@ -215,8 +215,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (token != null) { try { token.waitCompletion(5000); - } - catch (Exception e) { + } catch (Exception e) { // ignore it } } @@ -242,8 +241,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE); try { msg.delete(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId); } if (replicator != null) { @@ -256,8 +254,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { protected SequentialFile createFileForLargeMessage(final long messageID, final boolean durable) { if (durable) { return createFileForLargeMessage(messageID, LargeMessageExtension.DURABLE); - } - else { + } else { return createFileForLargeMessage(messageID, LargeMessageExtension.TEMPORARY); } } @@ -268,9 +265,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager { * @param buff * @return * @throws Exception - */ - protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, - final ActiveMQBuffer buff) throws Exception { + */ protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, + final ActiveMQBuffer buff) throws Exception { LargeServerMessage largeMessage = createLargeMessage(); LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage); @@ -304,8 +300,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { try { if (isReplicated()) replicator.pageClosed(storeName, pageNumber); - } - finally { + } finally { readUnLock(); } } @@ -318,8 +313,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { try { if (isReplicated()) replicator.pageDeleted(storeName, pageNumber); - } - finally { + } finally { readUnLock(); } } @@ -338,8 +332,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { try { if (isReplicated()) replicator.pageWrite(message, pageNumber); - } - finally { + } finally { readUnLock(); } } @@ -363,8 +356,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), true, getContext(true)); return recordID; - } - finally { + } finally { readUnLock(); } } @@ -378,8 +370,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // We set a temporary record (short lived) on the journal // to avoid a situation where the server is restarted and pending large message stays on forever largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID())); - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQInternalErrorException(e.getMessage(), e); } } @@ -398,8 +389,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } return; } - } - finally { + } finally { readUnLock(); } } @@ -416,12 +406,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // The confirm could only be done after the actual delete is done confirmLargeMessage(largeServerMessage); - } - finally { + } finally { readUnLock(); } - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeServerMessage.getMessageID()); } } @@ -430,8 +418,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (executor == null) { deleteAction.run(); - } - else { + } else { executor.execute(deleteAction); } } @@ -467,8 +454,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } return largeMessage; - } - finally { + } finally { readUnLock(); } } @@ -549,12 +535,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager { bindingsFiles = prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID, autoFailBack); pageFilesToSync = getPageInformationForSync(pagingManager); pendingLargeMessages = recoverPendingLargeMessages(); - } - finally { + } finally { pagingManager.unlock(); } - } - finally { + } finally { originalMessageJournal.synchronizationUnlock(); originalBindingsJournal.synchronizationUnlock(); } @@ -564,8 +548,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // We need to send the list while locking otherwise part of the body might get sent too soon // it will send a list of IDs that we are allocating replicator.sendLargeMessageIdListMessage(pendingLargeMessages); - } - finally { + } finally { storageManagerLock.writeLock().unlock(); } @@ -580,17 +563,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout); performCachedLargeMessageDeletes(); } - } - finally { + } finally { storageManagerLock.writeLock().unlock(); } - } - catch (Exception e) { + } catch (Exception e) { logger.warn(e.getMessage(), e); stopReplication(); throw e; - } - finally { + } finally { pagingManager.resumeCleanup(); // Re-enable compact and reclaim of journal files originalBindingsJournal.replicationSyncFinished(); @@ -610,8 +590,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { continue; if (replicator != null) { replicator.syncLargeMessageFile(seqFile, size, id); - } - else { + } else { throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull(); } } @@ -632,15 +611,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager { return info; } - private void checkAndCreateDir(final File dir, final boolean create) { if (!dir.exists()) { if (create) { if (!dir.mkdirs()) { throw new IllegalStateException("Failed to create directory " + dir); } - } - else { + } else { throw ActiveMQMessageBundle.BUNDLE.cannotCreateDir(dir.getAbsolutePath()); } } @@ -708,8 +685,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { messageJournal = originalMessageJournal; try { replicator.stop(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorStoppingReplicationManager(e); } replicator = null; @@ -717,8 +693,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { // startReplication. // This method should not be called under normal circumstances performCachedLargeMessageDeletes(); - } - finally { + } finally { storageManagerLock.writeLock().unlock(); } } @@ -736,8 +711,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { if (isReplicated()) { replicator.largeMessageWrite(messageId, bytes); } - } - finally { + } finally { readUnLock(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java index 045be46..8953291 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java @@ -54,8 +54,7 @@ public class LargeMessageTXFailureCallback implements TransactionFailureCallback try { LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(messages, buff); serverMessage.decrementDelayDeletionCount(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.journalError(e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index fd7eeeb..90b1fdd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -131,8 +131,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L bufferOut.writeBytes(bufferRead.array(), 0, bytesRead); } - } - catch (Exception e) { + } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } } @@ -159,8 +158,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L delayDeletionCount.incrementAndGet(); try { incrementRefCount(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorIncrementDelayDeletionCount(e); } } @@ -190,8 +188,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L try { deleteFile(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.error(e.getMessage(), e); } } @@ -244,8 +241,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L if (file != null && file.isOpen()) { try { file.close(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e); } } @@ -281,7 +277,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L validateFile(); - byte[] bufferBytes = new byte[100 * 1024]; ByteBuffer buffer = ByteBuffer.wrap(bufferBytes); @@ -300,11 +295,9 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L byte[] bufferToWrite; if (bytesRead <= 0) { break; - } - else if (bytesRead == bufferBytes.length) { + } else if (bytesRead == bufferBytes.length) { bufferToWrite = bufferBytes; - } - else { + } else { bufferToWrite = new byte[bytesRead]; System.arraycopy(bufferBytes, 0, bufferToWrite, 0, bytesRead); } @@ -325,9 +318,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L return newMessage; - - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.lareMessageErrorCopying(e, this); return null; } @@ -349,14 +340,12 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L private static String toDate(long timestamp) { if (timestamp == 0) { return "0"; - } - else { + } else { return new java.util.Date(timestamp).toString(); } } - // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -382,8 +371,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L bodySize = file.size(); } - } - catch (Exception e) { + } catch (Exception e) { // TODO: There is an IO_ERROR on trunk now, this should be used here instead throw new ActiveMQInternalErrorException(e.getMessage(), e); } @@ -399,8 +387,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L protected void openFile() throws Exception { if (file == null) { validateFile(); - } - else if (!file.isOpen()) { + } else if (!file.isOpen()) { file.open(); } } @@ -425,8 +412,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } cFile = file.cloneFile(); cFile.open(); - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQException(ActiveMQExceptionType.INTERNAL_ERROR, e.getMessage(), e); } } @@ -437,8 +423,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L if (cFile != null) { cFile.close(); } - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQInternalErrorException(e.getMessage(), e); } } @@ -447,8 +432,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L public int encode(final ByteBuffer bufferRead) throws ActiveMQException { try { return cFile.read(bufferRead); - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQInternalErrorException(e.getMessage(), e); } } @@ -477,8 +461,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L if (bodySize < 0) { try { bodySize = file.size(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java index 0c95b75..42126d4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -33,7 +33,6 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { private static final Logger logger = Logger.getLogger(LargeServerMessageInSync.class); - private final LargeServerMessage mainLM; private final StorageManager storageManager; private SequentialFile appendFile; @@ -64,14 +63,12 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { FileIOUtil.copyData(appendFile, mainSeqFile, buffer); deleteAppendFile(); - } - else { + } else { if (logger.isTraceEnabled()) { logger.trace("joinSyncedData, appendFile is null, ignoring joinSyncedData on " + mainLM); } } - } - catch (Throwable e) { + } catch (Throwable e) { logger.warn("Error while sincing data on largeMessageInSync::" + mainLM); } @@ -79,8 +76,6 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { logger.trace("joinedSyncData on " + mainLM + " finished with " + mainSeqFile.size()); } - - syncDone = true; } @@ -109,8 +104,7 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { if (appendFile != null && appendFile.isOpen()) { try { appendFile.close(); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e); } } @@ -121,8 +115,7 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage { deleted = true; try { mainLM.deleteFile(); - } - finally { + } finally { deleteAppendFile(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index 06e07f7..ea96ce1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -58,8 +58,7 @@ public class OperationContextImpl implements OperationContext { if (token == null) { if (executorFactory == null) { return null; - } - else { + } else { token = new OperationContextImpl(executorFactory.getExecutor()); OperationContextImpl.threadLocalContext.set(token); } @@ -145,8 +144,7 @@ public class OperationContextImpl implements OperationContext { if (storeOnlyTasks == null) { storeOnlyTasks = new LinkedList<>(); } - } - else { + } else { if (tasks == null) { tasks = new LinkedList<>(); minimalReplicated = replicationLineUp.intValue(); @@ -166,16 +164,13 @@ public class OperationContextImpl implements OperationContext { // No need to use an executor here or a context switch // there are no actions pending.. hence we can just execute the task directly on the same thread executeNow = true; - } - else { + } else { execute(completion); } - } - else { + } else { if (storeOnly) { storeOnlyTasks.add(new TaskHolder(completion)); - } - else { + } else { tasks.add(new TaskHolder(completion)); } } @@ -218,8 +213,7 @@ public class OperationContextImpl implements OperationContext { execute(holder.task); iter.remove(); - } - else { + } else { // End of list here. No other task will be completed after this break; } @@ -240,14 +234,12 @@ public class OperationContextImpl implements OperationContext { // If any IO is done inside the callback, it needs to be done on a new context OperationContextImpl.clearContext(); task.done(); - } - finally { + } finally { executorsPending.decrementAndGet(); } } }); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.errorExecutingAIOCallback(e); executorsPending.decrementAndGet(); task.onError(ActiveMQExceptionType.INTERNAL_ERROR.getCode(), "It wasn't possible to complete IO operation - " + e.getMessage()); @@ -317,8 +309,7 @@ public class OperationContextImpl implements OperationContext { if (timeout == 0) { waitCallback.waitCompletion(); return true; - } - else { + } else { return waitCallback.waitCompletion(timeout); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java index bac83e6..9e252e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java @@ -37,8 +37,7 @@ public final class TXLargeMessageConfirmationOperation extends TransactionOperat for (Long msg : confirmedMessages) { try { journalStorageManager.confirmPendingLargeMessage(msg); - } - catch (Throwable e) { + } catch (Throwable e) { ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java index fccc2e2..1dd41ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java @@ -91,8 +91,7 @@ public class DuplicateIDEncoding implements EncodingSupport { long id = buff.getLong(); bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id; - } - catch (Throwable ignored) { + } catch (Throwable ignored) { bridgeRepresentation = null; } } @@ -100,8 +99,7 @@ public class DuplicateIDEncoding implements EncodingSupport { if (bridgeRepresentation != null) { return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " + bridgeRepresentation + "]"; - } - else { + } else { return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]"; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingType.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingType.java index 4576dea..35f96f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/BindingType.java @@ -30,14 +30,11 @@ public enum BindingType { public int toInt() { if (equals(BindingType.LOCAL_QUEUE)) { return BindingType.LOCAL_QUEUE_INDEX; - } - else if (equals(BindingType.REMOTE_QUEUE)) { + } else if (equals(BindingType.REMOTE_QUEUE)) { return BindingType.REMOTE_QUEUE_INDEX; - } - else if (equals(BindingType.DIVERT)) { + } else if (equals(BindingType.DIVERT)) { return BindingType.DIVERT_INDEX; - } - else { + } else { throw ActiveMQMessageBundle.BUNDLE.cannotConvertToInt(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index c6b0e9e..4c0c4b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -70,24 +70,27 @@ public interface PostOffice extends ActiveMQComponent { RoutingStatus route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception; + RoutingStatus route(ServerMessage message, + QueueCreator queueCreator, + Transaction tx, + boolean direct) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, - Transaction tx, - boolean direct, - boolean rejectDuplicates) throws Exception; + QueueCreator queueCreator, + Transaction tx, + boolean direct, + boolean rejectDuplicates) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, - RoutingContext context, - boolean direct) throws Exception; + QueueCreator queueCreator, + RoutingContext context, + boolean direct) throws Exception; RoutingStatus route(ServerMessage message, - QueueCreator queueCreator, - RoutingContext context, - boolean direct, - boolean rejectDuplicates) throws Exception; + QueueCreator queueCreator, + RoutingContext context, + boolean direct, + boolean rejectDuplicates) throws Exception; MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java index d569a8c..e67011a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/AddressImpl.java @@ -93,19 +93,15 @@ public class AddressImpl implements Address { if (currMatch.equals(WildcardAddressManager.SINGLE_WORD_SIMPLESTRING)) { pos++; matchPos++; - } - else if (currMatch.equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING)) { + } else if (currMatch.equals(WildcardAddressManager.ANY_WORDS_SIMPLESTRING)) { if (matchPos == addressParts.length - 1) { pos++; matchPos++; - } - else if (next == null) { + } else if (next == null) { return false; - } - else if (matchPos == add.getAddressParts().length - 1) { + } else if (matchPos == add.getAddressParts().length - 1) { return true; - } - else { + } else { nextToMatch = add.getAddressParts()[matchPos + 1]; while (curr != null) { if (curr.equals(nextToMatch)) { @@ -120,8 +116,7 @@ public class AddressImpl implements Address { } matchPos++; } - } - else { + } else { if (!curr.equals(currMatch)) { return false; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index c86b722..e5df737 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -98,8 +98,7 @@ public final class BindingsImpl implements Bindings { } if (binding.isExclusive()) { exclusiveBindings.add(binding); - } - else { + } else { SimpleString routingName = binding.getRoutingName(); List<Binding> bindings = routingNameBindingMap.get(routingName); @@ -131,8 +130,7 @@ public final class BindingsImpl implements Bindings { public void removeBinding(final Binding binding) { if (binding.isExclusive()) { exclusiveBindings.remove(binding); - } - else { + } else { SimpleString routingName = binding.getRoutingName(); List<Binding> bindings = routingNameBindingMap.get(routingName); @@ -190,8 +188,7 @@ public final class BindingsImpl implements Bindings { Binding binding; try { binding = bindings.get(pos); - } - catch (IndexOutOfBoundsException e) { + } catch (IndexOutOfBoundsException e) { // This can occur if binding is removed while in route if (!bindings.isEmpty()) { pos = 0; @@ -199,8 +196,7 @@ public final class BindingsImpl implements Bindings { length = bindings.size(); continue; - } - else { + } else { break; } } @@ -228,8 +224,7 @@ public final class BindingsImpl implements Bindings { theBinding.route(message, context); return true; - } - else { + } else { return false; } } @@ -283,11 +278,9 @@ public final class BindingsImpl implements Bindings { if (ids != null) { routeFromCluster(message, context, ids); - } - else if (groupingHandler != null && groupRouting && groupId != null) { + } else if (groupingHandler != null && groupRouting && groupId != null) { routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0); - } - else { + } else { if (logger.isTraceEnabled()) { logger.trace("Routing message " + message + " on binding=" + this); } @@ -344,8 +337,7 @@ public final class BindingsImpl implements Bindings { Binding binding; try { binding = bindings.get(pos); - } - catch (IndexOutOfBoundsException e) { + } catch (IndexOutOfBoundsException e) { // This can occur if binding is removed while in route if (!bindings.isEmpty()) { pos = 0; @@ -353,8 +345,7 @@ public final class BindingsImpl implements Bindings { length = bindings.size(); continue; - } - else { + } else { break; } } @@ -370,8 +361,7 @@ public final class BindingsImpl implements Bindings { pos = incrementPos(pos, length); break; - } - else { + } else { //https://issues.jboss.org/browse/HORNETQ-1254 When !routeWhenNoConsumers, // the localQueue should always have the priority over the secondary bindings if (lastLowPriorityBinding == -1 || messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) && binding instanceof LocalQueueBinding) { @@ -388,8 +378,7 @@ public final class BindingsImpl implements Bindings { if (lastLowPriorityBinding != -1) { try { theBinding = bindings.get(lastLowPriorityBinding); - } - catch (IndexOutOfBoundsException e) { + } catch (IndexOutOfBoundsException e) { // This can occur if binding is removed while in route if (!bindings.isEmpty()) { pos = 0; @@ -397,8 +386,7 @@ public final class BindingsImpl implements Bindings { lastLowPriorityBinding = -1; continue; - } - else { + } else { break; } } @@ -465,8 +453,7 @@ public final class BindingsImpl implements Bindings { } routeAndCheckNull(message, context, resp, theBinding, groupId, tries); - } - else { + } else { // ok, we need to find the binding and route it Binding chosen = locateBinding(resp.getChosenClusterName(), bindings); @@ -494,8 +481,7 @@ public final class BindingsImpl implements Bindings { // and let's route it if (theBinding != null) { theBinding.route(message, context); - } - else { + } else { if (resp != null) { groupingHandler.forceRemove(resp.getGroupId(), resp.getClusterName()); } @@ -504,8 +490,7 @@ public final class BindingsImpl implements Bindings { //in this case all we can do is remove it and try again. if (tries < MAX_GROUP_RETRY) { routeUsingStrictOrdering(message, context, groupingHandler, groupId, tries + 1); - } - else { + } else { ActiveMQServerLogger.LOGGER.impossibleToRouteGrouped(); route(message, context, false); } @@ -590,12 +575,10 @@ public final class BindingsImpl implements Bindings { if (binding != null) { if (idsToAckList.contains(bindingID)) { binding.routeWithAck(message, context); - } - else { + } else { binding.route(message, context); } - } - else { + } else { ActiveMQServerLogger.LOGGER.bindingNotFound(bindingID, message.toString(), this.toString()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 28896c3..ce3c782 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -95,8 +95,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { storageManager.deleteDuplicateIDTransactional(txID, id.getB()); deleteCount--; - } - else { + } else { ByteArrayHolder bah = new ByteArrayHolder(id.getA()); Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB()); @@ -155,8 +154,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { private String describeID(byte[] duplicateID, long id) { if (id != 0) { return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); - } - else { + } else { return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id; } } @@ -189,8 +187,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); } return false; - } - else { + } else { addToCache(duplID, tx, true); return true; } @@ -208,8 +205,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } addToCacheInMemory(duplID, recordID); - } - else { + } else { if (persist) { recordID = storageManager.generateID(); storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID); @@ -219,8 +215,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (instantAdd) { addToCacheInMemory(duplID, recordID); - } - else { + } else { if (logger.isTraceEnabled()) { logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx); } @@ -266,8 +261,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (id.getB() != null) { try { storageManager.deleteDuplicateID(id.getB()); - } - catch (Exception e) { + } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e); } } @@ -284,8 +278,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } holder.pos = pos; - } - else { + } else { id = new Pair<>(holder, recordID >= 0 ? recordID : null); if (logger.isTraceEnabled()) { @@ -389,8 +382,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } return true; - } - else { + } else { return false; } }
