Author: tabish
Date: Thu Sep 29 16:02:49 2011
New Revision: 1177345

URL: http://svn.apache.org/viewvc?rev=1177345&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3516

Add fix along with a unit test to ensure it stays fixed.

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1177345&r1=1177344&r2=1177345&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
 Thu Sep 29 16:02:49 2011
@@ -455,14 +455,6 @@ public class FailoverTransport implement
         this.maxCacheSize = maxCacheSize;
     }
 
-    /**
-     * @return Returns true if the command is one sent when a connection is
-     *         being closed.
-     */
-    private boolean isShutdownCommand(Command command) {
-        return (command != null && (command.isShutdownInfo() || command 
instanceof RemoveInfo));
-    }
-
     public void oneway(Object o) throws IOException {
 
         Command command = (Command) o;
@@ -471,22 +463,22 @@ public class FailoverTransport implement
 
             synchronized (reconnectMutex) {
 
-                if (isShutdownCommand(command) && connectedTransport.get() == 
null) {
+                if (command != null && connectedTransport.get() == null) {
                     if (command.isShutdownInfo()) {
-                        // Skipping send of ShutdownInfo command when not
-                        // connected.
+                        // Skipping send of ShutdownInfo command when not 
connected.
                         return;
-                    }
-                    if (command instanceof RemoveInfo || 
command.isMessageAck()) {
-                        // Simulate response to RemoveInfo command or ack (as 
it
-                        // will be stale)
+                    } else if (command instanceof RemoveInfo || 
command.isMessageAck()) {
+                        // Simulate response to RemoveInfo command or 
MessageAck (as it will be stale)
                         stateTracker.track(command);
-                        Response response = new Response();
-                        response.setCorrelationId(command.getCommandId());
-                        myTransportListener.onCommand(response);
+                       if (command.isResponseRequired()) {
+                               Response response = new Response();
+                               
response.setCorrelationId(command.getCommandId());
+                               myTransportListener.onCommand(response);
+                       }
                         return;
                     }
                 }
+
                 // Keep trying until the message is sent.
                 for (int i = 0; !disposed; i++) {
                     try {

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java?rev=1177345&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
 Thu Sep 29 16:02:49 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.state.ConnectionStateTracker;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class FailoverTransportTest {
+
+    protected Transport transport;
+    protected FailoverTransport failoverTransport;
+    private int commandsReceived;
+
+       @Before
+       public void setUp() throws Exception {
+       }
+
+       @After
+       public void tearDown() throws Exception {
+        if (transport != null) {
+            transport.stop();
+        }
+    }
+
+       @Test(timeout=30000)
+       public void testCommandsIgnoredWhenOffline() throws Exception {
+               this.transport = createTransport();
+
+               assertNotNull(failoverTransport);
+
+               ConnectionStateTracker tracker = 
failoverTransport.getStateTracker();
+               assertNotNull(tracker);
+
+               ConnectionId id = new ConnectionId("1");
+               ConnectionInfo connection = new ConnectionInfo(id);
+
+               // Track a connection
+               tracker.track(connection);
+
+               try {
+                       this.transport.oneway(new RemoveInfo(new 
ConnectionId("1")));
+               } catch(Exception e) {
+                       fail("Should not have failed to remove this known 
connection");
+               }
+
+               try {
+                       this.transport.oneway(new RemoveInfo(new 
ConnectionId("2")));
+               } catch(Exception e) {
+                       fail("Should not have failed to remove this unknown 
connection");
+               }
+
+               this.transport.oneway(new MessageAck());
+               this.transport.oneway(new ShutdownInfo());
+       }
+
+       @Test(timeout=30000)
+       public void testResponsesSentWhenRequestForIgnoredCommands() throws 
Exception {
+               this.transport = createTransport();
+               assertNotNull(failoverTransport);
+               MessageAck ack = new MessageAck();
+               assertNotNull("Should have received a Response", 
this.transport.request(ack));
+               RemoveInfo info = new RemoveInfo(new ConnectionId("2"));
+               assertNotNull("Should have received a Response", 
this.transport.request(info));
+       }
+
+    protected Transport createTransport() throws Exception {
+       Transport transport = TransportFactory.connect(
+                       new URI("failover://(tcp://doesNotExist:1234)"));
+        transport.setTransportListener(new TransportListener() {
+
+            public void onCommand(Object command) {
+               commandsReceived++;
+            }
+
+            public void onException(IOException error) {
+            }
+
+            public void transportInterupted() {
+            }
+
+            public void transportResumed() {
+            }
+        });
+        transport.start();
+
+        this.failoverTransport = transport.narrow(FailoverTransport.class);
+
+        return transport;
+    }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to