http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java index 1a64e6e..99573f8 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java @@ -22,10 +22,10 @@ import java.security.InvalidParameterException; import java.security.MessageDigest; import java.security.PrivilegedAction; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -45,6 +45,7 @@ import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQIllegalStateException; +import org.hornetq.api.core.HornetQInternalErrorException; import org.hornetq.api.core.Message; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; @@ -106,6 +107,7 @@ import org.hornetq.core.transaction.TransactionOperationAbstract; import org.hornetq.core.transaction.TransactionPropertyIndexes; import org.hornetq.core.transaction.impl.TransactionImpl; import org.hornetq.utils.Base64; +import org.hornetq.utils.ByteUtil; import org.hornetq.utils.DataConstants; import org.hornetq.utils.ExecutorFactory; import org.hornetq.utils.HornetQThreadFactory; @@ -350,7 +352,7 @@ public class JournalStorageManager implements StorageManager * To achieve (2), instead of writing directly to instances of {@link JournalImpl}, we write to * instances of {@link ReplicatedJournal}. * <p/> - * At the backup-side replication is handled by {@link ReplicationEndpoint}. + * At the backup-side replication is handled by {@link org.hornetq.core.replication.ReplicationEndpoint}. * * @param replicationManager * @param pagingManager @@ -380,6 +382,10 @@ public class JournalStorageManager implements StorageManager JournalFile[] messageFiles = null; JournalFile[] bindingsFiles = null; + // We get a picture of the current sitaution on the large messages + // and we send the current messages while more state is coming + Map<Long, Pair<String, Long>> pendingLargeMessages = null; + try { Map<SimpleString, Collection<Integer>> pageFilesToSync; @@ -408,7 +414,7 @@ public class JournalStorageManager implements StorageManager bindingsFiles = prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID, autoFailBack); pageFilesToSync = getPageInformationForSync(pagingManager); - getLargeMessageInformation(); + pendingLargeMessages = recoverPendingLargeMessages(); } finally { @@ -428,9 +434,11 @@ public class JournalStorageManager implements StorageManager storageManagerLock.writeLock().unlock(); } + // it will send a list of IDs that we are allocating + replicator.sendLargeMessageIdListMessage(pendingLargeMessages); sendJournalFile(messageFiles, JournalContent.MESSAGES); sendJournalFile(bindingsFiles, JournalContent.BINDINGS); - sendLargeMessageFiles(); + sendLargeMessageFiles(pendingLargeMessages); sendPagesToBackup(pageFilesToSync, pagingManager); storageManagerLock.writeLock().lock(); @@ -574,23 +582,18 @@ public class JournalStorageManager implements StorageManager return info; } - private void sendLargeMessageFiles() throws Exception + private void sendLargeMessageFiles(final Map<Long, Pair<String, Long>> pendingLargeMessages) throws Exception { - while (true) + Iterator<Entry<Long, Pair<String, Long>>> iter = pendingLargeMessages.entrySet().iterator(); + while (started && iter.hasNext()) { - Map.Entry<Long, Pair<String, Long>> entry = replicator.getNextLargeMessageToSync(); - if (entry == null) - { - break; - } + Map.Entry<Long, Pair<String, Long>> entry = iter.next(); String fileName = entry.getValue().getA(); final long id = entry.getKey(); long size = entry.getValue().getB(); SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1); if (!seqFile.exists()) continue; - if (!started) - return; replicator.syncLargeMessageFile(seqFile, size, id); } } @@ -610,8 +613,9 @@ public class JournalStorageManager implements StorageManager * * @throws Exception */ - private void getLargeMessageInformation() throws Exception + private Map<Long, Pair<String, Long>> recoverPendingLargeMessages() throws Exception { + Map<Long, Pair<String, Long>> largeMessages = new HashMap<Long, Pair<String, Long>>(); // only send durable messages... // listFiles append a "." to anything... List<String> filenames = largeMessagesFactory.listFiles("msg"); @@ -620,12 +624,16 @@ public class JournalStorageManager implements StorageManager for (String filename : filenames) { Long id = getLargeMessageIdFromFilename(filename); - idList.add(id); - SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1); - long size = seqFile.size(); - largeMessages.put(id, new Pair<String, Long>(filename, size)); + if (!largeMessagesToDelete.contains(id)) + { + idList.add(id); + SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1); + long size = seqFile.size(); + largeMessages.put(id, new Pair<String, Long>(filename, size)); + } } - replicator.sendLargeMessageIdListMessage(largeMessages); + + return largeMessages; } /** @@ -763,12 +771,12 @@ public class JournalStorageManager implements StorageManager getContext().executeOnCompletion(run); } - public long generateUniqueID() + public long generateID() { return idGenerator.generateID(); } - public long getCurrentUniqueID() + public long getCurrentID() { return idGenerator.getCurrentID(); } @@ -838,7 +846,7 @@ public class JournalStorageManager implements StorageManager readLock(); try { - long recordID = generateUniqueID(); + long recordID = generateID(); messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new PendingLargeMessageEncoding(messageID), @@ -1081,7 +1089,7 @@ public class JournalStorageManager implements StorageManager readLock(); try { - pageTransaction.setRecordID(generateUniqueID()); + pageTransaction.setRecordID(generateID()); messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, pageTransaction); } @@ -1207,7 +1215,7 @@ public class JournalStorageManager implements StorageManager readLock(); try { - long id = generateUniqueID(); + long id = generateID(); messageJournal.appendAddRecord(id, JournalRecordIds.HEURISTIC_COMPLETION, @@ -2392,6 +2400,21 @@ public class JournalStorageManager implements StorageManager // This should be accessed from this package only void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws HornetQException { + if (largeServerMessage.getPendingRecordID() < 0) + { + try + { + // The delete file happens asynchronously + // And the client won't be waiting for the actual file to be deleted. + // We set a temporary record (short lived) on the journal + // to avoid a situation where the server is restarted and pending large message stays on forever + largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID())); + } + catch (Exception e) + { + throw new HornetQInternalErrorException(e.getMessage(), e); + } + } final SequentialFile file = largeServerMessage.getFile(); if (file == null) { @@ -3434,7 +3457,7 @@ public class JournalStorageManager implements StorageManager // SimpleString simpleStr = new SimpleString(duplID); // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]"; - return "DuplicateIDEncoding [address=" + address + ", duplID=" + Arrays.toString(duplID) + "]"; + return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]"; } } @@ -3443,7 +3466,7 @@ public class JournalStorageManager implements StorageManager * This is only used when loading a transaction. * <p/> * it might be possible to merge the functionality of this class with - * {@link PagingStoreImpl.FinishPageMessageOperation} + * {@link org.hornetq.core.persistence.impl.journal.JournalStorageManager.FinishPageMessageOperation} */ // TODO: merge this class with the one on the PagingStoreImpl private static class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java index 0c494d2..5e6bfbd 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java +++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java @@ -18,6 +18,7 @@ package org.hornetq.core.persistence.impl.journal; import java.nio.ByteBuffer; import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.Message; import org.hornetq.core.journal.SequentialFile; import org.hornetq.core.persistence.StorageManager; import org.hornetq.core.persistence.StorageManager.LargeMessageExtension; @@ -77,15 +78,17 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage } @Override - public void setDurable(boolean durable) + public Message setDurable(boolean durable) { mainLM.setDurable(durable); + return mainLM; } @Override - public synchronized void setMessageID(long id) + public synchronized Message setMessageID(long id) { mainLM.setMessageID(id); + return mainLM; } @Override http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java index b29ea13..37866e9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java @@ -232,7 +232,7 @@ public class NullStorageManager implements StorageManager @Override public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception { - return generateUniqueID(); + return generateID(); } @Override @@ -264,7 +264,7 @@ public class NullStorageManager implements StorageManager } @Override - public long generateUniqueID() + public long generateID() { long id = idSequence.getAndIncrement(); @@ -272,7 +272,7 @@ public class NullStorageManager implements StorageManager } @Override - public long getCurrentUniqueID() + public long getCurrentID() { return idSequence.get(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java index 05bd090..1e15014 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/QueueInfo.java @@ -130,6 +130,27 @@ public class QueueInfo implements Serializable numberOfConsumers--; } + public boolean matchesAddress(SimpleString address) + { + boolean containsAddress = false; + + if (address != null) + { + SimpleString[] split = address.split(','); + for (SimpleString addressPart : split) + { + containsAddress = address.startsWith(addressPart); + + if (containsAddress) + { + break; + } + } + } + + return containsAddress; + } + /* (non-Javadoc) * @see java.lang.Object#toString() */ @@ -153,6 +174,4 @@ public class QueueInfo implements Serializable distance + "]"; } - - } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java index c037ff5..f545345 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/BindingsImpl.java @@ -34,6 +34,7 @@ import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.Queue; import org.hornetq.core.server.RoutingContext; import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.cluster.RemoteQueueBinding; import org.hornetq.core.server.group.GroupingHandler; import org.hornetq.core.server.group.impl.Proposal; import org.hornetq.core.server.group.impl.Response; @@ -262,6 +263,34 @@ public final class BindingsImpl implements Bindings private void route(final ServerMessage message, final RoutingContext context, final boolean groupRouting) throws Exception { + /* This is a special treatment for scaled-down messages involving SnF queues. + * See org.hornetq.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property + */ + if (message.containsProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS)) + { + byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_SCALEDOWN_TO_IDS); + + if (ids != null) + { + ByteBuffer buffer = ByteBuffer.wrap(ids); + while (buffer.hasRemaining()) + { + long id = buffer.getLong(); + for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) + { + if (entry.getValue() instanceof RemoteQueueBinding) + { + RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue(); + if (remoteQueueBinding.getRemoteQueueID() == id) + { + message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array()); + } + } + } + } + } + } + boolean routed = false; if (!exclusiveBindings.isEmpty()) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java index ddd8a44..d10294a 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -93,7 +93,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache // cache size has been reduced in config - delete the extra records if (txID == -1) { - txID = storageManager.generateUniqueID(); + txID = storageManager.generateID(); } storageManager.deleteDuplicateIDTransactional(txID, id.getB()); @@ -156,7 +156,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (persist) { - recordID = storageManager.generateUniqueID(); + recordID = storageManager.generateID(); storageManager.storeDuplicateID(address, duplID, recordID); } @@ -166,7 +166,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (persist) { - recordID = storageManager.generateUniqueID(); + recordID = storageManager.generateID(); storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID); tx.setContainsPersistent(); @@ -245,12 +245,15 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { synchronized (this) { - long tx = storageManager.generateUniqueID(); - for (Pair<ByteArrayHolder, Long> id : ids) + if (ids.size() > 0) { - storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + long tx = storageManager.generateID(); + for (Pair<ByteArrayHolder, Long> id : ids) + { + storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + } + storageManager.commit(tx); } - storageManager.commit(tx); ids.clear(); cache.clear(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java index 8b90e9a..5b3e7cb 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java @@ -34,6 +34,7 @@ import org.hornetq.api.core.HornetQNonExistentQueueException; import org.hornetq.api.core.Message; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.filter.Filter; @@ -218,13 +219,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public void onNotification(final Notification notification) { + if (!(notification.getType() instanceof CoreNotificationType)) return; + if (isTrace) { HornetQServerLogger.LOGGER.trace("Receiving notification : " + notification + " on server " + this.server); } synchronized (notificationLock) { - NotificationType type = notification.getType(); + CoreNotificationType type = (CoreNotificationType) notification.getType(); switch (type) { @@ -446,7 +449,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // PostOffice implementation ----------------------------------------------- - // TODO - needs to be synchronized to prevent happening concurrently with activate(). + // TODO - needs to be synchronized to prevent happening concurrently with activate() // (and possible removeBinding and other methods) // Otherwise can have situation where createQueue comes in before failover, then failover occurs // and post office is activated but queue remains unactivated after failover so delivery never occurs @@ -483,7 +486,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding HornetQServerLogger.LOGGER.debug("ClusterCommunication::Sending notification for addBinding " + binding + " from server " + server); } - managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props)); + managementService.sendNotification(new Notification(uid, CoreNotificationType.BINDING_ADDED, props)); } public synchronized Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception @@ -539,7 +542,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, binding.getFilter().getFilterString()); } - managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props)); + managementService.sendNotification(new Notification(null, CoreNotificationType.BINDING_REMOVED, props)); } binding.close(); @@ -555,6 +558,13 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding { cache.clear(); } + + cache = duplicateIDCaches.remove(BRIDGE_CACHE_STR.concat(address)); + + if (cache != null) + { + cache.clear(); + } } @Override @@ -799,7 +809,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message // arrived the target node // as described on https://issues.jboss.org/browse/JBPAPP-6130 - ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID()); + ServerMessage copyRedistribute = message.copy(storageManager.generateID()); Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress()); @@ -871,7 +881,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding { // First send a reset message - ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50); + ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50); message.setAddress(queueName); message.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA, true); @@ -883,9 +893,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding { HornetQServerLogger.LOGGER.trace("QueueInfo on sendQueueInfoToQueue = " + info); } - if (info.getAddress().startsWith(address)) + if (info.matchesAddress(address)) { - message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName); + message = createQueueInfoMessage(CoreNotificationType.BINDING_ADDED, queueName); message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress()); message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName()); @@ -900,7 +910,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++) { - message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName); + message = createQueueInfoMessage(CoreNotificationType.CONSUMER_CREATED, queueName); message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress()); message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName()); @@ -914,7 +924,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding { for (SimpleString filterString : info.getFilterStrings()) { - message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName); + message = createQueueInfoMessage(CoreNotificationType.CONSUMER_CREATED, queueName); message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress()); message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName()); @@ -927,7 +937,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } } - ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50); + ServerMessage completeMessage = new ServerMessageImpl(storageManager.generateID(), 50); completeMessage.setAddress(queueName); completeMessage.putBooleanProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA_COMPLETE, true); @@ -960,7 +970,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding // We use properties to establish routing context on clustering. // However if the client resends the message after receiving, it needs to be removed if ((name.startsWith(MessageImpl.HDR_ROUTE_TO_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_IDS)) || - name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS)) + (name.startsWith(MessageImpl.HDR_ROUTE_TO_ACK_IDS) && !name.equals(MessageImpl.HDR_ROUTE_TO_ACK_IDS))) { if (valuesToRemove == null) { @@ -1360,7 +1370,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName) { - ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), 50); + ServerMessage message = new ServerMessageImpl(storageManager.generateID(), 50); message.setAddress(queueName); @@ -1513,4 +1523,15 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } return bindings; } + + // For tests only + public AddressManager getAddressManager() + { + return addressManager; + } + + public HornetQServer getServer() + { + return server; + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java index 66128db..08ad4f1 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/postoffice/impl/SimpleAddressManager.java @@ -63,9 +63,9 @@ public class SimpleAddressManager implements AddressManager throw HornetQMessageBundle.BUNDLE.bindingAlreadyExists(binding); } - if (HornetQServerLogger.LOGGER.isDebugEnabled()) + if (HornetQServerLogger.LOGGER.isTraceEnabled()) { - HornetQServerLogger.LOGGER.debug("Adding binding " + binding + " with address = " + binding.getUniqueName(), new Exception("trace")); + HornetQServerLogger.LOGGER.trace("Adding binding " + binding + " with address = " + binding.getUniqueName(), new Exception("trace")); } return addMappingInternal(binding.getAddress(), binding); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java index dfc0521..d7edc4a 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/ProtocolHandler.java @@ -14,16 +14,19 @@ package org.hornetq.core.protocol; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; @@ -35,11 +38,15 @@ import org.hornetq.core.remoting.impl.netty.ConnectionCreator; import org.hornetq.core.remoting.impl.netty.HttpAcceptorHandler; import org.hornetq.core.remoting.impl.netty.HttpKeepAliveRunnable; import org.hornetq.core.remoting.impl.netty.NettyAcceptor; +import org.hornetq.core.remoting.impl.netty.NettyConnector; import org.hornetq.core.remoting.impl.netty.NettyServerConnection; import org.hornetq.core.remoting.impl.netty.TransportConstants; import org.hornetq.spi.core.protocol.ProtocolManager; import org.hornetq.utils.ConfigurationHelper; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + public class ProtocolHandler { private Map<String, ProtocolManager> protocolMap; @@ -104,6 +111,12 @@ public class ProtocolHandler ctx.pipeline().remove("http-handler"); ctx.fireChannelRead(msg); } + // HORNETQ-1391 + else if (upgrade != null && upgrade.equalsIgnoreCase(NettyConnector.HORNETQ_REMOTING)) + { + // Send the response and close the connection if necessary. + ctx.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)).addListener(ChannelFutureListener.CLOSE); + } } else { @@ -133,15 +146,25 @@ public class ProtocolHandler return; } String protocolToUse = null; - for (String protocol : protocolMap.keySet()) + Set<String> protocolSet = protocolMap.keySet(); + if (!protocolSet.isEmpty()) { - ProtocolManager protocolManager = protocolMap.get(protocol); - if (protocolManager.isProtocol(in.copy(0, 8).array())) + // Use getBytes(...) as this works with direct and heap buffers. + // See https://issues.jboss.org/browse/HORNETQ-1406 + byte[] bytes = new byte[8]; + in.getBytes(0, bytes); + + for (String protocol : protocolSet) { - protocolToUse = protocol; - break; + ProtocolManager protocolManager = protocolMap.get(protocol); + if (protocolManager.isProtocol(bytes)) + { + protocolToUse = protocol; + break; + } } } + //if we get here we assume we use the core protocol as we match nothing else if (protocolToUse == null) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java index 9d5fdb0..d0fe3ac 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java @@ -20,9 +20,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import io.netty.channel.ChannelPipeline; -import org.hornetq.api.core.HornetQAlreadyReplicatingException; import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.Pair; import org.hornetq.api.core.TransportConfiguration; @@ -37,8 +35,6 @@ import org.hornetq.core.protocol.core.CoreRemotingConnection; import org.hornetq.core.protocol.core.Packet; import org.hornetq.core.protocol.core.ServerSessionPacketHandler; import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID; -import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage; -import org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage; import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2; import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V3; @@ -50,8 +46,8 @@ import org.hornetq.core.remoting.impl.netty.HornetQFrameDecoder2; import org.hornetq.core.remoting.impl.netty.NettyServerConnection; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; -import org.hornetq.core.server.cluster.ClusterConnection; import org.hornetq.spi.core.protocol.ConnectionEntry; +import org.hornetq.spi.core.protocol.MessageConverter; import org.hornetq.spi.core.protocol.ProtocolManager; import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.Acceptor; @@ -81,6 +77,16 @@ class CoreProtocolManager implements ProtocolManager this.outgoingInterceptors = outgoingInterceptors; } + /** + * no need to implement this now + * @return + */ + @Override + public MessageConverter getConverter() + { + return null; + } + public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) { final Configuration config = server.getConfiguration(); @@ -113,7 +119,7 @@ class CoreProtocolManager implements ProtocolManager channel0.setHandler(new LocalChannelHandler(config, entry, channel0, acceptorUsed, rc)); - server.getClusterManager().addClusterChannelHandler(rc.getChannel(CHANNEL_ID.CLUSTER.id, -1), acceptorUsed, rc); + server.getClusterManager().addClusterChannelHandler(rc.getChannel(CHANNEL_ID.CLUSTER.id, -1), acceptorUsed, rc, server.getActivation()); return entry; } @@ -332,33 +338,6 @@ class CoreProtocolManager implements ProtocolManager } } }); - - } - } - else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) - { - BackupRegistrationMessage msg = (BackupRegistrationMessage)packet; - ClusterConnection clusterConnection = acceptorUsed.getClusterConnection(); - - if (!config.isSecurityEnabled() || clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword())) - { - try - { - server.startReplication(rc, clusterConnection, getPair(msg.getConnector(), true), - msg.isFailBackRequest()); - } - catch (HornetQAlreadyReplicatingException are) - { - channel0.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING)); - } - catch (HornetQException e) - { - channel0.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION)); - } - } - else - { - channel0.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.AUTHENTICATION)); } } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java index 12dd132..b4d993b 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java @@ -22,6 +22,7 @@ import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuation import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage; import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.ServerConsumer; import org.hornetq.core.server.ServerMessage; import org.hornetq.spi.core.protocol.ProtocolManager; import org.hornetq.spi.core.protocol.SessionCallback; @@ -47,9 +48,9 @@ public final class CoreSessionCallback implements SessionCallback this.channel = channel; } - public int sendLargeMessage(ServerMessage message, long consumerID, long bodySize, int deliveryCount) + public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { - Packet packet = new SessionReceiveLargeMessage(consumerID, message, bodySize, deliveryCount); + Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount); channel.send(packet); @@ -58,18 +59,18 @@ public final class CoreSessionCallback implements SessionCallback return size; } - public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse) + public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) { - Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse); + Packet packet = new SessionReceiveContinuationMessage(consumer.getID(), body, continues, requiresResponse); channel.send(packet); return packet.getPacketSize(); } - public int sendMessage(ServerMessage message, long consumerID, int deliveryCount) + public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) { - Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount); + Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount); int size = 0; @@ -112,15 +113,24 @@ public final class CoreSessionCallback implements SessionCallback } @Override - public void disconnect(long consumerId, String queueName) + public void disconnect(ServerConsumer consumerId, String queueName) { if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) { - channel.send(new DisconnectConsumerMessage(consumerId)); + channel.send(new DisconnectConsumerMessage(consumerId.getID())); } else { HornetQServerLogger.LOGGER.warnDisconnectOldClient(queueName); } } + + + @Override + public boolean hasCredits(ServerConsumer consumer) + { + // This one will always return has credits + // as the flow control is done by hornetq + return true; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java index bb4e111..2d8f227 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java @@ -16,7 +16,6 @@ import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.HornetQExceptionType; import org.hornetq.api.core.HornetQInternalErrorException; import org.hornetq.api.core.SimpleString; -import org.hornetq.core.config.BackupStrategy; import org.hornetq.core.protocol.core.Channel; import org.hornetq.core.protocol.core.ChannelHandler; import org.hornetq.core.protocol.core.CoreRemotingConnection; @@ -119,8 +118,7 @@ public class HornetQPacketHandler implements ChannelHandler { String nodeID = failoverMessage.getNodeID(); boolean okToFailover = nodeID == null || - !(server.getConfiguration().getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN && - !server.hasScaledDown(new SimpleString(nodeID))); + !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID))); channel1.send(new CheckFailoverReplyMessage(okToFailover)); } @@ -133,7 +131,6 @@ public class HornetQPacketHandler implements ChannelHandler Version version = server.getVersion(); if (!version.isCompatible(request.getVersion())) { - HornetQServerLogger.LOGGER.incompatibleVersion(request.getVersion(), connection.getRemoteAddress(), version.getFullVersion()); throw HornetQMessageBundle.BUNDLE.incompatibleClientServer(); } @@ -180,7 +177,7 @@ public class HornetQPacketHandler implements ChannelHandler request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, - channel)); + channel), null); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), @@ -194,13 +191,17 @@ public class HornetQPacketHandler implements ChannelHandler } catch (HornetQException e) { - HornetQServerLogger.LOGGER.failedToCreateSession(e); - response = new HornetQExceptionMessage(e); - if (e.getType() == HornetQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) { incompatibleVersion = true; + HornetQServerLogger.LOGGER.debug("Sending HornetQException after Incompatible client", e); + } + else + { + HornetQServerLogger.LOGGER.failedToCreateSession(e); } + + response = new HornetQExceptionMessage(e); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java index d908e54..566bb74 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRequestMessage.java @@ -16,13 +16,11 @@ package org.hornetq.core.protocol.core.impl.wireformat; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.SimpleString; import org.hornetq.core.protocol.core.impl.PacketImpl; -import org.hornetq.core.server.cluster.ha.HAPolicy; public class BackupRequestMessage extends PacketImpl { private int backupSize; private SimpleString nodeID; - private HAPolicy.POLICY_TYPE backupType; private String journalDirectory; private String bindingsDirectory; private String largeMessagesDirectory; @@ -38,7 +36,6 @@ public class BackupRequestMessage extends PacketImpl { super(BACKUP_REQUEST); this.backupSize = backupSize; - backupType = HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE; this.journalDirectory = journalDirectory; this.bindingsDirectory = bindingsDirectory; this.largeMessagesDirectory = largeMessagesDirectory; @@ -50,7 +47,6 @@ public class BackupRequestMessage extends PacketImpl super(BACKUP_REQUEST); this.backupSize = backupSize; this.nodeID = nodeID; - backupType = HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED; } @Override @@ -58,18 +54,11 @@ public class BackupRequestMessage extends PacketImpl { super.encodeRest(buffer); buffer.writeInt(backupSize); - buffer.writeByte(backupType.getType()); - if (backupType == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE) - { - buffer.writeString(journalDirectory); - buffer.writeString(bindingsDirectory); - buffer.writeString(largeMessagesDirectory); - buffer.writeString(pagingDirectory); - } - else - { - buffer.writeSimpleString(nodeID); - } + buffer.writeNullableString(journalDirectory); + buffer.writeNullableString(bindingsDirectory); + buffer.writeNullableString(largeMessagesDirectory); + buffer.writeNullableString(pagingDirectory); + buffer.writeNullableSimpleString(nodeID); } @Override @@ -77,18 +66,11 @@ public class BackupRequestMessage extends PacketImpl { super.decodeRest(buffer); backupSize = buffer.readInt(); - backupType = HAPolicy.POLICY_TYPE.toBackupType(buffer.readByte()); - if (backupType == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE) - { - journalDirectory = buffer.readString(); - bindingsDirectory = buffer.readString(); - largeMessagesDirectory = buffer.readString(); - pagingDirectory = buffer.readString(); - } - else - { - nodeID = buffer.readSimpleString(); - } + journalDirectory = buffer.readNullableString(); + bindingsDirectory = buffer.readNullableString(); + largeMessagesDirectory = buffer.readNullableString(); + pagingDirectory = buffer.readNullableString(); + nodeID = buffer.readNullableSimpleString(); } public int getBackupSize() @@ -101,11 +83,6 @@ public class BackupRequestMessage extends PacketImpl return nodeID; } - public HAPolicy.POLICY_TYPE getBackupType() - { - return backupType; - } - public String getJournalDirectory() { return journalDirectory; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java index 1fc484f..114de68 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java +++ b/hornetq-server/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java @@ -23,7 +23,7 @@ import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalCo import org.hornetq.core.protocol.core.impl.PacketImpl; /** - * Message is used to sync {@link SequentialFile}s to a backup server. The {@link FileType} controls + * Message is used to sync {@link org.hornetq.core.journal.SequentialFile}s to a backup server. The {@link FileType} controls * which extra information is sent. */ public final class ReplicationSyncFileMessage extends PacketImpl http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java index e794ef6..ae0838e 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java @@ -20,7 +20,7 @@ import java.util.concurrent.Executor; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.security.HornetQPrincipal; import org.hornetq.core.server.HornetQComponent; import org.hornetq.core.server.HornetQMessageBundle; @@ -109,7 +109,7 @@ public final class InVMAcceptor implements Acceptor props.putSimpleStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName())); props.putIntProperty(new SimpleString("id"), id); - Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props); + Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STARTED, props); notificationService.sendNotification(notification); } @@ -143,7 +143,7 @@ public final class InVMAcceptor implements Acceptor props.putSimpleStringProperty(new SimpleString("factory"), new SimpleString(InVMAcceptorFactory.class.getName())); props.putIntProperty(new SimpleString("id"), id); - Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props); + Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STOPPED, props); try { notificationService.sendNotification(notification); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java index 26bdf13..0613724 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnection.java @@ -19,12 +19,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import io.netty.channel.ChannelFutureListener; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.HornetQInterruptedException; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.security.HornetQPrincipal; import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.BufferHandler; import org.hornetq.spi.core.remoting.Connection; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; @@ -59,6 +61,8 @@ public class InVMConnection implements Connection private final HornetQPrincipal defaultHornetQPrincipal; + private RemotingConnection protocolConnection; + public InVMConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener, @@ -96,6 +100,24 @@ public class InVMConnection implements Connection this.defaultHornetQPrincipal = defaultHornetQPrincipal; } + + public void forceClose() + { + // no op + } + + public RemotingConnection getProtocolConnection() + { + return this.protocolConnection; + } + + public void setProtocolConnection(RemotingConnection connection) + { + this.protocolConnection = connection; + } + + + public void close() { if (closing) @@ -132,11 +154,16 @@ public class InVMConnection implements Connection public void write(final HornetQBuffer buffer) { - write(buffer, false, false); + write(buffer, false, false, null); } public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch) { + write(buffer, flush, batch, null); + } + + public void write(final HornetQBuffer buffer, final boolean flush, final boolean batch, final ChannelFutureListener futureListener) + { final HornetQBuffer copied = buffer.copy(0, buffer.capacity()); copied.setIndex(buffer.readerIndex(), buffer.writerIndex()); @@ -157,6 +184,11 @@ public class InVMConnection implements Connection HornetQServerLogger.LOGGER.trace(InVMConnection.this + "::Sending inVM packet"); } handler.bufferReceived(id, copied); + if (futureListener != null) + { + // TODO BEFORE MERGE: (is null a good option here?) + futureListener.operationComplete(null); + } } } catch (Exception e) @@ -224,6 +256,12 @@ public class InVMConnection implements Connection { } + @Override + public boolean isUsingProtocolHandling() + { + return false; + } + public HornetQPrincipal getDefaultHornetQPrincipal() { return defaultHornetQPrincipal; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java index 1d24adf..b47cc66 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnector.java @@ -12,19 +12,21 @@ */ package org.hornetq.core.remoting.impl.invm; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.client.HornetQClient; import org.hornetq.core.server.HornetQComponent; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.HornetQMessageBundle; import org.hornetq.spi.core.remoting.AbstractConnector; import org.hornetq.spi.core.remoting.Acceptor; import org.hornetq.spi.core.remoting.BufferHandler; +import org.hornetq.spi.core.remoting.ClientProtocolManager; import org.hornetq.spi.core.remoting.Connection; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.utils.ConfigurationHelper; @@ -34,10 +36,20 @@ import org.hornetq.utils.OrderedExecutorFactory; * A InVMConnector * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> * */ public class InVMConnector extends AbstractConnector { + public static final Map<String, Object> DEFAULT_CONFIG; + + static + { + Map<String, Object> config = new HashMap<String , Object>(); + config.put(TransportConstants.SERVER_ID_PROP_NAME, TransportConstants.DEFAULT_SERVER_ID); + DEFAULT_CONFIG = Collections.unmodifiableMap(config); + } + // Used for testing failure only public static volatile boolean failOnCreateConnection; @@ -63,6 +75,8 @@ public class InVMConnector extends AbstractConnector protected final int id; + private final ClientProtocolManager protocolManager; + private final BufferHandler handler; private final ConnectionLifeCycleListener listener; @@ -81,7 +95,8 @@ public class InVMConnector extends AbstractConnector final BufferHandler handler, final ConnectionLifeCycleListener listener, final Executor closeExecutor, - final Executor threadPool) + final Executor threadPool, + ClientProtocolManager protocolManager) { super(configuration); this.listener = listener; @@ -97,6 +112,8 @@ public class InVMConnector extends AbstractConnector InVMRegistry registry = InVMRegistry.instance; acceptor = registry.getAcceptor(id); + + this.protocolManager = protocolManager; } public Acceptor getAcceptor() @@ -180,7 +197,7 @@ public class InVMConnector extends AbstractConnector { // No acceptor on a client connection InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor); - listener.connectionCreated(null, inVMConnection, HornetQClient.DEFAULT_CORE_PROTOCOL); + listener.connectionCreated(null, inVMConnection, protocolManager.getName()); return inVMConnection; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java index 9788d85..283ea98 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.hornetq.spi.core.remoting.BufferHandler; +import org.hornetq.spi.core.remoting.ClientProtocolManager; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.spi.core.remoting.Connector; import org.hornetq.spi.core.remoting.ConnectorFactory; @@ -27,7 +28,7 @@ import org.hornetq.spi.core.remoting.ConnectorFactory; * A InVMConnectorFactory * * @author <a href="mailto:[email protected]">Tim Fox</a> - * + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class InVMConnectorFactory implements ConnectorFactory { @@ -36,9 +37,10 @@ public class InVMConnectorFactory implements ConnectorFactory final ConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, - final ScheduledExecutorService scheduledThreadPool) + final ScheduledExecutorService scheduledThreadPool, + final ClientProtocolManager protocolManager) { - InVMConnector connector = new InVMConnector(configuration, handler, listener, closeExecutor, threadPool); + InVMConnector connector = new InVMConnector(configuration, handler, listener, closeExecutor, threadPool, protocolManager); return connector; } @@ -53,4 +55,10 @@ public class InVMConnectorFactory implements ConnectorFactory { return true; } + + @Override + public Map<String, Object> getDefaults() + { + return InVMConnector.DEFAULT_CONFIG; + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java index 46332d3..ea67573 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/invm/TransportConstants.java @@ -22,12 +22,15 @@ import org.hornetq.api.config.HornetQDefaultConfiguration; * A TransportConstants * * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> * */ public final class TransportConstants { public static final String SERVER_ID_PROP_NAME = "server-id"; + public static final int DEFAULT_SERVER_ID = 0; + public static final Set<String> ALLOWABLE_CONNECTOR_KEYS; public static final Set<String> ALLOWABLE_ACCEPTOR_KEYS; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java index 0a8288b..adcf29f 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java @@ -52,7 +52,7 @@ import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.client.impl.ClientSessionFactoryImpl; import org.hornetq.core.protocol.ProtocolHandler; import org.hornetq.core.remoting.impl.ssl.SSLSupport; @@ -443,7 +443,7 @@ public class NettyAcceptor implements Acceptor new SimpleString(NettyAcceptorFactory.class.getName())); props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host)); props.putIntProperty(new SimpleString("port"), port); - Notification notification = new Notification(null, NotificationType.ACCEPTOR_STARTED, props); + Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STARTED, props); notificationService.sendNotification(notification); } @@ -457,7 +457,6 @@ public class NettyAcceptor implements Acceptor TimeUnit.MILLISECONDS); } - // TODO: Think about add Version back to netty HornetQServerLogger.LOGGER.startedNettyAcceptor(TransportConstants.NETTY_VERSION, host, port); } } @@ -560,7 +559,7 @@ public class NettyAcceptor implements Acceptor new SimpleString(NettyAcceptorFactory.class.getName())); props.putSimpleStringProperty(new SimpleString("host"), new SimpleString(host)); props.putIntProperty(new SimpleString("port"), port); - Notification notification = new Notification(null, NotificationType.ACCEPTOR_STOPPED, props); + Notification notification = new Notification(null, CoreNotificationType.ACCEPTOR_STOPPED, props); try { notificationService.sendNotification(notification); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java index 60d1570..f1b2ce9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -48,6 +49,7 @@ import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.cluster.ClusterConnection; import org.hornetq.core.server.cluster.ClusterManager; +import org.hornetq.core.server.impl.ServiceRegistry; import org.hornetq.core.server.impl.ServerSessionImpl; import org.hornetq.core.server.management.ManagementService; import org.hornetq.spi.core.protocol.ConnectionEntry; @@ -67,6 +69,7 @@ import org.hornetq.utils.HornetQThreadFactory; * @author <a href="mailto:[email protected]">Jeff Mesnil</a> * @author <a href="mailto:[email protected]">Andy Taylor</a> * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener { @@ -96,6 +99,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private ExecutorService threadPool; + private final Executor flushExecutor; + private final ScheduledExecutorService scheduledThreadPool; private FailureCheckAndFlushThread failureCheckAndFlushThread; @@ -106,6 +111,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private HornetQPrincipal defaultInvmSecurityPrincipal; + private ServiceRegistry serviceRegistry; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -114,43 +121,29 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle final Configuration config, final HornetQServer server, final ManagementService managementService, - final ScheduledExecutorService scheduledThreadPool, List<ProtocolManagerFactory> protocolManagerFactories) + final ScheduledExecutorService scheduledThreadPool, + List<ProtocolManagerFactory> protocolManagerFactories, + final Executor flushExecutor, + final ServiceRegistry serviceRegistry) { + this.serviceRegistry = serviceRegistry; + acceptorsConfig = config.getAcceptorConfigurations(); this.server = server; this.clusterManager = clusterManager; - for (String interceptorClass : config.getIncomingInterceptorClassNames()) - { - try - { - incomingInterceptors.add((Interceptor) safeInitNewInstance(interceptorClass)); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorCreatingRemotingInterceptor(e, interceptorClass); - } - } + setInterceptors(config); - for (String interceptorClass : config.getOutgoingInterceptorClassNames()) - { - try - { - outgoingInterceptors.add((Interceptor) safeInitNewInstance(interceptorClass)); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorCreatingRemotingInterceptor(e, interceptorClass); - } - } this.managementService = managementService; this.scheduledThreadPool = scheduledThreadPool; CoreProtocolManagerFactory coreProtocolManagerFactory = new CoreProtocolManagerFactory(); //i know there is only 1 + this.flushExecutor = flushExecutor; + HornetQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0]); this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); @@ -188,6 +181,23 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle // RemotingService implementation ------------------------------- + private void setInterceptors(Configuration configuration) + { + addReflectivelyInstantiatedInterceptors(configuration.getIncomingInterceptorClassNames(), incomingInterceptors); + addReflectivelyInstantiatedInterceptors(configuration.getOutgoingInterceptorClassNames(), outgoingInterceptors); + incomingInterceptors.addAll(serviceRegistry.getIncomingInterceptors()); + outgoingInterceptors.addAll(serviceRegistry.getOutgoingInterceptors()); + } + + private void addReflectivelyInstantiatedInterceptors(List<String> classNames, List<Interceptor> interceptors) + { + for (String className : classNames) + { + Interceptor interceptor = ((Interceptor) safeInitNewInstance(className)); + interceptors.add(interceptor); + } + } + public synchronized void start() throws Exception { if (started) @@ -463,6 +473,23 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle return started; } + + private RemotingConnection getConnection(final Object remotingConnectionID) + { + ConnectionEntry entry = connections.get(remotingConnectionID); + + if (entry != null) + { + return entry.connection; + } + else + { + HornetQServerLogger.LOGGER.errorRemovingConnection(); + + return null; + } + } + public RemotingConnection removeConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.remove(remotingConnectionID); @@ -689,7 +716,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle for (ConnectionEntry entry : connections.values()) { - RemotingConnection conn = entry.connection; + final RemotingConnection conn = entry.connection; boolean flush = true; @@ -712,16 +739,34 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle if (flush) { - conn.flush(); + flushExecutor.execute(new Runnable() + { + public void run() + { + try + { + // this is using a different thread + // as if anything wrong happens on flush + // failure detection could be affected + conn.flush(); + } + catch (Throwable e) + { + HornetQServerLogger.LOGGER.warn(e.getMessage(), e); + } + + } + }); } } for (Object id : idsToRemove) { - RemotingConnection conn = removeConnection(id); + RemotingConnection conn = getConnection(id); if (conn != null) { conn.fail(HornetQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress())); + removeConnection(id); } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java index e6c539e..94dcc29 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java +++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedJournal.java @@ -30,12 +30,12 @@ import org.hornetq.core.persistence.OperationContext; import org.hornetq.core.replication.ReplicationManager.ADD_OPERATION_TYPE; /** - * Used by the {@link JournalStorageManager} to replicate journal calls. + * Used by the {@link org.hornetq.core.persistence.impl.journal.JournalStorageManager} to replicate journal calls. * <p> * This class wraps a {@link ReplicationManager} and the local {@link Journal}. Every call will be * relayed to both instances. * @author <mailto:[email protected]">Clebert Suconic</a> - * @see JournalStorageManager + * @see org.hornetq.core.persistence.impl.journal.JournalStorageManager */ public class ReplicatedJournal implements Journal { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java index 21721e8..52404a8 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java +++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java @@ -15,32 +15,34 @@ */ package org.hornetq.core.replication; +import org.hornetq.api.core.Message; + /** - * {@link LargeServerMessage} methods used by the {@link ReplicationEndpoint}. + * {@link org.hornetq.core.server.LargeServerMessage} methods used by the {@link ReplicationEndpoint}. * <p/> - * In practice a subset of the methods necessary to have a {@link LargeServerMessage} + * In practice a subset of the methods necessary to have a {@link org.hornetq.core.server.LargeServerMessage} * - * @see LargeServerMessageInSync + * @see org.hornetq.core.persistence.impl.journal.LargeServerMessageInSync */ public interface ReplicatedLargeMessage { /** - * @see LargeServerMessage#setDurable(boolean) + * @see org.hornetq.core.server.LargeServerMessage#setDurable(boolean) */ - void setDurable(boolean b); + Message setDurable(boolean b); /** - * @see LargeServerMessage#setMessageID(long) + * @see org.hornetq.core.server.LargeServerMessage#setMessageID(long) */ - void setMessageID(long id); + Message setMessageID(long id); /** - * @see LargeServerMessage#releaseResources() + * @see org.hornetq.core.server.LargeServerMessage#releaseResources() */ void releaseResources(); /** - * @see LargeServerMessage#deleteFile() + * @see org.hornetq.core.server.LargeServerMessage#deleteFile() */ void deleteFile() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java index c667fa3..dd68e36 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java +++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java @@ -74,6 +74,7 @@ import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.ServerMessage; import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.SharedNothingBackupActivation; /** * Handles all the synchronization necessary for replication on the backup side (that is the @@ -88,6 +89,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone private final IOCriticalErrorListener criticalErrorListener; private final HornetQServerImpl server; private final boolean wantedFailBack; + private final SharedNothingBackupActivation activation; private final boolean noSync = false; private Channel channel; @@ -125,11 +127,12 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone // Constructors -------------------------------------------------- public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener, - boolean wantedFailBack) + boolean wantedFailBack, SharedNothingBackupActivation activation) { this.server = server; this.criticalErrorListener = criticalErrorListener; this.wantedFailBack = wantedFailBack; + this.activation = activation; } // Public -------------------------------------------------------- @@ -260,7 +263,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone */ private void handleLiveStopping(ReplicationLiveIsStoppingMessage packet) throws HornetQException { - server.remoteFailOver(packet.isFinalMessage()); + activation.remoteFailOver(packet.isFinalMessage()); } public boolean isStarted() @@ -394,7 +397,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone public void compareJournalInformation(final JournalLoadInformation[] journalInformation) throws HornetQException { - if (!server.isRemoteBackupUpToDate()) + if (!activation.isRemoteBackupUpToDate()) { throw HornetQMessageBundle.BUNDLE.journalsNotInSync(); } @@ -481,7 +484,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone journalsHolder = null; backupQuorum.liveIDSet(liveID); - server.setRemoteBackupUpToDate(); + activation.setRemoteBackupUpToDate(); HornetQServerLogger.LOGGER.backupServerSynched(server); return; } @@ -555,7 +558,7 @@ public final class ReplicationEndpoint implements ChannelHandler, HornetQCompone */ private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception { - if (server.isRemoteBackupUpToDate()) + if (activation.isRemoteBackupUpToDate()) { throw HornetQMessageBundle.BUNDLE.replicationBackupUpToDate(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java index 42f3872..cd43ffa 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/replication/ReplicationManager.java @@ -16,11 +16,8 @@ import java.io.FileInputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -112,8 +109,6 @@ public final class ReplicationManager implements HornetQComponent private volatile boolean enabled; private final Object replicationLock = new Object(); - private final Object largeMessageSyncGuard = new Object(); - private final HashMap<Long, Pair<String, Long>> largeMessagesToSync = new HashMap<Long, Pair<String, Long>>(); private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>(); @@ -506,22 +501,6 @@ public final class ReplicationManager implements HornetQComponent } } - /** - * @return - */ - public Map.Entry<Long, Pair<String, Long>> getNextLargeMessageToSync() - { - Iterator<Entry<Long, Pair<String, Long>>> iter = largeMessagesToSync.entrySet().iterator(); - if (!iter.hasNext()) - { - return null; - } - - Entry<Long, Pair<String, Long>> entry = iter.next(); - iter.remove(); - return entry; - } - public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception { if (enabled) @@ -563,7 +542,10 @@ public final class ReplicationManager implements HornetQComponent final FileChannel channel = fis.getChannel(); try { - final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); + // We can afford having a single buffer here for this entire loop + // because sendReplicatePacket will encode the packet as a NettyBuffer + // through HornetQBuffer class leaving this buffer free to be reused on the next copy + final ByteBuffer buffer = ByteBuffer.allocate(1 << 17); // 1 << 17 == 131072 == 128 * 1024 while (true) { buffer.clear(); @@ -649,8 +631,7 @@ public final class ReplicationManager implements HornetQComponent public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> largeMessages) { ArrayList<Long> idsToSend; - largeMessagesToSync.putAll(largeMessages); - idsToSend = new ArrayList<Long>(largeMessagesToSync.keySet()); + idsToSend = new ArrayList<Long>(largeMessages.keySet()); if (enabled) sendReplicatePacket(new ReplicationStartSyncMessage(idsToSend)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java b/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java index 01a8f17..c7325ca 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java +++ b/hornetq-server/src/main/java/org/hornetq/core/security/SecurityStore.java @@ -26,5 +26,7 @@ public interface SecurityStore void check(SimpleString address, CheckType checkType, ServerSession session) throws Exception; + boolean isSecurityEnabled(); + void stop(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java b/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java index a0a4807..d260ba3 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/security/impl/SecurityStoreImpl.java @@ -12,15 +12,13 @@ */ package org.hornetq.core.security.impl; -import static org.hornetq.api.core.management.NotificationType.SECURITY_AUTHENTICATION_VIOLATION; - import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.security.CheckType; import org.hornetq.core.security.Role; import org.hornetq.core.security.SecurityStore; @@ -98,6 +96,12 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC // SecurityManager implementation -------------------------------- + @Override + public boolean isSecurityEnabled() + { + return securityEnabled; + } + public void stop() { securityRepository.unRegisterListener(this); @@ -137,7 +141,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user)); - Notification notification = new Notification(null, SECURITY_AUTHENTICATION_VIOLATION, props); + Notification notification = new Notification(null, CoreNotificationType.SECURITY_AUTHENTICATION_VIOLATION, props); notificationService.sendNotification(notification); } @@ -183,7 +187,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC props.putSimpleStringProperty(ManagementHelper.HDR_CHECK_TYPE, new SimpleString(checkType.toString())); props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(user)); - Notification notification = new Notification(null, NotificationType.SECURITY_PERMISSION_VIOLATION, props); + Notification notification = new Notification(null, CoreNotificationType.SECURITY_PERMISSION_VIOLATION, props); notificationService.sendNotification(notification); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java b/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java index 1f10891..361bb3a 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/ActivateCallback.java @@ -21,9 +21,24 @@ package org.hornetq.core.server; */ public interface ActivateCallback { + /* + * this is called before any services are started when the server first initialised + */ void preActivate(); + /* + * this is called after most of the services have been started but before any cluster resources or JMS resources have been + */ void activated(); + /* + * this is called when the server is stopping, after any network resources and clients are closed but before the rest + * of the resources + */ void deActivate(); + + /* + * this is called when all resources have been started including any JMS resources + */ + void activationComplete(); }
