actual persistence work
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4e378c16 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4e378c16 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4e378c16 Branch: refs/heads/ARTEMIS-780 Commit: 4e378c16425245eb49fa653520a99cb6c6281812 Parents: 892bea4 Author: jbertram <[email protected]> Authored: Fri Oct 21 19:58:01 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Mon Nov 7 11:28:07 2016 -0500 ---------------------------------------------------------------------- .../core/persistence/AddressBindingInfo.java | 4 -- .../core/persistence/StorageManager.java | 8 +++- .../journal/AbstractJournalStorageManager.java | 43 +++++++++++++++-- .../impl/journal/DescribeJournal.java | 2 +- .../impl/journal/JournalRecordIds.java | 2 + .../codec/PersistentAddressBindingEncoding.java | 49 +------------------ .../impl/nullpm/NullStorageManager.java | 13 ++++- .../core/server/impl/ActiveMQServerImpl.java | 20 +++++++- .../artemis/core/server/impl/AddressInfo.java | 9 ++-- .../artemis/core/server/impl/JournalLoader.java | 4 ++ .../server/impl/PostOfficeJournalLoader.java | 16 +++++++ .../transaction/impl/TransactionImplTest.java | 15 +++++- .../addressing/AddressConfigTest.java | 50 ++++++++++++++++++++ .../DeleteMessagesOnStartupTest.java | 3 +- .../integration/persistence/RestartSMTest.java | 5 +- .../persistence/StorageManagerTestBase.java | 3 +- .../impl/DuplicateDetectionUnitTest.java | 7 +-- .../server/impl/fakes/FakeJournalLoader.java | 6 +++ 18 files changed, 188 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java index 83d37bc..838be12 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/AddressBindingInfo.java @@ -25,10 +25,6 @@ public interface AddressBindingInfo { SimpleString getName(); - boolean isAutoCreated(); - - SimpleString getUser(); - AddressInfo.RoutingType getRoutingType(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index bbfec14..ee11577 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -298,8 +299,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void deleteQueueStatus(long recordID) throws Exception; + void 
addAddressBinding(long tx, AddressInfo addressInfo) throws Exception; + + void deleteAddressBinding(long tx, long addressBindingID) throws Exception; + JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, - List<GroupingInfo> groupingInfos) throws Exception; + List<GroupingInfo> groupingInfos, + List<AddressBindingInfo> addressBindingInfos) throws Exception; // grouping related operations void addGrouping(GroupBinding groupBinding) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index ecaa86e..b67cfa6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -56,6 +56,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PagePosition; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl; import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; @@ -77,6 +78,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCount import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentAddressBindingEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.QueueStatusEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; @@ -93,6 +95,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.ResourceManager; @@ -1261,7 +1264,29 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } finally { readUnLock(); } + } + + public void addAddressBinding(final long tx, final AddressInfo addressInfo) throws Exception { + PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(addressInfo.getName(), addressInfo.getRoutingType()); + readLock(); + try { + long recordID = idGenerator.generateID(); + bindingEncoding.setId(recordID); + bindingsJournal.appendAddRecordTransactional(tx, recordID, JournalRecordIds.ADDRESS_BINDING_RECORD, bindingEncoding); + } finally { + readUnLock(); + } + } + + @Override + public void deleteAddressBinding(long tx, final long addressBindingID) throws Exception { + readLock(); + try { + 
bindingsJournal.appendDeleteRecordTransactional(tx, addressBindingID); + } finally { + readUnLock(); + } } @Override @@ -1347,7 +1372,8 @@ public abstract class AbstractJournalStorageManager implements StorageManager { @Override public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, - final List<GroupingInfo> groupingInfos) throws Exception { + final List<GroupingInfo> groupingInfos, + final List<AddressBindingInfo> addressBindingInfos) throws Exception { List<RecordInfo> records = new ArrayList<>(); List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>(); @@ -1364,12 +1390,15 @@ public abstract class AbstractJournalStorageManager implements StorageManager { byte rec = record.getUserRecordType(); if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) { - PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer); - + PersistentQueueBindingEncoding bindingEncoding = newQueueBindingEncoding(id, buffer); queueBindingInfos.add(bindingEncoding); mapBindings.put(bindingEncoding.getId(), bindingEncoding); } else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { idGenerator.loadState(record.id, buffer); + } else if (rec == JournalRecordIds.ADDRESS_BINDING_RECORD) { + PersistentAddressBindingEncoding bindingEncoding = newAddressBindingEncoding(id, buffer); + ActiveMQServerLogger.LOGGER.info("=== Loading: " + bindingEncoding); + addressBindingInfos.add(bindingEncoding); } else if (rec == JournalRecordIds.GROUP_RECORD) { GroupingEncoding encoding = newGroupEncoding(id, buffer); groupingInfos.add(encoding); @@ -1849,7 +1878,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager { * @param buffer * @return */ - protected static PersistentQueueBindingEncoding newBindingEncoding(long id, ActiveMQBuffer buffer) { + protected static PersistentQueueBindingEncoding newQueueBindingEncoding(long id, ActiveMQBuffer buffer) { PersistentQueueBindingEncoding bindingEncoding = new 
PersistentQueueBindingEncoding(); bindingEncoding.decode(buffer); @@ -1872,8 +1901,14 @@ public abstract class AbstractJournalStorageManager implements StorageManager { return statusEncoding; } + protected static PersistentAddressBindingEncoding newAddressBindingEncoding(long id, ActiveMQBuffer buffer) { + PersistentAddressBindingEncoding bindingEncoding = new PersistentAddressBindingEncoding(); + bindingEncoding.decode(buffer); + bindingEncoding.setId(id); + return bindingEncoding; + } @Override public boolean addToPage(PagingStore store, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 58723c6..a5c1fd7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -555,7 +555,7 @@ public final class DescribeJournal { return AbstractJournalStorageManager.newQueueStatusEncoding(id, buffer); case QUEUE_BINDING_RECORD: - return AbstractJournalStorageManager.newBindingEncoding(id, buffer); + return AbstractJournalStorageManager.newQueueBindingEncoding(id, buffer); case ID_COUNTER_RECORD: EncodingSupport idReturn = new IDCounterEncoding(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index 0169f38..cd1d526 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -83,4 +83,6 @@ public final class JournalRecordIds { public static final byte PAGE_CURSOR_COMPLETE = 42; public static final byte PAGE_CURSOR_PENDING_COUNTER = 43; + + public static final byte ADDRESS_BINDING_RECORD = 44; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java index 9f47362..7ef7e4d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentAddressBindingEncoding.java @@ -29,10 +29,6 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres public SimpleString name; - public boolean autoCreated; - - public SimpleString user; - public AddressInfo.RoutingType routingType; public PersistentAddressBindingEncoding() { @@ -43,22 +39,14 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres return "PersistentAddressBindingEncoding [id=" + id + ", 
name=" + name + - ", user=" + - user + - ", autoCreated=" + - autoCreated + ", routingType=" + routingType + "]"; } public PersistentAddressBindingEncoding(final SimpleString name, - final SimpleString user, - final boolean autoCreated, final AddressInfo.RoutingType routingType) { this.name = name; - this.user = user; - this.autoCreated = autoCreated; this.routingType = routingType; } @@ -77,16 +65,6 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres } @Override - public SimpleString getUser() { - return user; - } - - @Override - public boolean isAutoCreated() { - return autoCreated; - } - - @Override public AddressInfo.RoutingType getRoutingType() { return routingType; } @@ -94,42 +72,17 @@ public class PersistentAddressBindingEncoding implements EncodingSupport, Addres @Override public void decode(final ActiveMQBuffer buffer) { name = buffer.readSimpleString(); - - String metadata = buffer.readNullableSimpleString().toString(); - if (metadata != null) { - String[] elements = metadata.split(";"); - for (String element : elements) { - String[] keyValuePair = element.split("="); - if (keyValuePair.length == 2) { - if (keyValuePair[0].equals("user")) { - user = SimpleString.toSimpleString(keyValuePair[1]); - } - } - } - } - - autoCreated = buffer.readBoolean(); routingType = AddressInfo.RoutingType.getType(buffer.readByte()); } @Override public void encode(final ActiveMQBuffer buffer) { buffer.writeSimpleString(name); - buffer.writeNullableSimpleString(createMetadata()); - buffer.writeBoolean(autoCreated); buffer.writeByte(routingType.getType()); } @Override public int getEncodeSize() { - return SimpleString.sizeofString(name) + DataConstants.SIZE_BOOLEAN + - SimpleString.sizeofNullableString(createMetadata()) + - DataConstants.SIZE_BYTE; - } - - private SimpleString createMetadata() { - StringBuilder metadata = new StringBuilder(); - metadata.append("user=").append(user).append(";"); - return 
SimpleString.toSimpleString(metadata.toString()); + return SimpleString.sizeofString(name) + DataConstants.SIZE_BYTE; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 3a2999e..404f248 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; @@ -55,6 +56,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -155,12 +157,21 @@ 
public class NullStorageManager implements StorageManager { } @Override + public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception { + } + + @Override + public void deleteAddressBinding(long tx, long addressBindingID) throws Exception { + } + + @Override public void commit(final long txID) throws Exception { } @Override public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, - final List<GroupingInfo> groupingInfos) throws Exception { + final List<GroupingInfo> groupingInfos, + final List<AddressBindingInfo> addressBindingInfos) throws Exception { return new JournalLoadInformation(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 375e678..cce81c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; @@ -2137,7 +2138,9 @@ public class ActiveMQServerImpl implements 
ActiveMQServer { List<GroupingInfo> groupingInfos = new ArrayList<>(); - journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos); + List<AddressBindingInfo> addressBindingInfos = new ArrayList<>(); + + journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos, addressBindingInfos); recoverStoredConfigs(); @@ -2147,6 +2150,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { journalLoader.handleGroupingBindings(groupingInfos); + Map<Long, AddressBindingInfo> addressBindingInfosMap = new HashMap<>(); + + journalLoader.initAddresses(addressBindingInfosMap, addressBindingInfos); + Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>(); HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<>(); @@ -2245,6 +2252,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); final Queue queue = queueFactory.createQueueWith(queueConfig); + boolean addressAlreadyExists = true; + + if (postOffice.getAddressInfo(queue.getAddress()) == null) { + postOffice.addAddressInfo(new AddressInfo(queue.getAddress()) + .setRoutingType(AddressInfo.RoutingType.MULTICAST)); + addressAlreadyExists = false; + } + if (transientQueue) { queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } else if (queue.isAutoCreated()) { @@ -2255,6 +2270,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (queue.isDurable()) { storageManager.addQueueBinding(txID, localQueueBinding); + if (!addressAlreadyExists) { + storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress())); + } } try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java 
---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 1449107..4e982c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -36,24 +36,27 @@ public class AddressInfo { return routingType; } - public void setRoutingType(RoutingType routingType) { + public AddressInfo setRoutingType(RoutingType routingType) { this.routingType = routingType; + return this; } public boolean isDefaultDeleteOnNoConsumers() { return defaultDeleteOnNoConsumers; } - public void setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) { + public AddressInfo setDefaultDeleteOnNoConsumers(boolean defaultDeleteOnNoConsumers) { this.defaultDeleteOnNoConsumers = defaultDeleteOnNoConsumers; + return this; } public int getDefaultMaxConsumers() { return defaultMaxConsumers; } - public void setDefaultMaxConsumers(int defaultMaxConsumers) { + public AddressInfo setDefaultMaxConsumers(int defaultMaxConsumers) { this.defaultMaxConsumers = defaultMaxConsumers; + return this; } public SimpleString getName() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java index 6f36ff5..40cef50 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; @@ -37,6 +38,9 @@ public interface JournalLoader { void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap, List<QueueBindingInfo> queueBindingInfos) throws Exception; + void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap, + List<AddressBindingInfo> addressBindingInfo) throws Exception; + void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception; void handleNoMessageReferences(Map<Long, ServerMessage> messages); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 71c5b2b..4e89e8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import 
org.apache.activemq.artemis.core.paging.impl.Page; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.QueueStatus; @@ -166,6 +167,21 @@ public class PostOfficeJournalLoader implements JournalLoader { } @Override + public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap, + List<AddressBindingInfo> addressBindingInfos) throws Exception { + for (AddressBindingInfo addressBindingInfo : addressBindingInfos) { + addressBindingInfosMap.put(addressBindingInfo.getId(), addressBindingInfo); + + // TODO: figure out what else to set here + AddressInfo addressInfo = new AddressInfo(addressBindingInfo.getName()) + .setRoutingType(addressBindingInfo.getRoutingType()); + + postOffice.addAddressInfo(addressInfo); + managementService.registerAddress(addressInfo.getName()); + } + } + + @Override public void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception { for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet()) { long queueID = entry.getKey(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 93c5c9d..97dc90d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -38,6 +38,7 @@ import 
org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; @@ -55,6 +56,7 @@ import org.apache.activemq.artemis.core.server.RouteContextList; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -529,8 +531,19 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override + public void addAddressBinding(long tx, AddressInfo addressInfo) throws Exception { + + } + + @Override + public void deleteAddressBinding(long tx, long addressBindingID) throws Exception { + + } + + @Override public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, - List<GroupingInfo> groupingInfos) throws Exception { + List<GroupingInfo> groupingInfos, + List<AddressBindingInfo> addressBindingInfos) throws Exception { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java ---------------------------------------------------------------------- diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java new file mode 100644 index 0000000..f3a0beb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressConfigTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <br> + * http://www.apache.org/licenses/LICENSE-2.0 + * <br> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.tests.integration.addressing; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Before; +import org.junit.Test; + +public class AddressConfigTest extends ActiveMQTestBase { + + protected ActiveMQServer server; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration configuration = createDefaultInVMConfig(); + server = createServer(true, configuration); + server.start(); + } + + @Test + public void persistAddressConfigTest() throws Exception { + server.createQueue(SimpleString.toSimpleString("myAddress"), SimpleString.toSimpleString("myQueue"), null, true, false); + server.stop(); + server.start(); + AddressInfo addressInfo = server.getAddressInfo(SimpleString.toSimpleString("myAddress")); + assertNotNull(addressInfo); + assertEquals(AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java index 7d515d8..90f7c5f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; @@ -76,7 +77,7 @@ public class DeleteMessagesOnStartupTest extends StorageManagerTestBase { journal.start(); - journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); FakePostOffice postOffice = new FakePostOffice(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java index 49d3a12..2ee879f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/RestartSMTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import 
org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; @@ -73,7 +74,7 @@ public class RestartSMTest extends ActiveMQTestBase { List<QueueBindingInfo> queueBindingInfos = new ArrayList<>(); - journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); journal.loadMessageJournal(postOffice, null, null, null, null, null, null, new FakeJournalLoader()); @@ -87,7 +88,7 @@ public class RestartSMTest extends ActiveMQTestBase { queueBindingInfos = new ArrayList<>(); - journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(queueBindingInfos, new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); journal.start(); } finally { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java index a104363..508f23b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/StorageManagerTestBase.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import 
org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -128,7 +129,7 @@ public abstract class StorageManagerTestBase extends ActiveMQTestBase { journal.start(); - journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); journal.loadMessageJournal(new FakePostOffice(), null, null, null, null, null, null, new FakeJournalLoader()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index 96fa35c..58c5c4f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; 
import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; @@ -95,7 +96,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { journal = new JournalStorageManager(configuration, factory, factory); journal.start(); - journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<>(); @@ -114,7 +115,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { journal = new JournalStorageManager(configuration, factory, factory); journal.start(); - journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null)); @@ -137,7 +138,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { journal = new JournalStorageManager(configuration, factory, factory); journal.start(); - journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>()); + journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>(), new ArrayList<AddressBindingInfo>()); journal.loadMessageJournal(postOffice, pagingManager, new ResourceManagerImpl(0, 0, scheduledThreadPool), null, mapDups, null, null, new PostOfficeJournalLoader(postOffice, pagingManager, null, null, null, null, null, null)); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4e378c16/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java index 32ad718..547d669 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeJournalLoader.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.persistence.AddressBindingInfo; import org.apache.activemq.artemis.core.persistence.GroupingInfo; import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; @@ -49,6 +50,11 @@ public class FakeJournalLoader implements JournalLoader { } @Override + public void initAddresses(Map<Long, AddressBindingInfo> addressBindingInfosMap, + List<AddressBindingInfo> addressBindingInfo) throws Exception { /* Empty body: this fake does nothing with the address binding records passed in. */ + } + + @Override public void handleGroupingBindings(List<GroupingInfo> groupingInfos) { }
