Repository: activemq
Updated Branches:
  refs/heads/master b6521e292 -> 864192855


[AMQ-6603] ensure failover does not track a consumer whose creation fails with an 
exception; fix and test. Thanks for the test, Tadayoshi Sato.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/86419285
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/86419285
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/86419285

Branch: refs/heads/master
Commit: 8641928553e0f1d97416fd010c2f7d6f165b3660
Parents: b6521e2
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Feb 28 17:17:18 2017 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Feb 28 17:17:18 2017 +0000

----------------------------------------------------------------------
 .../activemq/state/ConnectionStateTracker.java  |  18 ++
 activemq-pool/pom.xml                           |   5 +
 .../activemq/pool/PooledConsumerTest.java       | 223 +++++++++++++++++++
 3 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/86419285/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
index 8fdcd77..2caab8f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
+++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
@@ -108,6 +108,21 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
         }
     }
 
+    /** Response handler for a tracked ConsumerInfo: on an ExceptionResponse it calls processRemoveConsumer (AMQ-6603). */
+    private final class ExceptionResponseCheckAction implements ResponseHandler {
+        private final ConsumerInfo info;
+        public ExceptionResponseCheckAction(ConsumerInfo info) {
+            this.info = info;
+        }
+        /** Calls processRemoveConsumer for the consumer if {@code response} is an ExceptionResponse; otherwise does nothing. */
+        @Override
+        public void onResponse(Command response) {
+            if (ExceptionResponse.DATA_STRUCTURE_TYPE == response.getDataStructureType()) {
+                processRemoveConsumer(info.getConsumerId(), 0L);
+            }
+        }
+    }
+
     private class PrepareReadonlyTransactionAction extends 
RemoveTransactionAction {
         public PrepareReadonlyTransactionAction(TransactionInfo info) {
             super(info);
@@ -415,6 +430,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
                         SessionState ss = cs.getSessionState(sessionId);
                         if (ss != null) {
                             ss.addConsumer(info);
+                            if (info.isResponseRequired()) {
+                                return new Tracked(new 
ExceptionResponseCheckAction(info));
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/86419285/activemq-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml
index edb2286..305bf0c 100644
--- a/activemq-pool/pom.xml
+++ b/activemq-pool/pom.xml
@@ -104,6 +104,11 @@
       <artifactId>log4j</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-jaas</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   
   <profiles>

http://git-wip-us.apache.org/repos/asf/activemq/blob/86419285/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
new file mode 100644
index 0000000..4311f8e
--- /dev/null
+++ b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConsumerTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import com.google.common.base.Strings;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.jaas.GroupPrincipal;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.SimpleAuthorizationMap;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PooledConsumerTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PooledConsumerTest.class);
+
+    public static final String USERNAME = "test";
+    public static final String PASSWORD = "test";
+    private static final ActiveMQQueue QUEUE = new ActiveMQQueue("TEST");
+
+    BrokerService brokerService;
+
+    /** Test client that keeps trying to consume from QUEUE, swapping connections after each failure. */
+    class PooledConsumer implements MessageListener {
+        private final ConnectionFactory factory;
+        private Connection connection;
+        public volatile boolean done = false; // set by done(), polled by the thread running listen()
+
+        public PooledConsumer(String url) throws JMSException {
+            PooledConnectionFactory pooled = new PooledConnectionFactory(url);
+            pooled.setMaxConnections(5);
+            pooled.setIdleTimeout(0);
+            this.factory = pooled;
+            init();
+        }
+
+        /** Closes the current connection, if any, then obtains and starts a new one from the factory. */
+        private void init() throws JMSException {
+            if (connection != null) {
+                close();
+            }
+            connection = factory.createConnection(USERNAME, PASSWORD);
+            connection.start();
+        }
+
+        /**
+         * Until done() is called or the thread is interrupted: open a session and a consumer on QUEUE and
+         * block in receive(); after a JMSException, close both and re-init the connection before retrying.
+         */
+        public void listen() {
+            while (!done && !Thread.currentThread().isInterrupted()) {
+                Session session = null;
+                MessageConsumer consumer = null;
+                boolean success = true;
+                try {
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    consumer = session.createConsumer(QUEUE);
+                    onMessage(consumer.receive());
+                } catch (JMSException e) {
+                    LOGGER.info(e.getMessage());
+                    success = false;
+                } finally {
+                    try {
+                        if (consumer != null) consumer.close();
+                        if (session != null) session.close();
+                        if (!success) init();
+                    } catch (JMSException e) {
+                        LOGGER.warn("cleanup after receive attempt failed", e);
+                    }
+                }
+                sleep(50);
+            }
+        }
+
+        private void sleep(long milliseconds) {
+            try {
+                TimeUnit.MILLISECONDS.sleep(milliseconds);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible so listen() stops
+            }
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            if (message != null) {
+                TextMessage textMessage = (TextMessage) message;
+                try {
+                    String response = textMessage.getText();
+                    LOGGER.info(Strings.repeat("=", 50));
+                    LOGGER.info("Received: '{}'", response);
+                    LOGGER.info(Strings.repeat("=", 50));
+                } catch (Exception e) {
+                    LOGGER.error(e.getMessage(), e);
+                }
+            }
+        }
+
+        public void close() {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } catch (JMSException ignored) { // best effort: the connection is discarded either way
+            }
+        }
+
+        /** Stops the listen() loop and closes the current connection. */
+        public void done() {
+            done = true;
+            close();
+        }
+    }
+
+    /** Starts a broker on {@code trasport} whose single user is in {@code group}; only "users" may read QUEUE. */
+    public void startBroker(String group, String trasport) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.addConnector(trasport);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+        brokerService.setDestinations(new ActiveMQDestination[]{QUEUE});
+
+        List<AuthenticationUser> users = new ArrayList<>();
+        users.add(new AuthenticationUser(USERNAME, PASSWORD, group));
+        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
+
+        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin();
+        SimpleAuthorizationMap simpleAuthorizationMap = new SimpleAuthorizationMap();
+        DestinationMap readAcls = new DestinationMap();
+        readAcls.put(QUEUE, new GroupPrincipal("users"));
+        simpleAuthorizationMap.setReadACLs(readAcls);
+        authorizationPlugin.setMap(simpleAuthorizationMap);
+        brokerService.setPlugins(new BrokerPlugin[]{authenticationPlugin, authorizationPlugin});
+
+        brokerService.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /** Condition that holds once the broker's tcp connector reports exactly five connections. */
+    private Wait.Condition fiveConnections() {
+        return new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 5 == brokerService.getTransportConnectorByScheme("tcp").getConnections().size();
+            }
+        };
+    }
+
+    @Test
+    public void testFailedConsumerNotRetainedByFailover() throws Exception {
+        startBroker("test", "tcp://0.0.0.0:0");
+        String url = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+        final PooledConsumer consumer = new PooledConsumer("failover:(" + url + ")?jms.watchTopicAdvisories=false");
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                consumer.listen();
+            }
+        });
+
+        try {
+            assertTrue("5 connectons - pool fils up", Wait.waitFor(fiveConnections()));
+            stopBroker();
+
+            // with perms
+            startBroker("users", url);
+            assertTrue("5 reconnections from the pool", Wait.waitFor(fiveConnections()));
+            assertEquals("one consumer", 1, brokerService.getRegionBroker().getDestinationMap().get(QUEUE).getConsumers().size());
+        } finally {
+            consumer.done();
+            executorService.shutdownNow();
+        }
+    }
+}
+

Reply via email to