http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 2708c72..8311057 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; @@ -72,7 +73,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Duplicate import org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.GroupingEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; @@ -93,15 +94,14 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.IDGenerator; @@ -174,8 +174,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { private final boolean syncNonTransactional; - protected int perfBlastPages = -1; - protected boolean journalLoaded = false; private final IOCriticalErrorListener ioCriticalErrorListener; @@ -347,7 +345,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } @Override - public void storeMessage(final ServerMessage message) throws Exception { + public void storeMessage(final Message message) throws Exception { if (message.getMessageID() <= 0) { // Sanity check only... this shouldn't happen unless there is a bug throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); @@ -359,9 +357,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // appropriate if (message.isLargeMessage()) { - messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding((LargeServerMessage) message), false, getContext(false)); + messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message, false, getContext(false)); } else { - messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message, false, getContext(false)); + messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false)); } } finally { readUnLock(); @@ -460,7 +458,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { // Transactional operations @Override - public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception { + public void storeMessageTransactional(final long txID, final Message message) throws Exception { if (message.getMessageID() <= 0) { throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned(); } @@ -468,9 +466,9 @@ public abstract class AbstractJournalStorageManager implements StorageManager { readLock(); try { if (message.isLargeMessage()) { - messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding(((LargeServerMessage) message))); + messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message); } else { - messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message); + messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message); } } finally { @@ -502,16 +500,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } @Override - public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception { - readLock(); - try { - messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages), syncNonTransactional, getContext(syncNonTransactional)); - } finally { - readUnLock(); - } - } - - @Override public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception { readLock(); try { @@ -833,7 +821,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>(); - Map<Long, ServerMessage> messages = new HashMap<>(); + Map<Long, Message> messages = new HashMap<>(); readLock(); try { @@ -884,9 +872,12 @@ public abstract class AbstractJournalStorageManager implements StorageManager { break; } case JournalRecordIds.ADD_MESSAGE: { - ServerMessage message = new ServerMessageImpl(record.id, 50); + throw new IllegalStateException("This is using old journal data, export your data and import at the correct version"); + } - message.decode(buff); + case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { + + Message message = MessagePersister.getInstance().decode(buff, null); messages.put(record.id, message); @@ -907,7 +898,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { queueMap.put(encoding.queueID, queueMessages); } - ServerMessage message = messages.get(messageID); + Message message = messages.get(messageID); if (message == null) { ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id); @@ -1151,10 +1142,6 @@ public abstract class AbstractJournalStorageManager implements StorageManager { pagingManager.processReload(); } - if (perfBlastPages != -1) { - messageJournal.perfBlast(perfBlastPages); - } - journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap); journalLoaded = true; return info; @@ -1581,7 +1568,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } } - protected abstract LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, + protected abstract LargeServerMessage parseLargeMessage(Map<Long, Message> messages, ActiveMQBuffer buff) throws Exception; private void loadPreparedTransactions(final PostOffice postOffice, @@ -1603,7 +1590,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { List<MessageReference> referencesToAck = new ArrayList<>(); - Map<Long, ServerMessage> messages = new HashMap<>(); + Map<Long, Message> messages = new HashMap<>(); // Use same method as load message journal to prune out acks, so they don't get added. // Then have reacknowledge(tx) methods on queue, which needs to add the page size @@ -1623,9 +1610,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager { break; } case JournalRecordIds.ADD_MESSAGE: { - ServerMessage message = new ServerMessageImpl(record.id, 50); - message.decode(buff); + break; + } + case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { + Message message = MessagePersister.getInstance().decode(buff, null); messages.put(record.id, message); @@ -1638,7 +1627,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { encoding.decode(buff); - ServerMessage message = messages.get(messageID); + Message message = messages.get(messageID); if (message == null) { throw new IllegalStateException("Cannot find message with id " + messageID); @@ -1915,7 +1904,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { @Override public boolean addToPage(PagingStore store, - ServerMessage msg, + Message msg, Transaction tx, RouteContextList listCtx) throws Exception { /** @@ -1939,4 +1928,5 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } txoper.confirmedMessages.add(recordID); } + }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java index 3ca38e3..acf9c8e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java @@ -16,21 +16,21 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.Message; public final class AddMessageRecord { - public AddMessageRecord(final ServerMessage message) { + public AddMessageRecord(final Message message) { this.message = message; } - final ServerMessage message; + final Message message; private long scheduledDeliveryTime; private int deliveryCount; - public ServerMessage getMessage() { + public Message getMessage() { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index b9449bc..698978b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -44,7 +44,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAck import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; @@ -53,8 +53,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLa import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.XidCodecSupport; @@ -64,6 +63,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION; @@ -445,16 +445,15 @@ public final class DescribeJournal { LargeServerMessage largeMessage = new LargeServerMessageImpl(storageManager); - LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage); - - messageEncoding.decode(buffer); + LargeMessagePersister.getInstance().decode(buffer, largeMessage); return new MessageDescribe(largeMessage); } case ADD_MESSAGE: { - ServerMessage message = new ServerMessageImpl(rec, 50); - - message.decode(buffer); + return "ADD-MESSAGE is not supported any longer, use export/import"; + } + case ADD_MESSAGE_PROTOCOL: { + Message message = MessagePersister.getInstance().decode(buffer, null); return new MessageDescribe(message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index cd1d526..348ac9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -85,4 +85,7 @@ public final class JournalRecordIds { public static final byte PAGE_CURSOR_PENDING_COUNTER = 43; public static final byte ADDRESS_BINDING_RECORD = 44; + + public static final byte ADD_MESSAGE_PROTOCOL = 45; + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 51fd6cc..c31de52 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -49,12 +49,11 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; -import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessagePersister; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; import org.apache.activemq.artemis.core.replication.ReplicatedJournal; @@ -63,7 +62,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.jboss.logging.Logger; @@ -157,8 +155,6 @@ public class JournalStorageManager extends AbstractJournalStorageManager { largeMessagesFactory = new NIOSequentialFileFactory(config.getLargeMessagesLocation(), false, criticalErrorListener, 1); - perfBlastPages = config.getJournalPerfBlastPages(); - if (config.getPageMaxConcurrentIO() != 1) { pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO()); } else { @@ -287,13 +283,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { * @param buff * @return * @throws Exception - */ protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, + */ protected LargeServerMessage parseLargeMessage(final Map<Long, Message> messages, final ActiveMQBuffer buff) throws Exception { LargeServerMessage largeMessage = createLargeMessage(); - LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage); - - messageEncoding.decode(buff); + LargeMessagePersister.getInstance().decode(buff, largeMessage); if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID)) { // for compatibility: couple with old behaviour, copying the old file to avoid message loss @@ -451,7 +445,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } @Override - public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) throws Exception { + public LargeServerMessage createLargeMessage(final long id, final Message message) throws Exception { readLock(); try { if (isReplicated()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java index 8953291..33be342 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java @@ -21,21 +21,21 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; public class LargeMessageTXFailureCallback implements TransactionFailureCallback { private AbstractJournalStorageManager journalStorageManager; - private final Map<Long, ServerMessage> messages; + private final Map<Long, Message> messages; public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager, - final Map<Long, ServerMessage> messages) { + final Map<Long, Message> messages) { super(); this.journalStorageManager = journalStorageManager; this.messages = messages; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 90b1fdd..817a56a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -25,17 +25,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.message.BodyEncoder; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.TypedProperties; import org.jboss.logging.Logger; -public final class LargeServerMessageImpl extends ServerMessageImpl implements LargeServerMessage { +public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage { // Constants ----------------------------------------------------- private static final Logger logger = Logger.getLogger(LargeServerMessageImpl.class); @@ -43,30 +41,28 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L // Attributes ---------------------------------------------------- private final JournalStorageManager storageManager; - + private final AtomicInteger delayDeletionCount = new AtomicInteger(0); private long pendingRecordID = -1; - private boolean paged; - // We should only use the NIO implementation on the Journal private SequentialFile file; - // set when a copyFrom is called // The actual copy is done when finishCopy is called private SequentialFile pendingCopy; - private long bodySize = -1; - private final AtomicInteger delayDeletionCount = new AtomicInteger(0); - // Static -------------------------------------------------------- // Constructors -------------------------------------------------- + // We cache this + private volatile int memoryEstimate = -1; public LargeServerMessageImpl(final JournalStorageManager storageManager) { this.storageManager = storageManager; } + // Public -------------------------------------------------------- + /** * Copy constructor * @@ -85,14 +81,18 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L setMessageID(newID); } - // Public -------------------------------------------------------- + private static String toDate(long timestamp) { + if (timestamp == 0) { + return "0"; + } else { + return new java.util.Date(timestamp).toString(); + } + + } - /** - * @param pendingRecordID - */ @Override - public void setPendingRecordID(long pendingRecordID) { - this.pendingRecordID = pendingRecordID; + public boolean isServerMessage() { + return true; } @Override @@ -100,6 +100,14 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L return this.pendingRecordID; } + /** + * @param pendingRecordID + */ + @Override + public void setPendingRecordID(long pendingRecordID) { + this.pendingRecordID = pendingRecordID; + } + @Override public void setPaged() { paged = true; @@ -118,39 +126,19 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L bodySize += bytes.length; } - public void encodeBody(final ActiveMQBuffer bufferOut, final BodyEncoder context, final int size) { - try { - // This could maybe be optimized (maybe reading directly into bufferOut) - ByteBuffer bufferRead = ByteBuffer.allocate(size); - - int bytesRead = context.encode(bufferRead); - - bufferRead.flip(); - - if (bytesRead > 0) { - bufferOut.writeBytes(bufferRead.array(), 0, bytesRead); - } - - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - @Override public synchronized int getEncodeSize() { return getHeadersAndPropertiesEncodeSize(); } - @Override public void encode(final ActiveMQBuffer buffer1) { - super.encodeHeadersAndProperties(buffer1); + super.encodeHeadersAndProperties(buffer1.byteBuf()); } - @Override public void decode(final ActiveMQBuffer buffer1) { file = null; - super.decodeHeadersAndProperties(buffer1); + super.decodeHeadersAndProperties(buffer1.byteBuf()); } @Override @@ -175,7 +163,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } @Override - public BodyEncoder getBodyEncoder() throws ActiveMQException { + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { validateFile(); return new DecodingContext(); } @@ -220,9 +208,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L storageManager.deleteLargeMessageFile(this); } - // We cache this - private volatile int memoryEstimate = -1; - @Override public synchronized int getMemoryEstimate() { if (memoryEstimate == -1) { @@ -248,28 +233,29 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } @Override - public void setOriginalHeaders(final ServerMessage other, - final MessageReference originalReference, - final boolean expiry) { - super.setOriginalHeaders(other, originalReference, expiry); - - LargeServerMessageImpl otherLM = (LargeServerMessageImpl) other; - this.paged = otherLM.paged; - if (this.paged) { - this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); + public void referenceOriginalMessage(final Message original, String originalQueue) { + + super.referenceOriginalMessage(original, originalQueue); + + if (original instanceof LargeServerMessageImpl) { + LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original; + this.paged = otherLM.paged; + if (this.paged) { + this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); + } } } @Override - public ServerMessage copy() { + public Message copy() { SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable); - ServerMessage newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID); + Message newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID); return newMessage; } @Override - public ServerMessage copy(final long newID) { + public Message copy(final long newID) { try { LargeServerMessage newMessage = storageManager.createLargeMessage(newID, this); @@ -286,7 +272,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L file.open(); file.position(0); - for (;;) { + for (; ; ) { // The buffer is reused... // We need to make sure we clear the limits and the buffer before reusing it buffer.clear(); @@ -337,19 +323,6 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); } - private static String toDate(long timestamp) { - if (timestamp == 0) { - return "0"; - } else { - return new java.util.Date(timestamp).toString(); - } - - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - @Override protected void finalize() throws Throwable { releaseResources(); @@ -400,7 +373,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L // Inner classes ------------------------------------------------- - class DecodingContext implements BodyEncoder { + class DecodingContext implements LargeBodyEncoder { private SequentialFile cFile; @@ -454,7 +427,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L } /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.message.BodyEncoder#getLargeBodySize() + * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize() */ @Override public long getLargeBodySize() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java new file mode 100644 index 0000000..cb578e1 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessagePersister.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence.impl.journal; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.utils.DataConstants; + +public class LargeServerMessagePersister implements Persister<LargeServerMessage> { + + /** + * for future usage... + * when we have refactored large message properly + * this could be used to differentiate other protocols large message persisters + */ + byte PERSISTER_ID = 11; + + public static LargeServerMessagePersister theInstance = new LargeServerMessagePersister(); + + public static LargeServerMessagePersister getInstance() { + return theInstance; + } + + protected LargeServerMessagePersister() { + } + + @Override + public int getEncodeSize(LargeServerMessage record) { + return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + + record.getPersistSize(); + } + + /** Sub classes must add the first short as the protocol-id */ + @Override + public void encode(ActiveMQBuffer buffer, LargeServerMessage record) { + buffer.writeByte(PERSISTER_ID); + buffer.writeLong(record.getMessageID()); + buffer.writeNullableSimpleString(record.getAddressSimpleString()); + record.persist(buffer); + } + + + @Override + public LargeServerMessage decode(ActiveMQBuffer buffer, LargeServerMessage record) { + // the caller must consume the first byte already, as that will be used to decide what persister (protocol) to use + buffer.readByte(); // for future usage, not used now + long id = buffer.readLong(); + SimpleString address = buffer.readNullableSimpleString(); + record.reloadPersistence(buffer); + record.setMessageID(id); + record.setAddress(address); + return record; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java deleted file mode 100644 index cdb5702..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessageEncoding.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.persistence.impl.journal.codec; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.server.LargeServerMessage; - -public class LargeMessageEncoding implements EncodingSupport { - - public final LargeServerMessage message; - - public LargeMessageEncoding(final LargeServerMessage message) { - this.message = message; - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) - */ - @Override - public void decode(final ActiveMQBuffer buffer) { - message.decodeHeadersAndProperties(buffer); - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) - */ - @Override - public void encode(final ActiveMQBuffer buffer) { - message.encode(buffer); - } - - /* (non-Javadoc) - * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize() - */ - @Override - public int getEncodeSize() { - return message.getEncodeSize(); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java new file mode 100644 index 0000000..cb3129b --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/LargeMessagePersister.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.persistence.impl.journal.codec; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.core.server.LargeServerMessage; + +public class LargeMessagePersister implements Persister<LargeServerMessage> { + + private static final LargeMessagePersister theInstance = new LargeMessagePersister(); + + + public static LargeMessagePersister getInstance() { + return theInstance; + } + + + protected LargeMessagePersister() { + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) + */ + @Override + public LargeServerMessage decode(final ActiveMQBuffer buffer, LargeServerMessage message) { + ((CoreMessage)message).decodeHeadersAndProperties(buffer.byteBuf()); + return message; + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer) + */ + @Override + public void encode(final ActiveMQBuffer buffer, LargeServerMessage message) { + ((CoreMessage)message).encodeHeadersAndProperties(buffer.byteBuf()); + } + + /* (non-Javadoc) + * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize() + */ + @Override + public int getEncodeSize(LargeServerMessage message) { + return message.getEncodeSize(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 5b325b6..28ccc09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -17,12 +17,14 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeServerMessage { + + +class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMessage { NullStorageLargeServerMessage() { super(); @@ -39,7 +41,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe @Override public synchronized void addBytes(final byte[] bytes) { if (buffer == null) { - buffer = ActiveMQBuffers.dynamicBuffer(bytes.length); + buffer = ActiveMQBuffers.dynamicBuffer(bytes.length).byteBuf(); } // expand the buffer @@ -67,6 +69,12 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe } @Override + public boolean isServerMessage() { + return true; + } + + + @Override public synchronized int getEncodeSize() { return getHeadersAndPropertiesEncodeSize(); } @@ -77,7 +85,7 @@ class NullStorageLargeServerMessage extends ServerMessageImpl implements LargeSe } @Override - public ServerMessage copy() { + public Message copy() { // This is a simple copy, used only to avoid changing original properties return new NullStorageLargeServerMessage(this); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 2154879..cb035ec 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; @@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -53,7 +53,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.RouteContextList; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -214,11 +214,11 @@ public class NullStorageManager implements StorageManager { } @Override - public void storeMessage(final ServerMessage message) throws Exception { + public void storeMessage(final Message message) throws Exception { } @Override - public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception { + public void storeMessageTransactional(final long txID, final Message message) throws Exception { } @Override @@ -274,7 +274,7 @@ public class NullStorageManager implements StorageManager { } @Override - public LargeServerMessage createLargeMessage(final long id, final MessageInternal message) { + public LargeServerMessage createLargeMessage(final long id, final Message message) { NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage(); largeMessage.copyHeadersAndProperties(message); @@ -464,10 +464,6 @@ public class NullStorageManager implements StorageManager { } @Override - public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depage) throws Exception { - } - - @Override public long storePageCounter(final long txID, final long queueID, final long value) throws Exception { return 0; } @@ -543,7 +539,7 @@ public class NullStorageManager implements StorageManager { @Override public boolean addToPage(PagingStore store, - ServerMessage msg, + Message msg, Transaction tx, RouteContextList listCtx) throws Exception { /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java index 4c6763d..f1e83d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Binding.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.postoffice; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.UnproposalListener; public interface Binding extends UnproposalListener { @@ -39,7 +40,7 @@ public interface Binding extends UnproposalListener { Filter getFilter(); - boolean isHighAcceptPriority(ServerMessage message); + boolean isHighAcceptPriority(Message message); boolean isExclusive(); @@ -47,9 +48,9 @@ public interface Binding extends UnproposalListener { int getDistance(); - void route(ServerMessage message, RoutingContext context) throws Exception; + void route(Message message, RoutingContext context) throws Exception; - void routeWithAck(ServerMessage message, RoutingContext context) throws Exception; + void routeWithAck(Message message, RoutingContext context) throws Exception; void close() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index b79f1da..1d335ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.postoffice; import java.util.Collection; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.UnproposalListener; @@ -34,7 +34,7 @@ public interface Bindings extends UnproposalListener { void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType); - boolean redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception; + boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception; - void route(ServerMessage message, RoutingContext context) throws Exception; + void route(Message message, RoutingContext context) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 7b8ce18..f682777 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -96,33 +96,33 @@ public interface PostOffice extends ActiveMQComponent { SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; - RoutingStatus route(ServerMessage message, boolean direct) throws Exception; + RoutingStatus route(Message message, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, Transaction tx, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, RoutingContext context, boolean direct) throws Exception; - RoutingStatus route(ServerMessage message, + RoutingStatus route(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception; - MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception; + MessageReference reroute(Message message, Queue queue, Transaction tx) throws Exception; - Pair<RoutingContext, ServerMessage> redistribute(ServerMessage message, + Pair<RoutingContext, Message> redistribute(Message message, final Queue originatingQueue, Transaction tx) throws Exception; - void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception; + void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception; DuplicateIDCache getDuplicateIDCache(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 6be0311..c1f0dba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -30,14 +30,13 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.GroupingHandler; @@ -152,7 +151,7 @@ public final class BindingsImpl implements Bindings { } @Override - public boolean redistribute(final ServerMessage message, + public boolean redistribute(final Message message, final Queue originatingQueue, final RoutingContext context) throws Exception { if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) { @@ -230,18 +229,18 @@ public final class BindingsImpl implements Bindings { } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { route(message, context, true); } - private void route(final ServerMessage message, + private void route(final Message message, final RoutingContext context, final boolean groupRouting) throws Exception { /* This is a special treatment for scaled-down messages involving SnF queues. * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property */ - if (message.containsProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS)) { - byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS); + if (message.containsProperty(Message.HDR_SCALEDOWN_TO_IDS)) { + byte[] ids = (byte[]) message.removeProperty(Message.HDR_SCALEDOWN_TO_IDS); if (ids != null) { ByteBuffer buffer = ByteBuffer.wrap(ids); @@ -251,7 +250,7 @@ public final class BindingsImpl implements Bindings { if (entry.getValue() instanceof RemoteQueueBinding) { RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); if (remoteQueueBinding.getRemoteQueueID() == id) { - message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); + message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); } } } @@ -272,7 +271,7 @@ public final class BindingsImpl implements Bindings { if (!routed) { // Remove the ids now, in order to avoid double check - byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS); + byte[] ids = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_IDS); // Fetch the groupId now, in order to avoid double checking SimpleString groupId = message.getSimpleStringProperty(Message.HDR_GROUP_ID); @@ -319,7 +318,7 @@ public final class BindingsImpl implements Bindings { * these two servers. This will eventually send more messages to one server than the other * (depending if you are using multi-thread), and not lose messages. */ - private Binding getNextBinding(final ServerMessage message, + private Binding getNextBinding(final Message message, final SimpleString routingName, final List<Binding> bindings) { Integer ipos = routingNamePositions.get(routingName); @@ -407,7 +406,7 @@ public final class BindingsImpl implements Bindings { return theBinding; } - private void routeUsingStrictOrdering(final ServerMessage message, + private void routeUsingStrictOrdering(final Message message, final RoutingContext context, final GroupingHandler groupingGroupingHandler, final SimpleString groupId, @@ -473,7 +472,7 @@ public final class BindingsImpl implements Bindings { return null; } - private void routeAndCheckNull(ServerMessage message, + private void routeAndCheckNull(Message message, RoutingContext context, Response resp, Binding theBinding, @@ -552,10 +551,10 @@ public final class BindingsImpl implements Bindings { return writer.toString(); } - private void routeFromCluster(final ServerMessage message, + private void routeFromCluster(final Message message, final RoutingContext context, final byte[] ids) throws Exception { - byte[] idsToAck = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_ACK_IDS); + byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS); List<Long> idsToAckList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java index 04f432d..8f4ab48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DivertBinding.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.Binding; @@ -23,7 +24,6 @@ import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; public class DivertBinding implements Binding { @@ -98,12 +98,12 @@ public class DivertBinding implements Binding { } @Override - public boolean isHighAcceptPriority(final ServerMessage message) { + public boolean isHighAcceptPriority(final Message message) { return true; } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { divert.route(message, context); } @@ -150,7 +150,7 @@ public class DivertBinding implements Binding { } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) { + public void routeWithAck(Message message, RoutingContext context) { //noop } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java index af49c4d..176d614 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.postoffice.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.postoffice.BindingType; @@ -24,7 +25,6 @@ import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; public class LocalQueueBinding implements QueueBinding { @@ -104,7 +104,7 @@ public class LocalQueueBinding implements QueueBinding { } @Override - public boolean isHighAcceptPriority(final ServerMessage message) { + public boolean isHighAcceptPriority(final Message message) { // It's a high accept priority if the queue has at least one matching consumer return queue.hasMatchingConsumer(message); @@ -116,14 +116,14 @@ public class LocalQueueBinding implements QueueBinding { } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { if (isMatchRoutingType(context)) { queue.route(message, context); } } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception { + public void routeWithAck(Message message, RoutingContext context) throws Exception { if (isMatchRoutingType(context)) { queue.routeWithAck(message, context); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 2f6ae3d..81a83ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; @@ -45,7 +46,7 @@ import org.apache.activemq.artemis.api.core.management.NotificationType; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.io.IOCallback; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -69,12 +70,9 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; @@ -665,20 +663,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final boolean direct) throws Exception { return route(message, (Transaction) null, direct); } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final Transaction tx, final boolean direct) throws Exception { return route(message, new RoutingContextImpl(tx), direct); } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception { @@ -686,14 +684,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct) throws Exception { return route(message, context, direct, true); } @Override - public RoutingStatus route(final ServerMessage message, + public RoutingStatus route(final Message message, final RoutingContext context, final boolean direct, boolean rejectDuplicates) throws Exception { @@ -708,7 +706,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding AtomicBoolean startedTX = new AtomicBoolean(false); - final SimpleString address = message.getAddress(); + final SimpleString address = message.getAddressSimpleString(); applyExpiryDelay(message, address); @@ -716,13 +714,12 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return RoutingStatus.DUPLICATED_ID; } - if (message.hasInternalProperties()) { - // We need to perform some cleanup on internal properties, - // but we don't do it every time, otherwise it wouldn't be optimal - cleanupInternalPropertiesBeforeRouting(message); - } + // TODO-now: Internal properties shouldn't be part of the ServerMessage + // We need to perform some cleanup on internal properties, + // but we don't do it every time, otherwise it wouldn't be optimal + cleanupInternalPropertiesBeforeRouting(message); - Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddress() : context.getAddress()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress() == null ? message.getAddressSimpleString() : context.getAddress()); // TODO auto-create queues here? // first check for the auto-queue creation thing @@ -768,8 +765,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding result = RoutingStatus.NO_BINDINGS; ActiveMQServerLogger.LOGGER.noDLA(address); } else { - message.setOriginalHeaders(message, null, false); + message.referenceOriginalMessage(message, null); message.setAddress(dlaAddress); route(message, context.getTransaction(), false); @@ -806,7 +803,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } // HORNETQ-1029 - private void applyExpiryDelay(ServerMessage message, SimpleString address) { + private void applyExpiryDelay(Message message, SimpleString address) { long expirationOverride = addressSettingsRepository.getMatch(address.toString()).getExpiryDelay(); // A -1 <expiry-delay> means don't do anything @@ -819,12 +816,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public MessageReference reroute(final ServerMessage message, + public MessageReference reroute(final Message message, final Queue queue, final Transaction tx) throws Exception { + setPagingStore(message); - MessageReference reference = message.createReference(queue); + MessageReference reference = MessageReference.Factory.createReference(message, queue); if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); @@ -852,15 +850,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding * The redistribution can't process the route right away as we may be dealing with a large message which will need to be processed on a different thread */ @Override - public Pair<RoutingContext, ServerMessage> redistribute(final ServerMessage message, + public Pair<RoutingContext, Message> redistribute(final Message message, final Queue originatingQueue, final Transaction tx) throws Exception { // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message // arrived the target node // as described on https://issues.jboss.org/browse/JBPAPP-6130 - ServerMessage copyRedistribute = message.copy(storageManager.generateID()); + Message copyRedistribute = message.copy(storageManager.generateID()); - Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress()); + Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString()); if (bindings != null) { RoutingContext context = new RoutingContextImpl(tx); @@ -937,7 +935,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding synchronized (notificationLock) { // First send a reset message - ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50); + Message message = new CoreMessage(storageManager.generateID(), 50); message.setAddress(queueName); message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true); @@ -987,7 +985,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } } - ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateID(), 50); + Message completeMessage = new CoreMessage(storageManager.generateID(), 50); completeMessage.setAddress(queueName); completeMessage.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA_COMPLETE, true); @@ -1009,13 +1007,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding /** * @param message */ - protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message) { + protected void cleanupInternalPropertiesBeforeRouting(final Message message) { LinkedList<SimpleString> valuesToRemove = null; for (SimpleString name : message.getPropertyNames()) { // We use properties to establish routing context on clustering. // However if the client resends the message after receiving, it needs to be removed - if ((name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) || (name.startsWith(MessageImpl.HDR_ROUTE_TO_ACK_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS))) { + if ((name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) || (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS))) { if (valuesToRemove == null) { valuesToRemove = new LinkedList<>(); } @@ -1030,13 +1028,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private void setPagingStore(final ServerMessage message) throws Exception { - PagingStore store = pagingManager.getPageStore(message.getAddress()); + private void setPagingStore(final Message message) throws Exception { + PagingStore store = pagingManager.getPageStore(message.getAddressSimpleString()); - message.setPagingStore(store); + message.setContext(store); } - private void routeQueueInfo(final ServerMessage message, + private void routeQueueInfo(final Message message, final Queue queue, final boolean applyFilters) throws Exception { if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message)) { @@ -1074,7 +1072,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public void processRoute(final ServerMessage message, + public void processRoute(final Message message, final RoutingContext context, final boolean direct) throws Exception { final List<MessageReference> refs = new ArrayList<>(); @@ -1095,7 +1093,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } for (Queue queue : entry.getValue().getNonDurableQueues()) { - MessageReference reference = message.createReference(queue); + MessageReference reference = MessageReference.Factory.createReference(message, queue); refs.add(reference); if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { @@ -1112,9 +1110,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding while (iter.hasNext()) { Queue queue = iter.next(); - MessageReference reference = message.createReference(queue); + MessageReference reference = MessageReference.Factory.createReference(message, queue); - if (context.isAlreadyAcked(message.getAddress(), queue)) { + if (context.isAlreadyAcked(message.getAddressSimpleString(), queue)) { reference.setAlreadyAcked(); if (tx != null) { queue.acknowledge(tx, reference); @@ -1189,7 +1187,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding * @param message * @throws Exception */ - private void confirmLargeMessageSend(Transaction tx, final ServerMessage message) throws Exception { + private void confirmLargeMessageSend(Transaction tx, final Message message) throws Exception { LargeServerMessage largeServerMessage = (LargeServerMessage) message; if (largeServerMessage.getPendingRecordID() >= 0) { if (tx == null) { @@ -1245,13 +1243,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private boolean checkDuplicateID(final ServerMessage message, + private boolean checkDuplicateID(final Message message, final RoutingContext context, boolean rejectDuplicates, AtomicBoolean startedTX) throws Exception { // Check the DuplicateCache for the Bridge first - Object bridgeDup = message.getObjectProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); + Object bridgeDup = message.getObjectProperty(Message.HDR_BRIDGE_DUPLICATE_ID); if (bridgeDup != null) { // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one byte[] bridgeDupBytes = (byte[]) bridgeDup; @@ -1270,7 +1268,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding return false; } - message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); + message.removeProperty(Message.HDR_BRIDGE_DUPLICATE_ID); } else { // if used BridgeDuplicate, it's not going to use the regular duplicate // since this will would break redistribution (re-setting the duplicateId) @@ -1281,7 +1279,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding boolean isDuplicate = false; if (duplicateIDBytes != null) { - cache = getDuplicateIDCache(message.getAddress()); + cache = getDuplicateIDCache(message.getAddressSimpleString()); isDuplicate = cache.contains(duplicateIDBytes); @@ -1338,8 +1336,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } - private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName) { - ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50); + private Message createQueueInfoMessage(final NotificationType type, final SimpleString queueName) { + Message message = new CoreMessage().initBuffer(50).setMessageID(storageManager.generateID()); message.setAddress(queueName); @@ -1433,7 +1431,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // Reverse the ref counts, and paging sizes for (MessageReference ref : refs) { - ServerMessage message = ref.getMessage(); + Message message = ref.getMessage(); if (message.isDurable() && ref.getQueue().isDurable()) { message.decrementDurableRefCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/53887656/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 2869e38..45082b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage; @@ -47,7 +48,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -87,11 +87,11 @@ public class ServerPacketDecoder extends ClientPacketDecoder { switch (packetType) { case SESS_SEND: { - packet = new SessionSendMessage(new ServerMessageImpl()); + packet = new SessionSendMessage(new CoreMessage()); break; } case SESS_SEND_LARGE: { - packet = new SessionSendLargeMessage(new ServerMessageImpl()); + packet = new SessionSendLargeMessage(new CoreMessage()); break; } case REPLICATION_APPEND: {
