More OpenWire test fixes: added a queuePrefetch config param to AddressSettings
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3189d659 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3189d659 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3189d659 Branch: refs/heads/refactor-openwire Commit: 3189d6592c67d1b2c589f15766beb5b1a954a293 Parents: 95f76e2 Author: Howard Gao <[email protected]> Authored: Wed Mar 2 20:55:37 2016 +0800 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 17 14:10:46 2016 -0400 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 2 +- .../openwire/OpenWireProtocolManager.java | 2 +- .../core/protocol/openwire/amq/AMQConsumer.java | 13 +++++++++ .../core/protocol/openwire/amq/AMQSession.java | 4 +++ .../core/settings/impl/AddressSettings.java | 26 +++++++++++++++++ .../artemiswrapper/ArtemisBrokerWrapper.java | 2 ++ .../transport/tcp/TcpTransportFactory.java | 9 +++++- .../activemq/ActiveMQConnectionFactoryTest.java | 5 +++- .../activemq/QueueConsumerPriorityTest.java | 1 + .../activemq/ZeroPrefetchConsumerTest.java | 30 +++++++++++--------- 10 files changed, 76 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 1e1e953..03871ab 100644 --- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -388,7 +388,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se dispatchAsync(ce); } - protected void dispatch(Command command) throws IOException { + public void dispatch(Command command) throws IOException { this.physicalSend(command); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 7445960..122788e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -524,7 +524,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl public void sendBrokerInfo(OpenWireConnection connection) throws Exception { BrokerInfo brokerInfo = new BrokerInfo(); - brokerInfo.setBrokerName(server.getIdentity()); + brokerInfo.setBrokerName(getBrokerName()); brokerInfo.setBrokerId(new BrokerId("" + server.getNodeID())); brokerInfo.setPeerBrokerInfos(null); brokerInfo.setFaultTolerantConfiguration(false); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 221679f..7c2a9bd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -34,7 +34,9 @@ import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; @@ -136,6 +138,17 @@ public class AMQConsumer implements BrowserListener { else { SimpleString queueName = new SimpleString("jms.queue." 
+ this.actualDest.getPhysicalName()); coreSession.createConsumer(nativeId, queueName, selector, info.isBrowser(), false, -1); + AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(queueName.toString()); + if (addrSettings != null) { + //see PolicyEntry + if (prefetchSize != 0 && addrSettings.getQueuePrefetch() == 0) { + //sends back a ConsumerControl + ConsumerControl cc = new ConsumerControl(); + cc.setConsumerId(info.getConsumerId()); + cc.setPrefetch(0); + session.getConnection().dispatch(cc); + } + } } if (info.isBrowser()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index b68861e..e3d2266 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -542,4 +542,8 @@ public class AMQSession implements SessionCallback { } } } + + public OpenWireConnection getConnection() { + return connection; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 3309fab..4b53ec6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -70,6 +70,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY; + public static final int DEFAULT_QUEUE_PREFETCH = 1000; + private AddressFullMessagePolicy addressFullMessagePolicy = null; private Long maxSizeBytes = null; @@ -114,6 +116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; + //from amq5 + //make it transient + private transient Integer queuePrefetch = null; + public AddressSettings(AddressSettings other) { this.addressFullMessagePolicy = other.addressFullMessagePolicy; this.maxSizeBytes = other.maxSizeBytes; @@ -137,6 +143,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable this.autoCreateJmsQueues = other.autoCreateJmsQueues; this.autoDeleteJmsQueues = other.autoDeleteJmsQueues; this.managementBrowsePageSize = other.managementBrowsePageSize; + this.queuePrefetch = other.queuePrefetch; } public AddressSettings() { @@ -333,6 +340,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return this; } + public int getQueuePrefetch() { + return queuePrefetch != null ? 
queuePrefetch : AddressSettings.DEFAULT_QUEUE_PREFETCH; + } + + public AddressSettings setQueuePrefetch(int queuePrefetch) { + this.queuePrefetch = queuePrefetch; + return this; + } + /** * merge 2 objects in to 1 * @@ -403,6 +419,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (managementBrowsePageSize == null) { managementBrowsePageSize = merged.managementBrowsePageSize; } + if (queuePrefetch == null) { + queuePrefetch = merged.queuePrefetch; + } } @Override @@ -569,6 +588,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode()); result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode()); result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode()); + result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode()); return result; } @@ -718,6 +738,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable } else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize)) return false; + if (queuePrefetch == null) { + if (other.queuePrefetch != null) + return false; + } + else if (!queuePrefetch.equals(other.queuePrefetch)) + return false; return true; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 112d425..94faf26 100644 --- 
a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -210,6 +210,8 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); } } + int queuePrefetch = entry.getQueuePrefetch(); + settings.setQueuePrefetch(queuePrefetch); } PolicyEntry defaultEntry = policyMap.getDefaultEntry(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index c44dd72..c0ed126 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -52,10 +52,17 @@ public class TcpTransportFactory extends TransportFactory { //here check broker, if no broker, we start one Map<String, String> params = URISupport.parseParameters(location); String brokerId = params.remove("invmBrokerId"); + boolean autoCreate = true; + String create = params.remove("create"); + if (create != null) + { + autoCreate = "true".equals(create); + } + URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP); LOG.info("deciding whether starting an internal broker: " + brokerService + " flag: " + BrokerService.disableWrapper); - if (brokerService == null && !BrokerService.disableWrapper && BrokerService.checkPort(location1.getPort())) { + if (autoCreate && brokerService == null && 
!BrokerService.disableWrapper && BrokerService.checkPort(location1.getPort())) { LOG.info("starting internal broker: " + location1); ArtemisBrokerHelper.startArtemisBroker(location1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java index e1ea7e6..8769324 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java @@ -110,6 +110,7 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { connection.close(); } + //we don't support in-vm connector (will we?) public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://myBroker2?broker.persistent=false"); // Make sure the broker is not created until the connection is @@ -124,7 +125,9 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport { connection.close(); // Verify the broker was destroyed. - assertNull(BrokerRegistry.getInstance().lookup("myBroker2")); + //I comment out this because this is pure client behavior in + //amq5. there shouldn't be any use-case like that with Artemis. 
+ //assertNull(BrokerRegistry.getInstance().lookup("myBroker2")); } public void testGetBrokerName() throws URISyntaxException, JMSException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java index 296f52b..4ae2feb 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.tcp.TcpTransportFactory; +//https://issues.apache.org/jira/browse/ARTEMIS-196 public class QueueConsumerPriorityTest extends TestCase { private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3189d659/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java index 953032b..a9a564b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -26,10 +26,11 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import 
org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ConsumerControl; @@ -349,8 +350,10 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); // verify sub view broker - Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0); - assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); + // I comment out this because it checks broker internal + // which doesn't apply to artemis broker. 
+ //Subscription sub = broker.getRegionBroker().getDestinationMap().get(ActiveMQDestination.transform(brokerZeroQueue)).getConsumers().get(0); + //assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); // manipulate Prefetch (like failover and stomp) ConsumerControl consumerControl = new ConsumerControl(); @@ -361,18 +364,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl); assertTrue("good request", !(reply instanceof ExceptionResponse)); assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); - assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); } @Override - protected BrokerService createBroker() throws Exception { - BrokerService brokerService = super.createBroker(); - PolicyMap policyMap = new PolicyMap(); - PolicyEntry zeroPrefetchPolicy = new PolicyEntry(); - zeroPrefetchPolicy.setQueuePrefetch(0); - policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy); - brokerService.setDestinationPolicy(policyMap); - return brokerService; + public EmbeddedJMS createArtemisBroker() throws Exception { + Configuration config0 = createConfig("localhost", 0); + String coreQueueAddress = "jms.queue." + brokerZeroQueue.getQueueName(); + AddressSettings addrSettings = new AddressSettings(); + addrSettings.setQueuePrefetch(0); + config0.getAddressesSettings().put(coreQueueAddress, addrSettings); + EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + return newbroker; } @Override
