Repository: activemq Updated Branches: refs/heads/master 2eff835ee -> 01384c714
AMQ-6707 - fix destination filter delegate param, refactor-auto-gen method; jees Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/01384c71 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/01384c71 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/01384c71 Branch: refs/heads/master Commit: 01384c714dbe0405d876b93849e6fff5ec048bff Parents: 2eff835 Author: gtully <[email protected]> Authored: Fri May 18 14:44:05 2018 +0100 Committer: gtully <[email protected]> Committed: Fri May 18 14:44:05 2018 +0100 ---------------------------------------------------------------------- .../broker/region/DestinationFilter.java | 2 +- .../activemq/store/jdbc/XACompletionTest.java | 62 +++++++++++++++----- 2 files changed, 48 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/01384c71/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index dad8501..1897c23 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -380,7 +380,7 @@ public class DestinationFilter implements Destination { @Override public void clearPendingMessages(int pendingAdditionsCount) { - next.clearPendingMessages(0); + next.clearPendingMessages(pendingAdditionsCount); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/01384c71/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java index 50cb1c9..6aef533 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java @@ -21,14 +21,22 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQXAConnection; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.filter.AnyDestination; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.jaas.GroupPrincipal; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.security.SimpleAuthorizationMap; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; import org.apache.activemq.wireformat.WireFormat; @@ -73,12 +81,12 @@ public class XACompletionTest extends TestSupport { @Parameterized.Parameter public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice; - @Parameterized.Parameters(name="store={0}") + @Parameterized.Parameters(name = "store={0}") public static Iterable<Object[]> getTestParameters() { - return Arrays.asList(new Object[][]{ {TestSupport.PersistenceAdapterChoice.KahaDB},{PersistenceAdapterChoice.JDBC} }); + return Arrays.asList(new Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.JDBC}}); } - @Before + @Before public void setUp() throws Exception { broker = createBroker(); } @@ -293,7 +301,7 @@ public class XACompletionTest extends TestSupport { assertNotNull("message gone", browsed); LOG.info("Try receive... after"); - for (int i=0; i<10; i++) { + for (int i = 0; i < 10; i++) { Message message = regularReceive("TEST"); assertNotNull("message gone", message); } @@ -383,7 +391,7 @@ public class XACompletionTest extends TestSupport { LOG.info("Try receive... after rollback"); - for (int i=0;i<10; i++) { + for (int i = 0; i < 10; i++) { Message message = regularReceive("TEST"); assertNotNull("message gone: " + i, message); } @@ -464,6 +472,7 @@ public class XACompletionTest extends TestSupport { // set maxBatchSize=1 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 1); + factory.setWatchTopicAdvisories(false); javax.jms.Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -472,6 +481,7 @@ public class XACompletionTest extends TestSupport { consumer.close(); ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); + receiveFactory.setWatchTopicAdvisories(false); // recover/rollback the second tx ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=0"); @@ -483,7 +493,7 @@ public class XACompletionTest extends TestSupport { xids = xaResource.recover(XAResource.TMSTARTRSCAN); xaResource.recover(XAResource.TMNOFLAGS); - for (int i=0; i< xids.length; i++) { + for (int i = 0; i < xids.length; i++) { xaResource.rollback(xids[i]); } @@ -523,7 +533,7 @@ public class XACompletionTest extends TestSupport { final Xid tid = createXid(); byte[] branch = tid.getBranchQualifier(); - final byte[] branch2 = Arrays.copyOf(branch, branch.length); + final byte[] branch2 = Arrays.copyOf(branch, branch.length); branch2[0] = '!'; Xid branchTid = new Xid() { @@ -639,7 +649,7 @@ public class XACompletionTest extends TestSupport { final Xid tid = createXid(); byte[] branch = tid.getBranchQualifier(); - final byte[] branch2 = Arrays.copyOf(branch, branch.length); + final byte[] branch2 = Arrays.copyOf(branch, branch.length); branch2[0] = '!'; Xid branchTid = new Xid() { @@ -771,7 +781,7 @@ public class XACompletionTest extends TestSupport { final Xid tid = createXid(); byte[] branch = tid.getBranchQualifier(); - final byte[] branch2 = Arrays.copyOf(branch, branch.length); + final byte[] branch2 = Arrays.copyOf(branch, branch.length); branch2[0] = '!'; Xid branchTid = new Xid() { @@ -870,6 +880,7 @@ public class XACompletionTest extends TestSupport { private Message regularReceive(String qName) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + factory.setWatchTopicAdvisories(false); return regularReceiveWith(factory, qName); } @@ -889,6 +900,7 @@ public class XACompletionTest extends TestSupport { private int drainUnack(int limit, String qName) throws Exception { int drained = 0; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + limit); + factory.setWatchTopicAdvisories(false); javax.jms.Connection connection = factory.createConnection(); try { connection.start(); @@ -897,7 +909,8 @@ public class XACompletionTest extends TestSupport { MessageConsumer consumer = session.createConsumer(destination); while (drained < limit && consumer.receive(2000) != null) { drained++; - }; + } + ; consumer.close(); } finally { connection.close(); @@ -921,6 +934,7 @@ public class XACompletionTest extends TestSupport { connection.close(); } } + protected void sendMessages(int messagesExpected) throws Exception { sendMessagesWith(factory, messagesExpected); } @@ -933,9 +947,9 @@ public class XACompletionTest extends TestSupport { MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i=0; i<messagesExpected; i++) { - LOG.debug("Sending message " + (i+1) + " of " + messagesExpected); - producer.send(session.createTextMessage("test message " + (i+1))); + for (int i = 0; i < messagesExpected; i++) { + LOG.debug("Sending message " + (i + 1) + " of " + messagesExpected); + producer.send(session.createTextMessage("test message " + (i + 1))); } connection.close(); } @@ -950,9 +964,9 @@ public class XACompletionTest extends TestSupport { PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS"); ResultSet result = statement.executeQuery(); LOG.info("Messages in broker db..."); - while(result.next()) { + while (result.next()) { long id = result.getLong(1); - org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2))); + org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(result.getBytes(2))); String xid = result.getString(3); LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + message); } @@ -985,6 +999,24 @@ public class XACompletionTest extends TestSupport { setPersistenceAdapter(broker, persistenceAdapterChoice); broker.setPersistent(true); connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + + // ensure we run through a destination filter + final String id = "a"; + AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(); + SimpleAuthorizationMap map = new SimpleAuthorizationMap(); + DestinationMap destinationMap = new DestinationMap(); + GroupPrincipal anaGroup = new GroupPrincipal(id); + destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">")}), anaGroup); + map.setWriteACLs(destinationMap); + map.setAdminACLs(destinationMap); + map.setReadACLs(destinationMap); + authorizationPlugin.setMap(map); + SimpleAuthenticationPlugin simpleAuthenticationPlugin = new SimpleAuthenticationPlugin(); + simpleAuthenticationPlugin.setAnonymousAccessAllowed(true); + simpleAuthenticationPlugin.setAnonymousGroup(id); + simpleAuthenticationPlugin.setAnonymousUser(id); + + broker.setPlugins(new BrokerPlugin[]{simpleAuthenticationPlugin, authorizationPlugin}); broker.start(); return broker; }
