Repository: activemq Updated Branches: refs/heads/master e73ab3483 -> eb9c584fb
https://issues.apache.org/jira/browse/AMQ-6373 More tests and cleanup Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eb9c584f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eb9c584f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eb9c584f Branch: refs/heads/master Commit: eb9c584fbd86e52b2cc7bb7d209068a8865fa7b4 Parents: e73ab34 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Mon Aug 1 12:27:34 2016 -0400 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Mon Aug 1 12:27:34 2016 -0400 ---------------------------------------------------------------------- .../command/BrokerSubscriptionInfo.java | 3 +- .../network/DurableSyncNetworkBridgeTest.java | 63 ++++++++++++++-- .../openwire/BrokerSubscriptionInfoTest.java | 78 ++++++++++++++++++++ 3 files changed, 137 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java index 57f854a..0f6a11b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java @@ -19,7 +19,8 @@ package org.apache.activemq.command; import org.apache.activemq.state.CommandVisitor; /** - * Used to represent a durable subscription. + * Used to represent the durable subscriptions contained by the broker + * This is used to synchronize durable subs on bridge creation * * @openwire:marshaller code="92" * http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 3c4a2a0..f3314cc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.jms.MessageConsumer; import javax.jms.Session; @@ -32,12 +33,16 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.CommandTypes; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.junit.After; import org.junit.Assume; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -61,6 +66,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { private Session session1; private final FLOW flow; + @Rule + public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS); + @Parameters public static Collection<Object[]> data() { return Arrays.asList(new Object[][] { @@ -209,6 +217,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { } + @Test + public void testSyncLoadTest() throws Exception { + String subName = this.subName; + //Create 1000 subs + for (int i = 0; i < 100; i++) { + for (int j = 0; j < 10; j++) { + session1.createDurableSubscriber(new ActiveMQTopic("include.test." + i), subName + i + j).close(); + } + } + for (int i = 0; i < 100; i++) { + assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1); + } + + doTearDown(); + restartBroker(broker1, false); + + //with bridge off, remove 100 subs + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + removeSubscription(broker1, new ActiveMQTopic("include.test." + i), subName + i + j); + } + } + + //restart test that 900 are resynced and 100 are deleted + restartBrokers(true); + + for (int i = 0; i < 10; i++) { + assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 0); + } + + for (int i = 10; i < 100; i++) { + assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1); + } + + } + + /** * Using an older version of openwire should not sync but the network bridge * should still start without error @@ -395,10 +440,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { localConnection.setClientID("clientId"); localConnection.start(); - if (startNetworkConnector) { // brokerService.setPlugins(new BrokerPlugin[] {new - // JavaRuntimeConfigurationPlugin()}); - // brokerService.setUseVirtualDestSubs(true); - // brokerService.setUseVirtualDestSubsOnCreation(isUsevirtualDestinationSubscriptionsOnCreation); + if (startNetworkConnector) { Wait.waitFor(new Condition() { @Override public boolean isSatisified() throws Exception { @@ -439,8 +481,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception { BrokerService brokerService = new BrokerService(); brokerService.setMonitorConnectionSplits(true); - brokerService.setDataDirectoryFile(dataDir); brokerService.setBrokerName("localBroker"); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); if (startNetworkConnector) { brokerService.addNetworkConnector(configureLocalNetworkConnector()); @@ -477,10 +523,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { brokerService.setBrokerName("remoteBroker"); brokerService.setUseJmx(false); brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); - brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion); + //Need a larger cache size in order to handle all of the durables + brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + remoteBrokerWireFormatVersion); return brokerService; } http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java new file mode 100644 index 0000000..9075348 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.activemq.command.BrokerSubscriptionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BrokerSubscriptionInfoTest { + + static final Logger LOG = LoggerFactory.getLogger(BrokerSubscriptionInfoTest.class); + + + + @Test + public void testMarshalClientProperties() throws IOException { + // marshal object + OpenWireFormatFactory factory = new OpenWireFormatFactory(); + factory.setCacheEnabled(true); + OpenWireFormat wf = (OpenWireFormat)factory.createWireFormat(); + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + DataOutputStream ds = new DataOutputStream(buffer); + + ConsumerInfo info = new ConsumerInfo(); + info.setClientId("clientId"); + info.setConsumerId(new ConsumerId()); + + int size = 1000; + + + ConsumerInfo infos[] = new ConsumerInfo[size]; + for (int i = 0; i < infos.length; i++) { + infos[i] = info; + } + + BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo(); + bsi.setSubscriptionInfos(infos); + + wf.marshal(bsi, ds); + ds.close(); + + // unmarshal object and check that the properties are present. + ByteArrayInputStream in = new ByteArrayInputStream(buffer.toByteArray()); + DataInputStream dis = new DataInputStream(in); + BrokerSubscriptionInfo actual = (BrokerSubscriptionInfo) wf.unmarshal(dis); + + //assertTrue(actual instanceof BrokerSubscriptionInfo); + assertEquals(size, actual.getSubscriptionInfos().length); + } + + +}
