http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java b/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java index ae5b41f..bed8d0a 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java +++ b/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java @@ -32,13 +32,20 @@ import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.UDPBroadcastGroupConfiguration; import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.core.config.BackupStrategy; import org.hornetq.core.config.BridgeConfiguration; import org.hornetq.core.config.ClusterConnectionConfiguration; import org.hornetq.core.config.Configuration; import org.hornetq.core.config.ConnectorServiceConfiguration; import org.hornetq.core.config.CoreQueueConfiguration; import org.hornetq.core.config.DivertConfiguration; +import org.hornetq.core.config.HAPolicyConfiguration; +import org.hornetq.core.config.ScaleDownConfiguration; +import org.hornetq.core.config.ha.ColocatedPolicyConfiguration; +import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicaPolicyConfiguration; +import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.hornetq.core.config.impl.ConfigurationImpl; import org.hornetq.core.config.impl.FileConfiguration; import org.hornetq.core.config.impl.Validators; @@ -47,11 +54,10 @@ import org.hornetq.core.journal.impl.JournalConstants; import 
org.hornetq.core.security.Role; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.JournalType; -import org.hornetq.core.server.cluster.ha.HAPolicy; -import org.hornetq.core.server.cluster.ha.HAPolicyTemplate; import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration; import org.hornetq.core.settings.impl.AddressFullMessagePolicy; import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.core.settings.impl.SlowConsumerPolicy; import org.hornetq.utils.DefaultSensitiveStringCodec; import org.hornetq.utils.PasswordMaskingUtil; import org.hornetq.utils.SensitiveDataCodec; @@ -136,6 +142,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route"; + private static final String SLOW_CONSUMER_THRESHOLD_NODE_NAME = "slow-consumer-threshold"; + + private static final String SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME = "slow-consumer-check-period"; + + private static final String SLOW_CONSUMER_POLICY_NODE_NAME = "slow-consumer-policy"; + // Attributes ---------------------------------------------------- private boolean validateAIO = false; @@ -195,6 +207,57 @@ public final class FileConfigurationParser extends XMLConfigurationUtil HornetQServerLogger.LOGGER.deprecatedConfigurationOption("clustered"); } + // these are combined because they are both required for setting the correct HAPolicyConfiguration + if (parameterExists(e, "backup") || parameterExists(e, "shared-store")) + { + boolean backup = getBoolean(e, "backup", false); + boolean sharedStore = getBoolean(e, "shared-store", true); + + if (containsHAPolicy) + { + if (parameterExists(e, "backup")) + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("backup"); + } + + if (parameterExists(e, "shared-store")) + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("shared-store"); + } + } + else + { + if (parameterExists(e, "backup")) + { + 
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("backup"); + } + + if (parameterExists(e, "shared-store")) + { + HornetQServerLogger.LOGGER.deprecatedConfigurationOption("shared-store"); + } + + if (backup && sharedStore) + { + config.setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()); + } + else if (backup && !sharedStore) + { + config.setHAPolicyConfiguration(new ReplicaPolicyConfiguration()); + } + else if (!backup && sharedStore) + { + config.setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()); + } + else if (!backup && !sharedStore) + { + config.setHAPolicyConfiguration(new ReplicatedPolicyConfiguration()); + } + } + } + + HAPolicyConfiguration haPolicyConfig = config.getHAPolicyConfiguration(); + if (parameterExists(e, "check-for-live-server")) { if (containsHAPolicy) @@ -205,7 +268,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("check-for-live-server"); - config.getHAPolicy().setCheckForLiveServer(getBoolean(e, "check-for-live-server", config.getHAPolicy().isCheckForLiveServer())); + if (haPolicyConfig instanceof ReplicatedPolicyConfiguration) + { + ReplicatedPolicyConfiguration hapc = (ReplicatedPolicyConfiguration) haPolicyConfig; + hapc.setCheckForLiveServer(getBoolean(e, "check-for-live-server", hapc.isCheckForLiveServer())); + } } } @@ -219,7 +286,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("allow-failback"); - config.getHAPolicy().setAllowAutoFailBack(getBoolean(e, "allow-failback", config.getHAPolicy().isAllowAutoFailBack())); + if (haPolicyConfig instanceof ReplicaPolicyConfiguration) + { + ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) haPolicyConfig; + hapc.setAllowFailBack(getBoolean(e, "allow-failback", hapc.isAllowFailBack())); + } + else if (haPolicyConfig instanceof SharedStoreSlavePolicyConfiguration) + { 
+ SharedStoreSlavePolicyConfiguration hapc = (SharedStoreSlavePolicyConfiguration) haPolicyConfig; + hapc.setAllowFailBack(getBoolean(e, "allow-failback", hapc.isAllowFailBack())); + } + else + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("check-for-live-server"); + } } } @@ -233,7 +313,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("backup-group-name"); - config.getHAPolicy().setBackupGroupName(getString(e, "backup-group-name", config.getHAPolicy().getBackupGroupName(), Validators.NO_CHECK)); + if (haPolicyConfig instanceof ReplicaPolicyConfiguration) + { + ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) haPolicyConfig; + hapc.setGroupName(getString(e, "backup-group-name", hapc.getGroupName(), Validators.NO_CHECK)); + } + else if (haPolicyConfig instanceof ReplicatedPolicyConfiguration) + { + ReplicatedPolicyConfiguration hapc = (ReplicatedPolicyConfiguration) haPolicyConfig; + hapc.setGroupName(getString(e, "backup-group-name", hapc.getGroupName(), Validators.NO_CHECK)); + } + else + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("backup-group-name"); + } } } @@ -247,7 +340,25 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("failback-delay"); - config.getHAPolicy().setFailbackDelay(getLong(e, "failback-delay", config.getHAPolicy().getFailbackDelay(), Validators.GT_ZERO)); + if (haPolicyConfig instanceof ReplicaPolicyConfiguration) + { + ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) haPolicyConfig; + hapc.setFailbackDelay(getLong(e, "failback-delay", hapc.getFailbackDelay(), Validators.GT_ZERO)); + } + else if (haPolicyConfig instanceof SharedStoreMasterPolicyConfiguration) + { + SharedStoreMasterPolicyConfiguration hapc = (SharedStoreMasterPolicyConfiguration) haPolicyConfig; + hapc.setFailbackDelay(getLong(e, "failback-delay", 
hapc.getFailbackDelay(), Validators.GT_ZERO)); + } + else if (haPolicyConfig instanceof SharedStoreSlavePolicyConfiguration) + { + SharedStoreSlavePolicyConfiguration hapc = (SharedStoreSlavePolicyConfiguration) haPolicyConfig; + hapc.setFailbackDelay(getLong(e, "failback-delay", hapc.getFailbackDelay(), Validators.GT_ZERO)); + } + else + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("failback-delay"); + } } } @@ -261,7 +372,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("failover-on-shutdown"); - config.getHAPolicy().setFailoverOnServerShutdown(getBoolean(e, "failover-on-shutdown", config.getHAPolicy().isFailoverOnServerShutdown())); + if (haPolicyConfig instanceof SharedStoreMasterPolicyConfiguration) + { + SharedStoreMasterPolicyConfiguration hapc = (SharedStoreMasterPolicyConfiguration) haPolicyConfig; + hapc.setFailoverOnServerShutdown(getBoolean(e, "failover-on-shutdown", hapc.isFailoverOnServerShutdown())); + } + else if (haPolicyConfig instanceof SharedStoreSlavePolicyConfiguration) + { + SharedStoreSlavePolicyConfiguration hapc = (SharedStoreSlavePolicyConfiguration) haPolicyConfig; + hapc.setFailoverOnServerShutdown(getBoolean(e, "failover-on-shutdown", hapc.isFailoverOnServerShutdown())); + } + else + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("failover-on-shutdown"); + } } } @@ -275,21 +399,20 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("replication-clustername"); - config.getHAPolicy().setReplicationClustername(getString(e, "replication-clustername", null, Validators.NO_CHECK)); - } - } - - if (parameterExists(e, "scale-down-clustername")) - { - if (containsHAPolicy) - { - HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("scale-down-clustername"); - } - else - { - 
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("scale-down-clustername"); - - config.getHAPolicy().setScaleDownClustername(getString(e, "scale-down-clustername", null, Validators.NO_CHECK)); + if (haPolicyConfig instanceof ReplicaPolicyConfiguration) + { + ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) haPolicyConfig; + hapc.setClusterName(getString(e, "replication-clustername", null, Validators.NO_CHECK)); + } + else if (haPolicyConfig instanceof ReplicatedPolicyConfiguration) + { + ReplicatedPolicyConfiguration hapc = (ReplicatedPolicyConfiguration) haPolicyConfig; + hapc.setClusterName(getString(e, "replication-clustername", null, Validators.NO_CHECK)); + } + else + { + HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("replication-clustername"); + } } } @@ -303,60 +426,27 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { HornetQServerLogger.LOGGER.deprecatedConfigurationOption("max-saved-replicated-journals-size"); - config.getHAPolicy().setMaxSavedReplicatedJournalSize(getInteger(e, "max-saved-replicated-journals-size", - config.getHAPolicy().getMaxSavedReplicatedJournalsSize(), Validators.MINUS_ONE_OR_GE_ZERO)); - } - } - - // these are combined because they are both required for setting the HAPolicy - if (parameterExists(e, "backup") || parameterExists(e, "shared-store")) - { - boolean backup = getBoolean(e, "backup", config.getHAPolicy().isBackup()); - boolean sharedStore = getBoolean(e, "shared-store", config.getHAPolicy().isSharedStore()); - - if (containsHAPolicy) - { - if (parameterExists(e, "backup")) - { - HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("backup"); - } - - if (parameterExists(e, "shared-store")) - { - HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("shared-store"); - } - } - else - { - if (parameterExists(e, "backup")) - { - HornetQServerLogger.LOGGER.deprecatedConfigurationOption("backup"); - } - - if (parameterExists(e, "shared-store")) + if (haPolicyConfig 
instanceof ReplicaPolicyConfiguration) { - HornetQServerLogger.LOGGER.deprecatedConfigurationOption("shared-store"); - } + ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) haPolicyConfig; + hapc.setMaxSavedReplicatedJournalsSize(getInteger(e, "max-saved-replicated-journals-size", + hapc.getMaxSavedReplicatedJournalsSize(), Validators.MINUS_ONE_OR_GE_ZERO)); - if (backup && sharedStore) - { - config.setHAPolicy(HAPolicyTemplate.BACKUP_SHARED_STORE.getHaPolicy()); - } - else if (backup && !sharedStore) - { - config.setHAPolicy(HAPolicyTemplate.BACKUP_REPLICATED.getHaPolicy()); - } - else if (!backup && sharedStore) - { - config.setHAPolicy(HAPolicyTemplate.SHARED_STORE.getHaPolicy()); } - else if (!backup && !sharedStore) + else { - config.setHAPolicy(HAPolicyTemplate.REPLICATED.getHaPolicy()); + HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("max-saved-replicated-journals-size"); } } } + //if we aren already set then set to default + if (config.getHAPolicyConfiguration() == null) + { + config.setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()); + } + + config.setResolveProtocols(getBoolean(e, "resolve-protocols", config.isResolveProtocols())); // Defaults to true when using FileConfiguration @@ -1023,6 +1113,28 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setSendToDLAOnNoRoute(XMLUtil.parseBoolean(child)); } + else if (SLOW_CONSUMER_THRESHOLD_NODE_NAME.equalsIgnoreCase(name)) + { + long slowConsumerThreshold = XMLUtil.parseLong(child); + Validators.MINUS_ONE_OR_GT_ZERO.validate(SLOW_CONSUMER_THRESHOLD_NODE_NAME, slowConsumerThreshold); + + addressSettings.setSlowConsumerThreshold(slowConsumerThreshold); + } + else if (SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME.equalsIgnoreCase(name)) + { + long slowConsumerCheckPeriod = XMLUtil.parseLong(child); + Validators.GT_ZERO.validate(SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME, slowConsumerCheckPeriod); + + 
addressSettings.setSlowConsumerCheckPeriod(slowConsumerCheckPeriod); + } + else if (SLOW_CONSUMER_POLICY_NODE_NAME.equalsIgnoreCase(name)) + { + String value = getTrimmedTextContent(child); + Validators.SLOW_CONSUMER_POLICY_TYPE.validate(SLOW_CONSUMER_POLICY_NODE_NAME, + value); + SlowConsumerPolicy policy = Enum.valueOf(SlowConsumerPolicy.class, value); + addressSettings.setSlowConsumerPolicy(policy); + } } return setting; } @@ -1054,7 +1166,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } } - return new CoreQueueConfiguration(address, name, filterString, durable); + return new CoreQueueConfiguration() + .setAddress(address) + .setName(name) + .setFilterString(filterString) + .setDurable(durable); } private TransportConfiguration parseTransportConfiguration(final Element e, final Configuration mainConfig) @@ -1097,74 +1213,180 @@ public final class FileConfigurationParser extends XMLConfigurationUtil return new TransportConfiguration(clazz, params, name); } + private static final ArrayList<String> POLICY_LIST = new ArrayList<>(); + static + { + POLICY_LIST.add("colocated"); + POLICY_LIST.add("live-only"); + POLICY_LIST.add("replicated"); + POLICY_LIST.add("replica"); + POLICY_LIST.add("shared-store-master"); + POLICY_LIST.add("shared-store-slave"); + } + private static final ArrayList<String> HA_LIST = new ArrayList<>(); + static + { + HA_LIST.add("live-only"); + HA_LIST.add("shared-store"); + HA_LIST.add("replication"); + } private void parseHAPolicyConfiguration(final Element e, final Configuration mainConfig) { - String policyTemplate = e.getAttribute("template"); - HAPolicy policy; - if (policyTemplate.length() > 0) - { - policy = HAPolicyTemplate.valueOf(policyTemplate).getHaPolicy(); - } - else + for (String haType : HA_LIST) { - policy = HAPolicyTemplate.NONE.getHaPolicy(); + NodeList haNodeList = e.getElementsByTagName(haType); + if (haNodeList.getLength() > 0) + { + Element haNode = (Element) haNodeList.item(0); + if 
(haNode.getTagName().equals("replication")) + { + NodeList masterNodeList = e.getElementsByTagName("master"); + if (masterNodeList.getLength() > 0) + { + Element masterNode = (Element) masterNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createReplicatedHaPolicy(masterNode)); + } + NodeList slaveNodeList = e.getElementsByTagName("slave"); + if (slaveNodeList.getLength() > 0) + { + Element slaveNode = (Element) slaveNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createReplicaHaPolicy(slaveNode)); + } + NodeList colocatedNodeList = e.getElementsByTagName("colocated"); + if (colocatedNodeList.getLength() > 0) + { + Element colocatedNode = (Element) colocatedNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createColocatedHaPolicy(colocatedNode, true)); + } + } + else if (haNode.getTagName().equals("shared-store")) + { + NodeList masterNodeList = e.getElementsByTagName("master"); + if (masterNodeList.getLength() > 0) + { + Element masterNode = (Element) masterNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createSharedStoreMasterHaPolicy(masterNode)); + } + NodeList slaveNodeList = e.getElementsByTagName("slave"); + if (slaveNodeList.getLength() > 0) + { + Element slaveNode = (Element) slaveNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createSharedStoreSlaveHaPolicy(slaveNode)); + } + NodeList colocatedNodeList = e.getElementsByTagName("colocated"); + if (colocatedNodeList.getLength() > 0) + { + Element colocatedNode = (Element) colocatedNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createColocatedHaPolicy(colocatedNode, false)); + } + } + else if (haNode.getTagName().equals("live-only")) + { + NodeList noneNodeList = e.getElementsByTagName("live-only"); + Element noneNode = (Element) noneNodeList.item(0); + mainConfig.setHAPolicyConfiguration(createLiveOnlyHaPolicy(noneNode)); + } + } } - mainConfig.setHAPolicy(policy); + } - String policyType = getString(e, "policy-type", policy.getPolicyType().toString(), 
Validators.NOT_NULL_OR_EMPTY); + private LiveOnlyPolicyConfiguration createLiveOnlyHaPolicy(Element policyNode) + { + LiveOnlyPolicyConfiguration configuration = new LiveOnlyPolicyConfiguration(); - policy.setPolicyType(HAPolicy.POLICY_TYPE.valueOf(policyType)); + configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); - boolean requestBackup = getBoolean(e, "request-backup", policy.isRequestBackup()); + return configuration; + } - policy.setRequestBackup(requestBackup); + private ReplicatedPolicyConfiguration createReplicatedHaPolicy(Element policyNode) + { + ReplicatedPolicyConfiguration configuration = new ReplicatedPolicyConfiguration(); - int backupRequestRetries = getInteger(e, "backup-request-retries", policy.getBackupRequestRetries(), Validators.MINUS_ONE_OR_GE_ZERO); + configuration.setCheckForLiveServer(getBoolean(policyNode, "check-for-live-server", configuration.isCheckForLiveServer())); - policy.setBackupRequestRetries(backupRequestRetries); + configuration.setGroupName(getString(policyNode, "group-name", configuration.getGroupName(), Validators.NO_CHECK)); - long backupRequestRetryInterval = getLong(e, "backup-request-retry-interval", policy.getBackupRequestRetryInterval(), Validators.GT_ZERO); + configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); - policy.setBackupRequestRetryInterval(backupRequestRetryInterval); + return configuration; + } - int maxBackups = getInteger(e, "max-backups", policy.getMaxBackups(), Validators.GE_ZERO); + private ReplicaPolicyConfiguration createReplicaHaPolicy(Element policyNode) + { + ReplicaPolicyConfiguration configuration = new ReplicaPolicyConfiguration(); - policy.setMaxBackups(maxBackups); + configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup())); - int backupPortOffset = getInteger(e, "backup-port-offset", policy.getBackupPortOffset(), Validators.GT_ZERO); + 
configuration.setGroupName(getString(policyNode, "group-name", configuration.getGroupName(), Validators.NO_CHECK)); - policy.setBackupPortOffset(backupPortOffset); + configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", configuration.isAllowFailBack())); - String backupStrategy = getString(e, "backup-strategy", policy.getBackupStrategy().toString(), Validators.NOT_NULL_OR_EMPTY); + configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); - policy.setBackupStrategy(BackupStrategy.valueOf(backupStrategy)); + configuration.setClusterName(getString(policyNode, "cluster-name", configuration.getClusterName(), Validators.NO_CHECK)); - String scaleDownDiscoveryGroup = getString(e, "scale-down-discovery-group", policy.getScaleDownDiscoveryGroup(), Validators.NO_CHECK); + configuration.setMaxSavedReplicatedJournalsSize(getInteger(policyNode, "max-saved-replicated-journals-size", + configuration.getMaxSavedReplicatedJournalsSize(), Validators.MINUS_ONE_OR_GE_ZERO)); - policy.setScaleDownDiscoveryGroup(scaleDownDiscoveryGroup); + configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); - String scaleDownDiscoveryGroupName = getString(e, "scale-down-group-name", policy.getScaleDownGroupName(), Validators.NO_CHECK); + return configuration; + } - policy.setScaleDownGroupName(scaleDownDiscoveryGroupName); + private SharedStoreMasterPolicyConfiguration createSharedStoreMasterHaPolicy(Element policyNode) + { + SharedStoreMasterPolicyConfiguration configuration = new SharedStoreMasterPolicyConfiguration(); - NodeList scaleDownConnectorNode = e.getElementsByTagName("scale-down-connectors"); + configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); - if (scaleDownConnectorNode != null && scaleDownConnectorNode.getLength() > 0) - { - NodeList scaleDownConnectors = 
scaleDownConnectorNode.item(0).getChildNodes(); - for (int i = 0; i < scaleDownConnectors.getLength(); i++) - { - Node child = scaleDownConnectors.item(i); - if (child.getNodeName().equals("connector-ref")) - { - String connectorName = getTrimmedTextContent(child); + configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); - policy.getScaleDownConnectors().add(connectorName); - } - } - } + return configuration; + } + + private SharedStoreSlavePolicyConfiguration createSharedStoreSlaveHaPolicy(Element policyNode) + { + SharedStoreSlavePolicyConfiguration configuration = new SharedStoreSlavePolicyConfiguration(); + + configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", configuration.isAllowFailBack())); - NodeList remoteConnectorNode = e.getElementsByTagName("remote-connectors"); + configuration.setFailoverOnServerShutdown(getBoolean(policyNode, "failover-on-shutdown", configuration.isFailoverOnServerShutdown())); + + configuration.setFailbackDelay(getLong(policyNode, "failback-delay", configuration.getFailbackDelay(), Validators.GT_ZERO)); + + configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", configuration.isRestartBackup())); + + configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode)); + + return configuration; + } + + private ColocatedPolicyConfiguration createColocatedHaPolicy(Element policyNode, boolean replicated) + { + ColocatedPolicyConfiguration configuration = new ColocatedPolicyConfiguration(); + + boolean requestBackup = getBoolean(policyNode, "request-backup", configuration.isRequestBackup()); + + configuration.setRequestBackup(requestBackup); + + int backupRequestRetries = getInteger(policyNode, "backup-request-retries", configuration.getBackupRequestRetries(), Validators.MINUS_ONE_OR_GE_ZERO); + + configuration.setBackupRequestRetries(backupRequestRetries); + + long backupRequestRetryInterval = getLong(policyNode, 
"backup-request-retry-interval", configuration.getBackupRequestRetryInterval(), Validators.GT_ZERO); + + configuration.setBackupRequestRetryInterval(backupRequestRetryInterval); + + int maxBackups = getInteger(policyNode, "max-backups", configuration.getMaxBackups(), Validators.GE_ZERO); + + configuration.setMaxBackups(maxBackups); + + int backupPortOffset = getInteger(policyNode, "backup-port-offset", configuration.getBackupPortOffset(), Validators.GT_ZERO); + + configuration.setBackupPortOffset(backupPortOffset); + + NodeList remoteConnectorNode = policyNode.getElementsByTagName("excludes"); if (remoteConnectorNode != null && remoteConnectorNode.getLength() > 0) { @@ -1175,31 +1397,65 @@ public final class FileConfigurationParser extends XMLConfigurationUtil if (child.getNodeName().equals("connector-ref")) { String connectorName = getTrimmedTextContent(child); - policy.getRemoteConnectors().add(connectorName); + configuration.getExcludedConnectors().add(connectorName); } } } - policy.setScaleDown(getBoolean(e, "scale-down", policy.isScaleDown())); + NodeList masterNodeList = policyNode.getElementsByTagName("master"); + if (masterNodeList.getLength() > 0) + { + Element masterNode = (Element) masterNodeList.item(0); + configuration.setLiveConfig(replicated ? createReplicatedHaPolicy(masterNode) : createSharedStoreMasterHaPolicy(masterNode)); + } + NodeList slaveNodeList = policyNode.getElementsByTagName("slave"); + if (slaveNodeList.getLength() > 0) + { + Element slaveNode = (Element) slaveNodeList.item(0); + configuration.setBackupConfig(replicated ? 
createReplicaHaPolicy(slaveNode) : createSharedStoreSlaveHaPolicy(slaveNode)); + } - policy.setScaleDownGroupName(getString(e, "scale-down-group-name", policy.getScaleDownGroupName(), Validators.NO_CHECK)); + return configuration; + } + private ScaleDownConfiguration parseScaleDownConfig(Element policyNode) + { + NodeList scaleDownNode = policyNode.getElementsByTagName("scale-down"); - policy.setCheckForLiveServer(getBoolean(e, "check-for-live-server", policy.isCheckForLiveServer())); + if (scaleDownNode.getLength() > 0) + { + ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration(); - policy.setAllowAutoFailBack(getBoolean(e, "allow-failback", policy.isCheckForLiveServer())); + Element scaleDownElement = (Element) scaleDownNode.item(0); - policy.setFailoverOnServerShutdown(getBoolean(e, "failover-on-shutdown", policy.isFailoverOnServerShutdown())); + scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled())); - policy.setBackupGroupName(getString(e, "backup-group-name", policy.getBackupGroupName(), Validators.NO_CHECK)); + String scaleDownDiscoveryGroup = getString(scaleDownElement, "discovery-group", scaleDownConfiguration.getDiscoveryGroup(), Validators.NO_CHECK); - policy.setFailbackDelay(getLong(e, "failback-delay", policy.getFailbackDelay(), Validators.GT_ZERO)); + scaleDownConfiguration.setDiscoveryGroup(scaleDownDiscoveryGroup); - policy.setReplicationClustername(getString(e, "replication-clustername", null, Validators.NO_CHECK)); + String scaleDownDiscoveryGroupName = getString(scaleDownElement, "group-name", scaleDownConfiguration.getGroupName(), Validators.NO_CHECK); - policy.setScaleDownClustername(getString(e, "scale-down-clustername", null, Validators.NO_CHECK)); + scaleDownConfiguration.setGroupName(scaleDownDiscoveryGroupName); - policy.setMaxSavedReplicatedJournalSize(getInteger(e, "max-saved-replicated-journals-size", - policy.getMaxSavedReplicatedJournalsSize(), 
Validators.MINUS_ONE_OR_GE_ZERO)); + NodeList scaleDownConnectorNode = scaleDownElement.getElementsByTagName("connectors"); + + if (scaleDownConnectorNode != null && scaleDownConnectorNode.getLength() > 0) + { + NodeList scaleDownConnectors = scaleDownConnectorNode.item(0).getChildNodes(); + for (int i = 0; i < scaleDownConnectors.getLength(); i++) + { + Node child = scaleDownConnectors.item(i); + if (child.getNodeName().equals("connector-ref")) + { + String connectorName = getTrimmedTextContent(child); + + scaleDownConfiguration.getConnectors().add(connectorName); + } + } + } + return scaleDownConfiguration; + } + return null; } private void parseBroadcastGroupConfiguration(final Element e, final Configuration mainConfig) @@ -1251,10 +1507,18 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } else { - endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration(groupAddress, groupPort, localAddress, localBindPort); + endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort) + .setLocalBindAddress(localAddress) + .setLocalBindPort(localBindPort); } - BroadcastGroupConfiguration config = new BroadcastGroupConfiguration(name, broadcastPeriod, connectorNames, endpointFactoryConfiguration); + BroadcastGroupConfiguration config = new BroadcastGroupConfiguration() + .setName(name) + .setBroadcastPeriod(broadcastPeriod) + .setConnectorInfos(connectorNames) + .setEndpointFactoryConfiguration(endpointFactoryConfiguration); mainConfig.getBroadcastGroupConfigurations().add(config); } @@ -1291,10 +1555,18 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } else { - endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration(groupAddress, groupPort, localBindAddress, localBindPort); + endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration() + .setGroupAddress(groupAddress) + .setGroupPort(groupPort) + 
.setLocalBindAddress(localBindAddress) + .setLocalBindPort(localBindPort); } - DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(name, refreshTimeout, discoveryInitialWaitTimeout, endpointFactoryConfiguration); + DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration() + .setName(name) + .setRefreshTimeout(refreshTimeout) + .setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout) + .setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration); if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name)) { @@ -1357,7 +1629,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil int confirmationWindowSize = - getInteger(e, "confirmation-window-size", FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE, + getInteger(e, "confirmation-window-size", HornetQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(), Validators.GT_ZERO); long clusterNotificationInterval = getLong(e, "notification-interval", HornetQDefaultConfiguration.getDefaultClusterNotificationInterval(), Validators.GT_ZERO); @@ -1393,44 +1665,35 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } } - ClusterConnectionConfiguration config; + ClusterConnectionConfiguration config = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorName) + .setMinLargeMessageSize(minLargeMessageSize) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setMaxRetryInterval(maxRetryInterval) + .setInitialConnectAttempts(initialConnectAttempts) + .setReconnectAttempts(reconnectAttempts) + .setCallTimeout(callTimeout) + .setCallFailoverTimeout(callFailoverTimeout) + .setDuplicateDetection(duplicateDetection) + .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMaxHops(maxHops) + .setConfirmationWindowSize(confirmationWindowSize) + 
.setAllowDirectConnectionsOnly(allowDirectConnectionsOnly) + .setClusterNotificationInterval(clusterNotificationInterval) + .setClusterNotificationAttempts(clusterNotificationAttempts); if (discoveryGroupName == null) { - config = - new ClusterConnectionConfiguration(name, address, connectorName, - minLargeMessageSize, clientFailureCheckPeriod, connectionTTL, - retryInterval, retryIntervalMultiplier, maxRetryInterval, - initialConnectAttempts, reconnectAttempts, callTimeout, callFailoverTimeout, - duplicateDetection, forwardWhenNoConsumers, maxHops, - confirmationWindowSize, - staticConnectorNames, - allowDirectConnectionsOnly, - clusterNotificationInterval, - clusterNotificationAttempts, - scaleDownConnector); + config.setStaticConnectors(staticConnectorNames); } else { - config = - new ClusterConnectionConfiguration(name, address, connectorName, - minLargeMessageSize, clientFailureCheckPeriod, - connectionTTL, - retryInterval, - retryIntervalMultiplier, - maxRetryInterval, - initialConnectAttempts, - reconnectAttempts, - callTimeout, - callFailoverTimeout, - duplicateDetection, - forwardWhenNoConsumers, - maxHops, - confirmationWindowSize, - discoveryGroupName, - clusterNotificationInterval, - clusterNotificationAttempts, - scaleDownConnector); + config.setDiscoveryGroupName(discoveryGroupName); } mainConfig.getClusterConfigurations().add(config); @@ -1441,17 +1704,19 @@ public final class FileConfigurationParser extends XMLConfigurationUtil String name = node.getAttribute("name"); String type = getString(node, "type", null, Validators.NOT_NULL_OR_EMPTY); String address = getString(node, "address", null, Validators.NOT_NULL_OR_EMPTY); - Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, Validators.GT_ZERO); - Long groupTimeout = getLong(node, "group-timeout", GroupingHandlerConfiguration.DEFAULT_GROUP_TIMEOUT, Validators.MINUS_ONE_OR_GT_ZERO); - Long reaperPeriod = getLong(node, "reaper-period", 
GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD, Validators.GT_ZERO); - mainConfiguration.setGroupingHandlerConfiguration(new GroupingHandlerConfiguration(new SimpleString(name), - type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) - ? GroupingHandlerConfiguration.TYPE.LOCAL - : GroupingHandlerConfiguration.TYPE.REMOTE, - new SimpleString(address), - timeout, - groupTimeout, - reaperPeriod)); + Integer timeout = getInteger(node, "timeout", HornetQDefaultConfiguration.getDefaultGroupingHandlerTimeout(), Validators.GT_ZERO); + Long groupTimeout = getLong(node, "group-timeout", HornetQDefaultConfiguration.getDefaultGroupingHandlerGroupTimeout(), Validators.MINUS_ONE_OR_GT_ZERO); + Long reaperPeriod = getLong(node, "reaper-period", HornetQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod(), Validators.GT_ZERO); + mainConfiguration.setGroupingHandlerConfiguration(new GroupingHandlerConfiguration() + .setName(new SimpleString(name)) + .setType( + type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) + ? 
GroupingHandlerConfiguration.TYPE.LOCAL + : GroupingHandlerConfiguration.TYPE.REMOTE) + .setAddress(new SimpleString(address)) + .setTimeout(timeout) + .setGroupTimeout(groupTimeout) + .setReaperPeriod(reaperPeriod)); } private void parseBridgeConfiguration(final Element brNode, final Configuration mainConfig) throws Exception @@ -1466,7 +1731,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil // Default bridge conf int confirmationWindowSize = - getInteger(brNode, "confirmation-window-size", FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE, + getInteger(brNode, "confirmation-window-size", HornetQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(), Validators.GT_ZERO); long retryInterval = getLong(brNode, "retry-interval", HornetQClient.DEFAULT_RETRY_INTERVAL, Validators.GT_ZERO); @@ -1561,53 +1826,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } } - BridgeConfiguration config; + BridgeConfiguration config = new BridgeConfiguration() + .setName(name) + .setQueueName(queueName) + .setForwardingAddress(forwardingAddress) + .setFilterString(filterString) + .setTransformerClassName(transformerClassName) + .setMinLargeMessageSize(minLargeMessageSize) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setConnectionTTL(connectionTTL) + .setRetryInterval(retryInterval) + .setMaxRetryInterval(maxRetryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setInitialConnectAttempts(initialConnectAttempts) + .setReconnectAttempts(reconnectAttempts) + .setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode) + .setUseDuplicateDetection(useDuplicateDetection) + .setConfirmationWindowSize(confirmationWindowSize) + .setHA(ha) + .setUser(user) + .setPassword(password); if (!staticConnectorNames.isEmpty()) { - config = new BridgeConfiguration(name, - queueName, - forwardingAddress, - filterString, - transformerClassName, - minLargeMessageSize, - clientFailureCheckPeriod, - connectionTTL, - 
retryInterval, - maxRetryInterval, - retryIntervalMultiplier, - initialConnectAttempts, - reconnectAttempts, - reconnectAttemptsSameNode, - useDuplicateDetection, - confirmationWindowSize, - staticConnectorNames, - ha, - user, - password); + config.setStaticConnectors(staticConnectorNames); } else { - config = new BridgeConfiguration(name, - queueName, - forwardingAddress, - filterString, - transformerClassName, - minLargeMessageSize, - clientFailureCheckPeriod, - connectionTTL, - retryInterval, - maxRetryInterval, - retryIntervalMultiplier, - initialConnectAttempts, - reconnectAttempts, - reconnectAttemptsSameNode, - useDuplicateDetection, - confirmationWindowSize, - discoveryGroupName, - ha, - user, - password); + config.setDiscoveryGroupName(discoveryGroupName); } mainConfig.getBridgeConfigurations().add(config); @@ -1655,13 +1901,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil } } - DivertConfiguration config = new DivertConfiguration(name, - routingName, - address, - forwardingAddress, - exclusive, - filterString, - transformerClassName); + DivertConfiguration config = new DivertConfiguration() + .setName(name) + .setRoutingName(routingName) + .setAddress(address) + .setForwardingAddress(forwardingAddress) + .setExclusive(exclusive) + .setFilterString(filterString) + .setTransformerClassName(transformerClassName); mainConfig.getDivertConfigurations().add(config); } @@ -1693,6 +1940,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil params.put(key, nValue.getTextContent()); } - return new ConnectorServiceConfiguration(clazz, params, name); + return new ConnectorServiceConfiguration() + .setFactoryClassName(clazz) + .setParams(params) + .setName(name); } }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java b/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java index ad9cc47..d606178 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java @@ -24,25 +24,25 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; import javax.management.MBeanOperationInfo; +import javax.management.Notification; import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationEmitter; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.transaction.xa.Xid; -import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.management.AddressControl; import org.hornetq.api.core.management.BridgeControl; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.DivertControl; import org.hornetq.api.core.management.HornetQServerControl; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.api.core.management.QueueControl; import org.hornetq.core.config.BridgeConfiguration; import org.hornetq.core.config.Configuration; @@ -52,18 +52,29 @@ import 
org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl; import org.hornetq.core.persistence.StorageManager; import org.hornetq.core.persistence.config.PersistedAddressSetting; import org.hornetq.core.persistence.config.PersistedRoles; +import org.hornetq.core.postoffice.Binding; import org.hornetq.core.postoffice.DuplicateIDCache; import org.hornetq.core.postoffice.PostOffice; +import org.hornetq.core.postoffice.impl.LocalQueueBinding; import org.hornetq.core.remoting.server.RemotingService; import org.hornetq.core.security.CheckType; import org.hornetq.core.security.Role; +import org.hornetq.core.server.Consumer; import org.hornetq.core.server.HornetQMessageBundle; import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.JournalType; +import org.hornetq.core.server.Queue; +import org.hornetq.core.server.ServerConsumer; import org.hornetq.core.server.ServerSession; +import org.hornetq.core.server.cluster.ha.HAPolicy; +import org.hornetq.core.server.cluster.ha.LiveOnlyPolicy; +import org.hornetq.core.server.cluster.ha.ScaleDownPolicy; +import org.hornetq.core.server.cluster.ha.SharedStoreSlavePolicy; import org.hornetq.core.server.group.GroupingHandler; import org.hornetq.core.settings.impl.AddressFullMessagePolicy; import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.core.settings.impl.SlowConsumerPolicy; import org.hornetq.core.transaction.ResourceManager; import org.hornetq.core.transaction.Transaction; import org.hornetq.core.transaction.TransactionDetail; @@ -71,6 +82,7 @@ import org.hornetq.core.transaction.impl.CoreTransactionDetail; import org.hornetq.core.transaction.impl.XidImpl; import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.utils.SecurityFormatter; +import org.hornetq.utils.TypedProperties; import org.hornetq.utils.json.JSONArray; import org.hornetq.utils.json.JSONObject; @@ -79,7 +91,8 @@ import 
org.hornetq.utils.json.JSONObject; * * */ -public class HornetQServerControlImpl extends AbstractControl implements HornetQServerControl, NotificationEmitter +public class HornetQServerControlImpl extends AbstractControl implements HornetQServerControl, NotificationEmitter, + org.hornetq.core.server.management.NotificationListener { // Constants ----------------------------------------------------- @@ -99,6 +112,7 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ private final NotificationBroadcasterSupport broadcaster; + private final AtomicLong notifSeq = new AtomicLong(0); // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -120,6 +134,7 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ server = messagingServer; this.messageCounterManager = messageCounterManager; this.broadcaster = broadcaster; + server.getManagementService().addNotificationListener(this); } // HornetQServerControlMBean implementation -------------------- @@ -159,7 +174,7 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ clearIO(); try { - return configuration.getHAPolicy().isBackup(); + return server.getHAPolicy().isBackup(); } finally { @@ -174,7 +189,7 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ clearIO(); try { - return configuration.getHAPolicy().isSharedStore(); + return server.getHAPolicy().isSharedStore(); } finally { @@ -284,7 +299,11 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ clearIO(); try { - configuration.getHAPolicy().setFailoverOnServerShutdown(failoverOnServerShutdown); + HAPolicy haPolicy = server.getHAPolicy(); + if (haPolicy instanceof SharedStoreSlavePolicy) + { + ((SharedStoreSlavePolicy) haPolicy).setFailoverOnServerShutdown(failoverOnServerShutdown); + } } finally { @@ -300,7 +319,15 @@ public class 
HornetQServerControlImpl extends AbstractControl implements HornetQ clearIO(); try { - return configuration.getHAPolicy().isFailoverOnServerShutdown(); + HAPolicy haPolicy = server.getHAPolicy(); + if (haPolicy instanceof SharedStoreSlavePolicy) + { + return ((SharedStoreSlavePolicy) haPolicy).isFailoverOnServerShutdown(); + } + else + { + return false; + } } finally { @@ -1238,8 +1265,8 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ String remoteAddress = connection.getRemoteAddress(); if (remoteAddress.contains(ipAddress)) { - remotingService.removeConnection(connection.getID()); connection.fail(HornetQMessageBundle.BUNDLE.connectionsClosedByManagement(ipAddress)); + remotingService.removeConnection(connection.getID()); closed = true; } } @@ -1253,6 +1280,94 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ } + public synchronized boolean closeConsumerConnectionsForAddress(final String address) + { + boolean closed = false; + checkStarted(); + + clearIO(); + try + { + for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address)).getBindings()) + { + if (binding instanceof LocalQueueBinding) + { + Queue queue = ((LocalQueueBinding) binding).getQueue(); + for (Consumer consumer : queue.getConsumers()) + { + if (consumer instanceof ServerConsumer) + { + ServerConsumer serverConsumer = (ServerConsumer) consumer; + RemotingConnection connection = null; + + for (RemotingConnection potentialConnection : remotingService.getConnections()) + { + if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) + { + connection = potentialConnection; + } + } + + if (connection != null) + { + remotingService.removeConnection(connection.getID()); + connection.fail(HornetQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address)); + closed = true; + } + } + } + } + } + } + catch (Exception e) + { + 
HornetQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e); + } + finally + { + blockOnIO(); + } + return closed; + } + + public synchronized boolean closeConnectionsForUser(final String userName) + { + boolean closed = false; + checkStarted(); + + clearIO(); + try + { + for (ServerSession serverSession : server.getSessions()) + { + if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName)) + { + RemotingConnection connection = null; + + for (RemotingConnection potentialConnection : remotingService.getConnections()) + { + if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString())) + { + connection = potentialConnection; + } + } + + if (connection != null) + { + remotingService.removeConnection(connection.getID()); + connection.fail(HornetQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName)); + closed = true; + } + } + } + } + finally + { + blockOnIO(); + } + return closed; + } + public String[] listConnectionIDs() { checkStarted(); @@ -1511,6 +1626,11 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ : addressSettings.getAddressFullMessagePolicy() == AddressFullMessagePolicy.DROP ? "DROP" : "FAIL"; settings.put("addressFullMessagePolicy", policy); + settings.put("slowConsumerThreshold", addressSettings.getSlowConsumerThreshold()); + settings.put("slowConsumerCheckPeriod", addressSettings.getSlowConsumerCheckPeriod()); + policy = addressSettings.getSlowConsumerPolicy() == SlowConsumerPolicy.NOTIFY ? 
"NOTIFY" + : "KILL"; + settings.put("slowConsumerPolicy", policy); JSONObject jsonObject = new JSONObject(settings); return jsonObject.toString(); @@ -1531,7 +1651,10 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ final long maxRedeliveryDelay, final long redistributionDelay, final boolean sendToDLAOnNoRoute, - final String addressFullMessagePolicy) throws Exception + final String addressFullMessagePolicy, + final long slowConsumerThreshold, + final long slowConsumerCheckPeriod, + final String slowConsumerPolicy) throws Exception { checkStarted(); @@ -1580,6 +1703,20 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ { addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); } + addressSettings.setSlowConsumerThreshold(slowConsumerThreshold); + addressSettings.setSlowConsumerCheckPeriod(slowConsumerCheckPeriod); + if (slowConsumerPolicy == null) + { + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + } + else if (slowConsumerPolicy.equalsIgnoreCase("NOTIFY")) + { + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY); + } + else if (slowConsumerPolicy.equalsIgnoreCase("KILL")) + { + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + } server.getAddressSettingsRepository().addMatch(address, addressSettings); storageManager.storeAddressSetting(new PersistedAddressSetting(new SimpleString(address), addressSettings)); @@ -1652,13 +1789,14 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ clearIO(); try { - DivertConfiguration config = new DivertConfiguration(name, - routingName, - address, - forwardingAddress, - exclusive, - filterString, - transformerClassName); + DivertConfiguration config = new DivertConfiguration() + .setName(name) + .setRoutingName(routingName) + .setAddress(address) + .setForwardingAddress(forwardingAddress) + .setExclusive(exclusive) + .setFilterString(filterString) + 
.setTransformerClassName(transformerClassName); server.deployDivert(config); } finally @@ -1717,7 +1855,7 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ final boolean useDuplicateDetection, final int confirmationWindowSize, final long clientFailureCheckPeriod, - final String connectorNames, + final String staticConnectorsOrDiscoveryGroup, boolean useDiscoveryGroup, final boolean ha, final String user, @@ -1727,57 +1865,34 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ clearIO(); - try { - BridgeConfiguration config = null; + BridgeConfiguration config = new BridgeConfiguration() + .setName(name) + .setQueueName(queueName) + .setForwardingAddress(forwardingAddress) + .setFilterString(filterString) + .setTransformerClassName(transformerClassName) + .setClientFailureCheckPeriod(clientFailureCheckPeriod) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setInitialConnectAttempts(initialConnectAttempts) + .setReconnectAttempts(reconnectAttempts) + .setUseDuplicateDetection(useDuplicateDetection) + .setConfirmationWindowSize(confirmationWindowSize) + .setHA(ha) + .setUser(user) + .setPassword(password); + if (useDiscoveryGroup) { - config = new BridgeConfiguration(name, - queueName, - forwardingAddress, - filterString, - transformerClassName, - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - clientFailureCheckPeriod, - HornetQClient.DEFAULT_CONNECTION_TTL, - retryInterval, - HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, - retryIntervalMultiplier, - initialConnectAttempts, - reconnectAttempts, - HornetQDefaultConfiguration.getDefaultBridgeConnectSameNode(), - useDuplicateDetection, - confirmationWindowSize, - connectorNames, - ha, - user, - password); + config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup); } else { - List<String> connectors = toList(connectorNames); - config = new BridgeConfiguration(name, - queueName, - forwardingAddress, - 
filterString, - transformerClassName, - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - clientFailureCheckPeriod, - HornetQClient.DEFAULT_CONNECTION_TTL, - retryInterval, - HornetQClient.DEFAULT_MAX_RETRY_INTERVAL, - retryIntervalMultiplier, - initialConnectAttempts, - reconnectAttempts, - HornetQDefaultConfiguration.getDefaultBridgeConnectSameNode(), - useDuplicateDetection, - confirmationWindowSize, - connectors, - ha, - user, - password); + config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup)); } + server.deployBridge(config); } finally @@ -1835,6 +1950,35 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ blockOnIO(); } } + + @Override + public void scaleDown(String connector) throws Exception + { + checkStarted(); + + clearIO(); + HAPolicy haPolicy = server.getHAPolicy(); + if (haPolicy instanceof LiveOnlyPolicy) + { + LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy; + + if (liveOnlyPolicy.getScaleDownPolicy() == null) + { + liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy()); + } + + liveOnlyPolicy.getScaleDownPolicy().setEnabled(true); + + if (connector != null) + { + liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, connector); + } + + server.stop(true); + } + + } + // NotificationEmitter implementation ---------------------------- public void removeNotificationListener(final NotificationListener listener, @@ -1882,7 +2026,7 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ public MBeanNotificationInfo[] getNotificationInfo() { - NotificationType[] values = NotificationType.values(); + CoreNotificationType[] values = CoreNotificationType.values(); String[] names = new String[values.length]; for (int i = 0; i < values.length; i++) { @@ -2028,5 +2172,16 @@ public class HornetQServerControlImpl extends AbstractControl implements HornetQ return list; } + @Override + public void onNotification(org.hornetq.core.server.management.Notification notification) + { 
+ if (!(notification.getType() instanceof CoreNotificationType)) return; + CoreNotificationType type = (CoreNotificationType) notification.getType(); + TypedProperties prop = notification.getProperties(); + + this.broadcaster.sendNotification(new Notification(type.toString(), this, + notifSeq.incrementAndGet(), notification.toString())); + } + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java b/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java index 7d3102c..8208ad4 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java @@ -254,6 +254,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl } } + public long getMessagesAcknowledged() + { + checkStarted(); + + clearIO(); + try + { + return queue.getMessagesAcknowledged(); + } + finally + { + blockOnIO(); + } + } + public long getID() { checkStarted(); @@ -926,6 +941,22 @@ public class QueueControlImpl extends AbstractControl implements QueueControl } } + + public void flushExecutor() + { + checkStarted(); + + clearIO(); + try + { + queue.flushExecutor(); + } + finally + { + blockOnIO(); + } + } + @Override public String listConsumersAsJSON() throws Exception { @@ -987,6 +1018,22 @@ public class QueueControlImpl extends AbstractControl implements QueueControl } + public void resetMessagesAcknowledged() throws Exception + { + checkStarted(); + + clearIO(); + try + { + queue.resetMessagesAcknowledged(); + } + finally + { + blockOnIO(); + } + + } + // Package protected --------------------------------------------- // Protected 
----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java b/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java index d342bce..96a0851 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java +++ b/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java @@ -130,7 +130,7 @@ public class MessageCounter { public void run() { - long latestMessagesAdded = serverQueue.getInstantMessagesAdded(); + long latestMessagesAdded = serverQueue.getMessagesAdded(); long newMessagesAdded = latestMessagesAdded - lastMessagesAdded; @@ -213,7 +213,7 @@ public class MessageCounter */ public long getMessageCount() { - return serverQueue.getInstantMessageCount(); + return serverQueue.getMessageCount(); } /** @@ -222,7 +222,7 @@ public class MessageCounter */ public long getMessageCountDelta() { - long current = serverQueue.getInstantMessageCount(); + long current = serverQueue.getMessageCount(); int delta = (int)(current - depthLast); depthLast = current; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java index def8e4d..7e6b0e1 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java @@ -21,16 +21,16 @@ import org.hornetq.core.settings.HierarchicalRepositoryChangeListener; /** * <PRE> * 
- * +------------+ 1 +-------------+ N +------------+ N +-------+ 1 +----------------+ - * | {@link PostOffice} |-------> |{@link PagingManager}|-------> |{@link PagingStore} | ------> | {@link Page} | ------> | {@link SequentialFile} | - * +------------+ +-------------+ +------------+ +-------+ +----------------+ - * | 1 ^ - * | | - * | | - * | | 1 - * | N +---------+ / - * +--------> | {@link Address} | - * +---------+ + * +--------------+ 1 +----------------+ N +--------------+ N +--------+ 1 +-------------------+ + * | {@link org.hornetq.core.postoffice.PostOffice} |-------> |{@link PagingManager}|-------> |{@link PagingStore} | ------> | {@link org.hornetq.core.paging.impl.Page} | ------> | {@link org.hornetq.core.journal.SequentialFile} | + * +--------------+ +----------------+ +--------------+ +--------+ +-------------------+ + * | 1 ^ + * | | + * | | + * | | 1 + * | N +----------+ + * +------------> | {@link org.hornetq.core.postoffice.Address} | + * +----------+ * </PRE> * @author <a href="mailto:[email protected]">Clebert Suconic</a> * @author <a href="mailto:[email protected]">Tim Fox</a> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java index e286f40..d151d72 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java @@ -154,7 +154,7 @@ public interface PagingStore extends HornetQComponent /** * Sends the pages with given IDs to the {@link ReplicationManager}. * <p/> - * Sending is done here to avoid exposing the internal {@link SequentialFile}s. + * Sending is done here to avoid exposing the internal {@link org.hornetq.core.journal.SequentialFile}s. 
* * @param replicator * @param pageIds http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java index bf85d1d..11a5ba4 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -91,9 +91,9 @@ public class PageCursorProviderImpl implements PageCursorProvider public synchronized PageSubscription createSubscription(long cursorID, Filter filter, boolean persistent) { - if (HornetQServerLogger.LOGGER.isDebugEnabled()) + if (HornetQServerLogger.LOGGER.isTraceEnabled()) { - HornetQServerLogger.LOGGER.debug(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception("trace")); + HornetQServerLogger.LOGGER.trace(this.pagingStore.getAddress() + " creating subscription " + cursorID + " with filter " + filter, new Exception("trace")); } if (activeCursors.containsKey(cursorID)) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java index 5b2cedc..12fda59 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java @@ -41,14 +41,14 @@ 
public class PagePositionImpl implements PagePosition */ public PagePositionImpl(long pageNr, int messageNr) { - super(); + this(); this.pageNr = pageNr; this.messageNr = messageNr; } public PagePositionImpl() { - + super(); } /** @@ -151,4 +151,12 @@ public class PagePositionImpl implements PagePosition return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID + "]"; } + /** + * I needed a finalize method defined here just as a way to get a hook on the PagingLeakTest through ByteMan. + * There is a rule for finalizing it where I'm establishing a counter, and that rule won't work without this method defined. + * So please don't remove it unless that test itself had to be removed for some reason; it's here for a purpose! + */ + protected void finalize() + { + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index 4100d0d..ddd0880 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -346,7 +346,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter long newRecordID = -1; - long txCleanup = storage.generateUniqueID(); + long txCleanup = storage.generateID(); try { http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java index 24d8bb7..acd56f6 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -335,6 +335,7 @@ final class PageSubscriptionImpl implements PageSubscription } infoPG.acks.clear(); + infoPG.removedReferences.clear(); } tx.addOperation(new TransactionOperationAbstract() @@ -672,7 +673,7 @@ final class PageSubscriptionImpl implements PageSubscription */ public void destroy() throws Exception { - final long tx = store.generateUniqueID(); + final long tx = store.generateID(); try { @@ -752,7 +753,7 @@ final class PageSubscriptionImpl implements PageSubscription HornetQServerLogger.LOGGER.pageNotFound(pos); if (txDeleteCursorOnReload == -1) { - txDeleteCursorOnReload = store.generateUniqueID(); + txDeleteCursorOnReload = store.generateID(); } store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, pos.getRecordID()); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java b/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java index 2c16ef0..df06d41 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java @@ -926,7 +926,7 @@ public class PagingStoreImpl implements PagingStore currentPage.write(pagedMessage); - if (tx == null && syncNonTransactional) + if (tx == null && syncNonTransactional && message.isDurable()) { sync(); } @@ -1197,7 +1197,7 
@@ public class PagingStoreImpl implements PagingStore } // To be used on isDropMessagesWhenFull - private boolean isFull() + public boolean isFull() { return maxSize > 0 && getAddressSize() > maxSize; } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java index 7173f45..559a3c9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java @@ -46,6 +46,7 @@ import org.hornetq.core.server.group.impl.GroupBinding; import org.hornetq.core.server.impl.JournalLoader; import org.hornetq.core.transaction.ResourceManager; import org.hornetq.core.transaction.Transaction; +import org.hornetq.utils.IDGenerator; /** * A StorageManager @@ -53,8 +54,17 @@ import org.hornetq.core.transaction.Transaction; * @author <a href="mailto:[email protected]">Tim Fox</a> * @author <a href="mailto:[email protected]">Clebert Suconic</a> * @author <a href="mailto:[email protected]>Andy Taylor</a> + * + * Note about IDGEnerator + * + * I've changed StorageManager to extend IDGenerator, because in some places + * all we needed from the StorageManager was the idGeneration. + * I couldn't just get the IDGenerator from the inner part because the NullPersistent has its own sequence. 
+ * So the best was to add the interface and adjust the callers for the method + * + */ -public interface StorageManager extends HornetQComponent +public interface StorageManager extends IDGenerator, HornetQComponent { /** @@ -144,10 +154,6 @@ public interface StorageManager extends HornetQComponent void clearContext(); - long generateUniqueID(); - - long getCurrentUniqueID(); - /** * Confirms that a large message was finished */ @@ -336,8 +342,7 @@ public interface StorageManager extends HornetQComponent Journal getMessageJournal(); /** - * @see JournalStorageManager#startReplication(ReplicationManager, PagingManager, String, - * boolean) + * @see org.hornetq.core.persistence.impl.journal.JournalStorageManager#startReplication(org.hornetq.core.replication.ReplicationManager, org.hornetq.core.paging.PagingManager, String, boolean) */ void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID, boolean autoFailBack) throws Exception; @@ -397,10 +402,10 @@ public interface StorageManager extends HornetQComponent void readUnLock(); /** - * Closes the {@link IDGenerator} persisting the current record ID. + * Closes the {@link org.hornetq.utils.IDGenerator} persisting the current record ID. * <p/> * Effectively a "pre-stop" method. 
Necessary due to the "stop"-order at - * {@link HornetQServerImpl} + * {@link org.hornetq.core.server.impl.HornetQServerImpl} */ void persistIdGenerator(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java index 77f01dc..d00e056 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java +++ b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java @@ -14,17 +14,14 @@ package org.hornetq.core.persistence.impl.journal; import javax.transaction.xa.Xid; import java.io.PrintStream; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.Message; -import org.hornetq.api.core.SimpleString; import org.hornetq.core.config.impl.ConfigurationImpl; import org.hornetq.core.journal.EncodingSupport; import org.hornetq.core.journal.PreparedTransactionInfo; @@ -667,29 +664,8 @@ public final class DescribeJournal { buffer.append(";userMessageID=" + msg.getUserID().toString()); } - buffer.append(";properties=["); - Set<SimpleString> properties = msg.getPropertyNames(); - - for (SimpleString prop : properties) - { - Object value = msg.getObjectProperty(prop); - if (value instanceof byte[]) - { - buffer.append(prop + "=" + Arrays.toString((byte[])value) + ","); - - } - else - { - buffer.append(prop + "=" + value + ","); - } - } - - buffer.append("#properties = " + properties.size()); - - buffer.append("]"); - - buffer.append(" - " 
+ msg.toString()); + buffer.append(";msg=" + msg.toString()); return buffer.toString(); }
