http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index b45775c..e27ed30 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -23,13 +23,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -172,7 +171,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { */ void confirmPendingLargeMessage(long recordID) throws Exception; - void storeMessage(ServerMessage message) throws Exception; + void storeMessage(Message message) throws Exception; void storeReference(long queueID, long messageID, boolean last) throws Exception; @@ -190,7 +189,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void deleteDuplicateID(long recordID) throws Exception; - void storeMessageTransactional(long txID, ServerMessage message) throws Exception; + void storeMessageTransactional(long txID, Message message) throws Exception; void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception; @@ -225,7 +224,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * @return a large message object * @throws Exception */ - LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception; + LargeServerMessage createLargeMessage(long id, Message message) throws Exception; enum LargeMessageExtension { DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync"); @@ -265,11 +264,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception; - /** - * FIXME Unused - */ - void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception; - void deletePageTransactional(long recordID) throws Exception; JournalLoadInformation loadMessageJournal(final PostOffice postOffice, @@ -383,7 +377,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * needs to be sent to the journal * @throws Exception */ - boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception; + boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception; /** * Stops the replication of data from the live to the backup.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 2708c72..8311057 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; @@ -72,7 +73,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Duplicate import org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.GroupingEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; @@ -93,15 +94,14 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.IDGenerator; @@ -174,8 +174,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { private final boolean syncNonTransactional; - protected int perfBlastPages = -1; - protected boolean journalLoaded = false; private final IOCriticalErrorListener ioCriticalErrorListener; @@ -347,7 +345,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } @Override - public void storeMessage(final ServerMessage message) throws Exception { + public void storeMessage(final Message message) throws Exception { if (message.getMessageID() <= 0) { // Sanity check only... this shouldn't happen unless there is a bug throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); @@ -359,9 +357,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // appropriate if (message.isLargeMessage()) { - messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding((LargeServerMessage) message), false, getContext(false)); + messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message, false, getContext(false)); } else { - messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message, false, getContext(false)); + messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false)); } } finally { readUnLock(); @@ -460,7 +458,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // Transactional operations @Override - public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception { + public void storeMessageTransactional(final long txID, final Message message) throws Exception { if (message.getMessageID() <= 0) { throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); } @@ -468,9 +466,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { if (message.isLargeMessage()) { - messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding(((LargeServerMessage) message))); + messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message); } else { - messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message); + messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message); } } finally { @@ -502,16 +500,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } @Override - public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception { - readLock(); - try { - messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages), syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); - } - } - - @Override public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception { readLock(); try { @@ -833,7 +821,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>(); - Map<Long, ServerMessage> messages = new HashMap<>(); + Map<Long, Message> messages = new HashMap<>(); readLock(); try { @@ -884,9 +872,12 @@ public abstract class AbstractJournalStorageManager implements StorageManager { break; } case JournalRecordIds.ADD_MESSAGE: { - ServerMessage message = new ServerMessageImpl(record.id, 50); + throw new IllegalStateException("This is using old journal data, export your data and import at the correct version"); + } - message.decode(buff); + case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { + + Message message = MessagePersister.getInstance().decode(buff, null); messages.put(record.id, message); @@ -907,7 +898,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { queueMap.put(encoding.queueID, queueMessages); } - ServerMessage message = messages.get(messageID); + Message message = messages.get(messageID); if (message == null) { ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id); @@ -1151,10 +1142,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { pagingManager.processReload(); } - if (perfBlastPages != -1) { - messageJournal.perfBlast(perfBlastPages); - } - journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap); journalLoaded = true; return info; @@ -1581,7 +1568,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } } - protected abstract LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, + protected abstract LargeServerMessage parseLargeMessage(Map<Long, Message> messages, ActiveMQBuffer buff) throws Exception; private void loadPreparedTransactions(final PostOffice postOffice, @@ -1603,7 +1590,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { List<MessageReference> referencesToAck = new ArrayList<>(); - Map<Long, ServerMessage> messages = new HashMap<>(); + Map<Long, Message> messages = new HashMap<>(); // Use same method as load message journal to prune out acks, so they don't get added. // Then have reacknowledge(tx) methods on queue, which needs to add the page size @@ -1623,9 +1610,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager { break; } case JournalRecordIds.ADD_MESSAGE: { - ServerMessage message = new ServerMessageImpl(record.id, 50); - message.decode(buff); + break; + } + case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { + Message message = MessagePersister.getInstance().decode(buff, null); messages.put(record.id, message); @@ -1638,7 +1627,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { encoding.decode(buff); - ServerMessage message = messages.get(messageID); + Message message = messages.get(messageID); if (message == null) { throw new IllegalStateException("Cannot find message with id " + messageID); @@ -1915,7 +1904,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { @Override public boolean addToPage(PagingStore store, - ServerMessage msg, + Message msg, Transaction tx, RouteContextList listCtx) throws Exception { /** @@ -1939,4 +1928,5 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } txoper.confirmedMessages.add(recordID); } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java index 3ca38e3..acf9c8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java @@ -16,21 +16,21 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.Message; public final class AddMessageRecord { - public AddMessageRecord(final ServerMessage message) { + public AddMessageRecord(final Message message) { this.message = message; } - final ServerMessage message; + final Message message; private long scheduledDeliveryTime; private int deliveryCount; - public ServerMessage getMessage() { + public Message getMessage() { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index b9449bc..698978b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -44,7 +44,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAck import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; @@ -53,8 +53,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLa import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.XidCodecSupport; @@ -64,6 +63,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION; @@ -445,16 +445,15 @@ public final class DescribeJournal { LargeServerMessage largeMessage = new LargeServerMessageImpl(storageManager); - LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage); - - messageEncoding.decode(buffer); + LargeMessagePersister.getInstance().decode(buffer, largeMessage); return new MessageDescribe(largeMessage); } case ADD_MESSAGE: { - ServerMessage message = new ServerMessageImpl(rec, 50); - - message.decode(buffer); + return "ADD-MESSAGE is not supported any longer, use export/import"; + } + case ADD_MESSAGE_PROTOCOL: { + Message message = MessagePersister.getInstance().decode(buffer, null); return new MessageDescribe(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index cd1d526..348ac9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -85,4 +85,7 @@ public final class JournalRecordIds { public static final byte PAGE_CURSOR_PENDING_COUNTER = 43; public static final byte ADDRESS_BINDING_RECORD = 44; + + public static final byte ADD_MESSAGE_PROTOCOL = 45; + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 51fd6cc..c31de52 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -49,12 +49,11 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; @@ -63,7 +62,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.jboss.logging.Logger; @@ -157,8 +155,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesLocation(), false, criticalErrorListener, 1); - perfBlastPages = config.getJournalPerfBlastPages(); - if (config.getPageMaxConcurrentIO() != 1) { pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO()); } else { @@ -287,13 +283,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { * @param buff * @return * @throws Exception - */ protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, + */ protected LargeServerMessage parseLargeMessage(final Map<Long, Message> messages, final ActiveMQBuffer buff) throws Exception { LargeServerMessage largeMessage = createLargeMessage(); - LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage); - - messageEncoding.decode(buff); + LargeMessagePersister.getInstance().decode(buff, largeMessage); if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { // for compatibility: couple with old behaviour, copying the old file to avoid message loss @@ -451,7 +445,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } @Override - public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception { + public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception { readLock(); try { if (isReplicated()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java index 8953291..33be342 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java @@ -21,21 +21,21 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; public class LargeMessageTXFailureCallback implements TransactionFailureCallback { private AbstractJournalStorageManager journalStorageManager; - private final Map<Long, ServerMessage> messages; + private final Map<Long, Message> messages; public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager, - final Map<Long, ServerMessage> messages) { + final Map<Long, Message> messages) { super(); this.journalStorageManager = journalStorageManager; this.messages = messages; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 90b1fdd..46bd335 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -25,17 +25,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.TypedProperties; import org.jboss.logging.Logger; -public final class LargeServerMessageImpl extends ServerMessageImpl implements LargeServerMessage { +public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage { // Constants ----------------------------------------------------- private static final Logger logger = Logger.getLogger(LargeServerMessageImpl.class); @@ -59,9 +57,8 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L private final AtomicInteger delayDeletionCount = new AtomicInteger(0); - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- + // We cache this + private volatile int memoryEstimate = -1; public LargeServerMessageImpl(final JournalStorageManager storageManager) { this.storageManager = storageManager; @@ -85,14 +82,18 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L setMessageID(newID); } - // Public -------------------------------------------------------- + private static String toDate(long timestamp) { + if (timestamp == 0) { + return "0"; + } else { + return new java.util.Date(timestamp).toString(); + } + + } - /** - * @param pendingRecordID - */ @Override - public void setPendingRecordID(long pendingRecordID) { - this.pendingRecordID = pendingRecordID; + public boolean isServerMessage() { + return true; } @Override @@ -100,6 +101,14 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L return this.pendingRecordID; } + /** + * @param pendingRecordID + */ + @Override + public void setPendingRecordID(long pendingRecordID) { + this.pendingRecordID = pendingRecordID; + } + @Override public void setPaged() { paged = true; @@ -118,39 +127,19 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L bodySize += bytes.length; } - public void encodeBody(final ActiveMQBuffer bufferOut, final BodyEncoder context, final int size) { - try { - // This could maybe be optimized (maybe reading directly into bufferOut) - ByteBuffer bufferRead = ByteBuffer.allocate(size); - - int bytesRead = context.encode(bufferRead); - - bufferRead.flip(); - - if (bytesRead > 0) { - bufferOut.writeBytes(bufferRead.array(), 0, bytesRead); - } - - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - @Override public synchronized int getEncodeSize() { return getHeadersAndPropertiesEncodeSize(); } - @Override public void encode(final ActiveMQBuffer buffer1) { - super.encodeHeadersAndProperties(buffer1); + super.encodeHeadersAndProperties(buffer1.byteBuf()); } - @Override public void decode(final ActiveMQBuffer buffer1) { file = null; - super.decodeHeadersAndProperties(buffer1); + super.decodeHeadersAndProperties(buffer1.byteBuf()); } @Override @@ -175,7 +164,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { validateFile(); return new DecodingContext(); } @@ -220,9 +209,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L storageManager.deleteLargeMessageFile(this); } - // We cache this - private volatile int memoryEstimate = -1; - @Override public synchronized int getMemoryEstimate() { if (memoryEstimate == -1) { @@ -248,28 +234,29 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } @Override - public void setOriginalHeaders(final ServerMessage other, - final MessageReference originalReference, - final boolean expiry) { - super.setOriginalHeaders(other, originalReference, expiry); - - LargeServerMessageImpl otherLM = (LargeServerMessageImpl) other; - this.paged = otherLM.paged; - if (this.paged) { - this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); + public void referenceOriginalMessage(final Message original, String originalQueue) { + + super.referenceOriginalMessage(original, originalQueue); + + if (original instanceof LargeServerMessageImpl) { + LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original; + this.paged = otherLM.paged; + if (this.paged) { + this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); + } } } @Override - public ServerMessage copy() { + public Message copy() { SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable); - ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID); + Message newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID); return newMessage; } @Override - public ServerMessage copy(final long newID) { + public Message copy(final long newID) { try { LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); @@ -337,19 +324,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); } - private static String toDate(long timestamp) { - if (timestamp == 0) { - return "0"; - } else { - return new java.util.Date(timestamp).toString(); - } - - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - @Override protected void finalize() throws Throwable { releaseResources(); @@ -400,7 +374,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L // Inner classes ------------------------------------------------- - class DecodingContext implements BodyEncoder { + class DecodingContext implements LargeBodyEncoder { private SequentialFile cFile; @@ -454,7 +428,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.message.BodyEncoder#getLargeBodySize() + * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize() */ @Override public long getLargeBodySize() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java deleted file mode 100644 index cdb5702..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.persistence.impl.journal.codec; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.server.LargeServerMessage; - -public class LargeMessageEncoding implements EncodingSupport { - - public final LargeServerMessage message; - - public LargeMessageEncoding(final LargeServerMessage message) { - this.message = message; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) - */ - @Override - public void decode(final ActiveMQBuffer buffer) { - message.decodeHeadersAndProperties(buffer); - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) - */ - @Override - public void encode(final ActiveMQBuffer buffer) { - message.encode(buffer); - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize() - */ - @Override - public int getEncodeSize() { - return message.getEncodeSize(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java new file mode 100644 index 0000000..b715f97 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence.impl.journal.codec; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.server.LargeServerMessage; + +public class LargeMessagePersister implements Persister<LargeServerMessage> { + + private static final LargeMessagePersister theInstance = new LargeMessagePersister(); + + + public static LargeMessagePersister getInstance() { + return theInstance; + } + + protected LargeMessagePersister() { + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) + */ + @Override + public LargeServerMessage decode(final ActiveMQBuffer buffer, LargeServerMessage message) { + ((CoreMessage)message).decodeHeadersAndProperties(buffer.byteBuf()); + return message; + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) + */ + @Override + public void encode(final ActiveMQBuffer buffer, LargeServerMessage message) { + ((CoreMessage)message).encodeHeadersAndProperties(buffer.byteBuf()); + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize() + */ + @Override + public int getEncodeSize(LargeServerMessage message) { + return message.getEncodeSize(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 5b325b6..edd37b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -17,12 +17,12 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeServerMessage { +class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMessage { NullStorageLargeServerMessage() { super(); @@ -39,7 +39,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe @Override public synchronized void addBytes(final byte[] bytes) { if (buffer == null) { - buffer = ActiveMQBuffers.dynamicBuffer(bytes.length); + buffer = ActiveMQBuffers.dynamicBuffer(bytes.length).byteBuf(); } // expand the buffer @@ -67,6 +67,12 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe } @Override + public boolean isServerMessage() { + return true; + } + + + @Override public synchronized int getEncodeSize() { return getHeadersAndPropertiesEncodeSize(); } @@ -77,7 +83,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe } @Override - public ServerMessage copy() { + public Message copy() { // This is a simple copy, used only to avoid changing original properties return new NullStorageLargeServerMessage(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 2154879..2c297d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; @@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -53,7 +53,6 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -214,11 +213,11 @@ public class NullStorageManager implements StorageManager { } @Override - public void storeMessage(final ServerMessage message) throws Exception { + public void storeMessage(final Message message) throws Exception { } @Override - public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception { + public void storeMessageTransactional(final long txID, final Message message) throws Exception { } @Override @@ -274,7 +273,7 @@ public class NullStorageManager implements StorageManager { } @Override - public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) { + public LargeServerMessage createLargeMessage(final long id, final Message message) { NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage(); largeMessage.copyHeadersAndProperties(message); @@ -464,10 +463,6 @@ public class NullStorageManager implements StorageManager { } @Override - public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depage) throws Exception { - } - - @Override public long storePageCounter(final long txID, final long queueID, final long value) throws Exception { return 0; } @@ -543,7 +538,7 @@ public class NullStorageManager implements StorageManager { @Override public boolean addToPage(PagingStore store, - ServerMessage msg, + Message msg, Transaction tx, RouteContextList listCtx) throws Exception { /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java index 4c6763d..f1e83d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.postoffice; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.UnproposalListener; public interface Binding extends UnproposalListener { @@ -39,7 +40,7 @@ public interface Binding extends UnproposalListener { Filter getFilter(); - boolean isHighAcceptPriority(ServerMessage message); + boolean isHighAcceptPriority(Message message); boolean isExclusive(); @@ -47,9 +48,9 @@ public interface Binding extends UnproposalListener { int getDistance(); - void route(ServerMessage message, RoutingContext context) throws Exception; + void route(Message message, RoutingContext context) throws Exception; - void routeWithAck(ServerMessage message, RoutingContext context) throws Exception; + void routeWithAck(Message message, RoutingContext context) throws Exception; void close() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index b79f1da..1d335ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.postoffice; import java.util.Collection; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.UnproposalListener; @@ -34,7 +34,7 @@ public interface Bindings extends UnproposalListener { void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType); - boolean redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception; + boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception; - void route(ServerMessage message, RoutingContext context) throws Exception; + void route(Message message, RoutingContext context) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 7b8ce18..f682777 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -96,33 +96,33 @@ public interface PostOffice extends ActiveMQComponent { SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; - RoutingStatus route(ServerMessage message, boolean direct) throws Exception; + RoutingStatus route(Message message, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, Transaction tx, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, RoutingContext context, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception; - MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception; + MessageReference reroute(Message message, Queue queue, Transaction tx) throws Exception; - Pair<RoutingContext, ServerMessage> redistribute(ServerMessage message, + Pair<RoutingContext, Message> redistribute(Message message, final Queue originatingQueue, Transaction tx) throws Exception; - void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception; + void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception; DuplicateIDCache getDuplicateIDCache(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 6be0311..2de97e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -30,14 +30,12 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.GroupingHandler; @@ -152,7 +150,7 @@ public final class BindingsImpl implements Bindings { } @Override - public boolean redistribute(final ServerMessage message, + public boolean redistribute(final Message message, final Queue originatingQueue, final RoutingContext context) throws Exception { if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { @@ -230,18 +228,18 @@ public final class BindingsImpl implements Bindings { } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { route(message, context, true); } - private void route(final ServerMessage message, + private void route(final Message message, final RoutingContext context, final boolean groupRouting) throws Exception { /* This is a special treatment for scaled-down messages involving SnF queues. * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property */ - if (message.containsProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS)) { - byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS); + if (message.containsProperty(Message.HDR_SCALEDOWN_TO_IDS)) { + byte[] ids = (byte[]) message.removeProperty(Message.HDR_SCALEDOWN_TO_IDS); if (ids != null) { ByteBuffer buffer = ByteBuffer.wrap(ids); @@ -251,7 +249,7 @@ public final class BindingsImpl implements Bindings { if (entry.getValue() instanceof RemoteQueueBinding) { RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); if (remoteQueueBinding.getRemoteQueueID() == id) { - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); } } } @@ -272,7 +270,7 @@ public final class BindingsImpl implements Bindings { if (!routed) { // Remove the ids now, in order to avoid double check - byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS); + byte[] ids = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_IDS); // Fetch the groupId now, in order to avoid double checking SimpleString groupId = message.getSimpleStringProperty(Message.HDR_GROUP_ID); @@ -319,7 +317,7 @@ public final class BindingsImpl implements Bindings { * these two servers. This will eventually send more messages to one server than the other * (depending if you are using multi-thread), and not lose messages. */ - private Binding getNextBinding(final ServerMessage message, + private Binding getNextBinding(final Message message, final SimpleString routingName, final List<Binding> bindings) { Integer ipos = routingNamePositions.get(routingName); @@ -407,7 +405,7 @@ public final class BindingsImpl implements Bindings { return theBinding; } - private void routeUsingStrictOrdering(final ServerMessage message, + private void routeUsingStrictOrdering(final Message message, final RoutingContext context, final GroupingHandler groupingGroupingHandler, final SimpleString groupId, @@ -473,7 +471,7 @@ public final class BindingsImpl implements Bindings { return null; } - private void routeAndCheckNull(ServerMessage message, + private void routeAndCheckNull(Message message, RoutingContext context, Response resp, Binding theBinding, @@ -552,10 +550,10 @@ public final class BindingsImpl implements Bindings { return writer.toString(); } - private void routeFromCluster(final ServerMessage message, + private void routeFromCluster(final Message message, final RoutingContext context, final byte[] ids) throws Exception { - byte[] idsToAck = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS); + byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS); List<Long> idsToAckList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java index 04f432d..8f4ab48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -23,7 +24,6 @@ import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; public class DivertBinding implements Binding { @@ -98,12 +98,12 @@ public class DivertBinding implements Binding { } @Override - public boolean isHighAcceptPriority(final ServerMessage message) { + public boolean isHighAcceptPriority(final Message message) { return true; } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { divert.route(message, context); } @@ -150,7 +150,7 @@ public class DivertBinding implements Binding { } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) { + public void routeWithAck(Message message, RoutingContext context) { //noop } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index af49c4d..176d614 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.BindingType; @@ -24,7 +25,6 @@ import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; public class LocalQueueBinding implements QueueBinding { @@ -104,7 +104,7 @@ public class LocalQueueBinding implements QueueBinding { } @Override - public boolean isHighAcceptPriority(final ServerMessage message) { + public boolean isHighAcceptPriority(final Message message) { // It's a high accept priority if the queue has at least one matching consumer return queue.hasMatchingConsumer(message); @@ -116,14 +116,14 @@ public class LocalQueueBinding implements QueueBinding { } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { if (isMatchRoutingType(context)) { queue.route(message, context); } } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception { + public void routeWithAck(Message message, RoutingContext context) throws Exception { if (isMatchRoutingType(context)) { queue.routeWithAck(message, context); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 2f6ae3d..464859f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +37,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -45,7 +45,7 @@ import org.apache.activemq.artemis.api.core.management.NotificationType; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -69,12 +69,9 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; @@ -665,20 +662,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final boolean direct) throws Exception { return route(message, (Transaction) null, direct); } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final Transaction tx, final boolean direct) throws Exception { return route(message, new RoutingContextImpl(tx), direct); } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception { @@ -686,14 +683,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct) throws Exception { return route(message, context, direct, true); } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct, boolean rejectDuplicates) throws Exception { @@ -708,7 +705,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding AtomicBoolean startedTX = new AtomicBoolean(false); - final SimpleString address = message.getAddress(); + final SimpleString address = message.getAddressSimpleString(); applyExpiryDelay(message, address); @@ -716,13 +713,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return RoutingStatus.DUPLICATED_ID; } - if (message.hasInternalProperties()) { - // We need to perform some cleanup on internal properties, - // but we don't do it every time, otherwise it wouldn't be optimal - cleanupInternalPropertiesBeforeRouting(message); - } + message.cleanupInternalProperties(); - Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddress() : context.getAddress()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddressSimpleString() : context.getAddress()); // TODO auto-create queues here? // first check for the auto-queue creation thing @@ -768,7 +761,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding result = RoutingStatus.NO_BINDINGS; ActiveMQServerLogger.LOGGER.noDLA(address); } else { - message.setOriginalHeaders(message, null, false); + message.referenceOriginalMessage(message, null); message.setAddress(dlaAddress); @@ -806,7 +799,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } // HORNETQ-1029 - private void applyExpiryDelay(ServerMessage message, SimpleString address) { + private void applyExpiryDelay(Message message, SimpleString address) { long expirationOverride = addressSettingsRepository.getMatch(address.toString()).getExpiryDelay(); // A -1 <expiry-delay> means don't do anything @@ -819,12 +812,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public MessageReference reroute(final ServerMessage message, + public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception { + setPagingStore(message); - MessageReference reference = message.createReference(queue); + MessageReference reference = MessageReference.Factory.createReference(message, queue); if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); @@ -852,15 +846,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding * The redistribution can't process the route right away as we may be dealing with a large message which will need to be processed on a different thread */ @Override - public Pair<RoutingContext, ServerMessage> redistribute(final ServerMessage message, + public Pair<RoutingContext, Message> redistribute(final Message message, final Queue originatingQueue, final Transaction tx) throws Exception { // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message // arrived the target node // as described on https://issues.jboss.org/browse/JBPAPP-6130 - ServerMessage copyRedistribute = message.copy(storageManager.generateID()); + Message copyRedistribute = message.copy(storageManager.generateID()); - Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString()); if (bindings != null) { RoutingContext context = new RoutingContextImpl(tx); @@ -937,7 +931,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding synchronized (notificationLock) { // First send a reset message - ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50); + Message message = new CoreMessage(storageManager.generateID(), 50); message.setAddress(queueName); message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true); @@ -987,7 +981,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } } - ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateID(), 50); + Message completeMessage = new CoreMessage(storageManager.generateID(), 50); completeMessage.setAddress(queueName); completeMessage.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA_COMPLETE, true); @@ -1006,37 +1000,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // Private ----------------------------------------------------------------- - /** - * @param message - */ - protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message) { - LinkedList<SimpleString> valuesToRemove = null; - - for (SimpleString name : message.getPropertyNames()) { - // We use properties to establish routing context on clustering. - // However if the client resends the message after receiving, it needs to be removed - if ((name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) || (name.startsWith(MessageImpl.HDR_ROUTE_TO_ACK_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS))) { - if (valuesToRemove == null) { - valuesToRemove = new LinkedList<>(); - } - valuesToRemove.add(name); - } - } - - if (valuesToRemove != null) { - for (SimpleString removal : valuesToRemove) { - message.removeProperty(removal); - } - } - } - private void setPagingStore(final ServerMessage message) throws Exception { - PagingStore store = pagingManager.getPageStore(message.getAddress()); + private void setPagingStore(final Message message) throws Exception { + PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString()); - message.setPagingStore(store); + message.setContext(store); } - private void routeQueueInfo(final ServerMessage message, + private void routeQueueInfo(final Message message, final Queue queue, final boolean applyFilters) throws Exception { if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) { @@ -1074,13 +1045,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public void processRoute(final ServerMessage message, + public void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception { final List<MessageReference> refs = new ArrayList<>(); Transaction tx = context.getTransaction(); + Long deliveryTime = message.getScheduledDeliveryTime(); + for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) { PagingStore store = pagingManager.getPageStore(entry.getKey()); @@ -1095,14 +1068,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } for (Queue queue : entry.getValue().getNonDurableQueues()) { - MessageReference reference = message.createReference(queue); - - refs.add(reference); - if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { - Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + MessageReference reference = MessageReference.Factory.createReference(message, queue); - reference.setScheduledDeliveryTime(scheduledDeliveryTime); + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); } + refs.add(reference); message.incrementRefCount(); } @@ -1112,22 +1083,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding while (iter.hasNext()) { Queue queue = iter.next(); - MessageReference reference = message.createReference(queue); + MessageReference reference = MessageReference.Factory.createReference(message, queue); - if (context.isAlreadyAcked(message.getAddress(), queue)) { + if (context.isAlreadyAcked(message.getAddressSimpleString(), queue)) { reference.setAlreadyAcked(); if (tx != null) { queue.acknowledge(tx, reference); } } - refs.add(reference); - - if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { - Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); - reference.setScheduledDeliveryTime(scheduledDeliveryTime); + if (deliveryTime != null) { + reference.setScheduledDeliveryTime(deliveryTime); } + refs.add(reference); if (message.isDurable()) { int durableRefCount = message.incrementDurableRefCount(); @@ -1152,7 +1121,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext()); } - if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { + if (message.containsDeliveryAnnotationProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { if (tx != null) { storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference); } else { @@ -1189,7 +1158,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding * @param message * @throws Exception */ - private void confirmLargeMessageSend(Transaction tx, final ServerMessage message) throws Exception { + private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception { LargeServerMessage largeServerMessage = (LargeServerMessage) message; if (largeServerMessage.getPendingRecordID() >= 0) { if (tx == null) { @@ -1245,13 +1214,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private boolean checkDuplicateID(final ServerMessage message, + private boolean checkDuplicateID(final Message message, final RoutingContext context, boolean rejectDuplicates, AtomicBoolean startedTX) throws Exception { // Check the DuplicateCache for the Bridge first - Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); + Object bridgeDup = message.removeDeliveryAnnoationProperty(Message.HDR_BRIDGE_DUPLICATE_ID); if (bridgeDup != null) { // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one byte[] bridgeDupBytes = (byte[]) bridgeDup; @@ -1269,8 +1238,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.decrementRefCount(); return false; } - - message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); } else { // if used BridgeDuplicate, it's not going to use the regular duplicate // since this will would break redistribution (re-setting the duplicateId) @@ -1281,7 +1248,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding boolean isDuplicate = false; if (duplicateIDBytes != null) { - cache = getDuplicateIDCache(message.getAddress()); + cache = getDuplicateIDCache(message.getAddressSimpleString()); isDuplicate = cache.contains(duplicateIDBytes); @@ -1338,8 +1305,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName) { - ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50); + private Message createQueueInfoMessage(final NotificationType type, final SimpleString queueName) { + Message message = new CoreMessage().initBuffer(50).setMessageID(storageManager.generateID()); message.setAddress(queueName); @@ -1433,7 +1400,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // Reverse the ref counts, and paging sizes for (MessageReference ref : refs) { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); if (message.isDurable() && ref.getQueue().isDurable()) { message.decrementDurableRefCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 2869e38..45082b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage; @@ -47,7 +48,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -87,11 +87,11 @@ public class ServerPacketDecoder extends ClientPacketDecoder { switch (packetType) { case SESS_SEND: { - packet = new SessionSendMessage(new ServerMessageImpl()); + packet = new SessionSendMessage(new CoreMessage()); break; } case SESS_SEND_LARGE: { - packet = new SessionSendLargeMessage(new ServerMessageImpl()); + packet = new SessionSendLargeMessage(new CoreMessage()); break; } case REPLICATION_APPEND: {
