Repository: activemq
Updated Branches:
  refs/heads/trunk dc607bbf3 -> 2360fb859


Fix for AMQ-5073: updated AmqpNioSslTransport.java to properly handle frames.
Also fixed bugs in the AMQP tests, as seen in AMQ-5062.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2360fb85
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2360fb85
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2360fb85

Branch: refs/heads/trunk
Commit: 2360fb859694bacac1e48092e53a56b388e1d2f0
Parents: dc607bb
Author: Kevin Earls <[email protected]>
Authored: Fri Feb 28 10:58:15 2014 +0100
Committer: Kevin Earls <[email protected]>
Committed: Fri Feb 28 10:58:15 2014 +0100

----------------------------------------------------------------------
 activemq-amqp/pom.xml                           |  13 ++
 .../transport/amqp/AmqpNioSslTransport.java     | 149 ++++++++++++++++---
 .../transport/amqp/AmqpTestSupport.java         |  83 ++++++++++-
 .../transport/amqp/JMSClientNioPlusSslTest.java |  33 ++++
 .../transport/amqp/JMSClientNioTest.java        |   2 +-
 .../transport/amqp/JMSClientSslTest.java        |  54 +++++++
 .../activemq/transport/amqp/JMSClientTest.java  |  19 ++-
 .../amqp/joram/JoramJmsNioPlusSslTest.java      |   1 -
 8 files changed, 320 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 56990f4..77d41ec 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -160,6 +160,19 @@
               </execution>
             </executions>
           </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <forkCount>1</forkCount>
+                    <reuseForks>false</reuseForks>
+                    <surefire.argLine>-Xmx512M 
-Djava.awt.headless=true</surefire.argLine>
+                    <runOrder>alphabetical</runOrder>
+                    
<forkedProcessTimeoutInSeconds>120</forkedProcessTimeoutInSeconds>
+                    <includes>
+                        <include>**/*Test.*</include>
+                    </includes>
+                </configuration>
+            </plugin>
         </plugins>
       </build>
     </profile>

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
index ee0dc78..76e6f64 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -16,21 +16,26 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.SocketFactory;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
-import javax.net.SocketFactory;
-
-import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtbuf.Buffer;
-
 public class AmqpNioSslTransport extends NIOSSLTransport {
-
-    private final ByteBuffer magic = ByteBuffer.allocate(8);
+    private DataInputStream amqpHeaderValue = new DataInputStream(new 
ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'}));
+    public final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt();
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpNioSslTransport.class);
+    private boolean magicConsumed = false;
 
     public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory 
socketFactory, URI remoteLocation, URI localLocation) throws 
UnknownHostException, IOException {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -50,27 +55,131 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
 
     @Override
     protected void processCommand(ByteBuffer plain) throws Exception {
+        // Are we waiting for the next Command or are we building on the 
current one?  The
+        // frame size is in the first 4 bytes.
+        if (nextFrameSize == -1) {
+            // We can get small packets that don't give us enough for the 
frame size
+            // so allocate enough for the initial size value and
+            if (plain.remaining() < 4) {
+                if (currentBuffer == null) {
+                    currentBuffer = ByteBuffer.allocate(4);
+                }
+
+                // Go until we fill the integer sized current buffer.
+                while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
+                    currentBuffer.put(plain.get());
+                }
+
+                // Didn't we get enough yet to figure out next frame size.
+                if (currentBuffer.hasRemaining()) {
+                    return;
+                } else {
+                    currentBuffer.flip();
+                    nextFrameSize = currentBuffer.getInt();
+                }
+            } else {
+                // Either we are completing a previous read of the next frame 
size or its
+                // fully contained in plain already.
+                if (currentBuffer != null) {
+                    // Finish the frame size integer read and get from the 
current buffer.
+                    while (currentBuffer.hasRemaining()) {
+                        currentBuffer.put(plain.get());
+                    }
 
-        byte[] fill = new byte[plain.remaining()];
-        plain.get(fill);
+                    currentBuffer.flip();
+                    nextFrameSize = currentBuffer.getInt();
+                } else {
+                    nextFrameSize = plain.getInt();
+                }
+            }
+        }
 
-        ByteBuffer payload = ByteBuffer.wrap(fill);
+        // There are three possibilities when we get here.  We could have a 
partial frame,
+        // a full frame, or more than 1 frame
+        while (true) {
+            LOG.debug("Entering while loop with plain.position {} remaining {} 
", plain.position(), plain.remaining());
+            // handle headers, which start with 'A','M','Q','P' rather than 
size
+            if (nextFrameSize == AMQP_HEADER_VALUE) {
+                nextFrameSize = handleAmqpHeader(plain);
+                if (nextFrameSize == -1) {
+                    return;
+                }
+            }
 
-        if (magic.position() != 8) {
+            validateFrameSize(nextFrameSize);
 
-            while (payload.hasRemaining() && magic.position() < 8) {
-                magic.put(payload.get());
+            // now we have the data, let's reallocate and try to fill it,  
(currentBuffer.putInt() is called
+            // because we need to put back the 4 bytes we read to determine 
the size)
+            currentBuffer = ByteBuffer.allocate(nextFrameSize );
+            currentBuffer.putInt(nextFrameSize);
+            if (currentBuffer.remaining() >= plain.remaining()) {
+                currentBuffer.put(plain);
+            } else {
+                byte[] fill = new byte[currentBuffer.remaining()];
+                plain.get(fill);
+                currentBuffer.put(fill);
             }
 
-            if (!magic.hasRemaining()) {
-                magic.flip();
-                doConsume(new AmqpHeader(new Buffer(magic)));
-                magic.position(8);
+            // Either we have enough data for a new command or we have to wait 
for some more.  If hasRemaining is true,
+            // we have not filled the buffer yet, i.e. we haven't received the 
full frame.
+            if (currentBuffer.hasRemaining()) {
+                return;
+            } else {
+                currentBuffer.flip();
+                LOG.debug("Calling doConsume with position {} limit {}", 
currentBuffer.position(), currentBuffer.limit());
+                doConsume(AmqpSupport.toBuffer(currentBuffer));
+
+                // Determine if there are more frames to process
+                if (plain.hasRemaining()) {
+                    if (plain.remaining() < 4) {
+                        nextFrameSize = 4;
+                    } else {
+                        nextFrameSize = plain.getInt();
+                    }
+                } else {
+                    nextFrameSize = -1;
+                    currentBuffer = null;
+                    return;
+                }
             }
         }
+    }
+
+    private void validateFrameSize(int frameSize) throws IOException {
+        if (nextFrameSize > AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE) {
+            throw new IOException("Frame size of " + nextFrameSize +
+                    "larger than max allowed " + 
AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE);
+        }
+    }
+
+    private int handleAmqpHeader(ByteBuffer plain) {
+        int nextFrameSize;
 
-        if (payload.hasRemaining()) {
-            doConsume(AmqpSupport.toBuffer(payload));
+        LOG.debug("Consuming AMQP_HEADER");
+        currentBuffer = ByteBuffer.allocate(8);
+        currentBuffer.putInt(AMQP_HEADER_VALUE);
+        while (currentBuffer.hasRemaining()) {
+            currentBuffer.put(plain.get());
         }
+        currentBuffer.flip();
+        if (!magicConsumed) {   // The first case we see is special and has to 
be handled differently
+            doConsume(new AmqpHeader(new Buffer(currentBuffer)));
+            magicConsumed = true;
+        } else {
+            doConsume(AmqpSupport.toBuffer(currentBuffer));
+        }
+
+        if (plain.hasRemaining()) {
+            if (plain.remaining() < 4) {
+                nextFrameSize = 4;
+            } else {
+                nextFrameSize = plain.getInt();
+            }
+        } else {
+            nextFrameSize = -1;
+            currentBuffer = null;
+        }
+        return nextFrameSize;
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 23c895e..a44874f 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -20,6 +20,12 @@ import java.io.File;
 import java.security.SecureRandom;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -68,7 +74,20 @@ public class AmqpTestSupport {
     @Before
     public void setUp() throws Exception {
         exceptions.clear();
-        startBroker();
+        if (killHungThreads("setUp")) {
+            LOG.warn("HUNG THREADS in setUp");
+        }
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<Boolean> future = executor.submit(new SetUpTask());
+        try {
+            LOG.debug("SetUpTask started.");
+            Boolean result =  future.get(60, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            throw new Exception("startBroker timed out");
+        }
+        executor.shutdownNow();
+
         this.numberOfMessages = 2000;
     }
 
@@ -130,16 +149,51 @@ public class AmqpTestSupport {
     }
 
     public void stopBroker() throws Exception {
+        LOG.debug("entering AmqpTestSupport.stopBroker");
         if (brokerService != null) {
             brokerService.stop();
             brokerService.waitUntilStopped();
             brokerService = null;
         }
+        LOG.debug("exiting AmqpTestSupport.stopBroker");
     }
 
     @After
     public void tearDown() throws Exception {
-        stopBroker();
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<Boolean> future = executor.submit(new TearDownTask());
+        try {
+            LOG.debug("tearDown started.");
+            Boolean result =  future.get(60, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            throw new Exception("startBroker timed out");
+        }
+        executor.shutdownNow();
+
+        if (killHungThreads("tearDown")) {
+            LOG.warn("HUNG THREADS in setUp");
+        }
+    }
+
+    private boolean killHungThreads(String stage) throws Exception{
+        Thread.sleep(500);
+        if (Thread.activeCount() == 1) {
+            return false;
+        }
+        LOG.warn("Hung Thread(s) on {} entry threadCount {} ", stage, 
Thread.activeCount());
+
+        Thread[] threads = new Thread[Thread.activeCount()];
+        Thread.enumerate(threads);
+        for (int i=0; i < threads.length; i++) {
+            Thread t = threads[i];
+            if (!t.getName().equals("main")) {
+                LOG.warn("KillHungThreads: Interrupting thread {}", 
t.getName());
+                t.interrupt();
+            }
+        }
+
+        LOG.warn("Hung Thread on {} exit threadCount {} ", stage, 
Thread.activeCount());
+        return true;
     }
 
     public void sendMessages(Connection connection, Destination destination, 
int count) throws Exception {
@@ -191,4 +245,29 @@ public class AmqpTestSupport {
                 .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, 
true);
         return proxy;
     }
+
+    public class SetUpTask implements Callable<Boolean> {
+        private String testName;
+
+        @Override
+        public Boolean call() throws Exception {
+            LOG.debug("in SetUpTask.call, calling startBroker");
+            startBroker();
+
+            return Boolean.TRUE;
+        }
+    }
+
+    public class TearDownTask implements Callable<Boolean> {
+        private String testName;
+
+        @Override
+        public Boolean call() throws Exception {
+            LOG.debug("in TearDownTask.call(), calling stopBroker");
+            stopBroker();
+
+            return Boolean.TRUE;
+        }
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java
new file mode 100644
index 0000000..d39b53c
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the JMS client when connected to the NIO+SSL transport.
+ */
+public class JMSClientNioPlusSslTest extends JMSClientSslTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientNioPlusSslTest.class);
+
+    @Override
+    protected int getBrokerPort() {
+        LOG.debug("JMSClientNioPlusSslTest.getBrokerPort returning 
nioPlusSslPort {}", nioPlusSslPort);
+        return nioPlusSslPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
index 7392482..68f93c3 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java
@@ -29,12 +29,12 @@ import java.io.DataInputStream;
 /**
  * Test the JMS client when connected to the NIO transport.
  */
-@Ignore
 public class JMSClientNioTest extends JMSClientTest {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientNioTest.class);
 
     @Override
     protected int getBrokerPort() {
+        LOG.debug("JMSClientNioTest.getBrokerPort returning nioPort {}", 
nioPort);
         return nioPort;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
new file mode 100644
index 0000000..2a7445b
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import java.security.SecureRandom;
+
+/**
+ * Test the JMS client when connected to the SSL transport.
+ */
+public class JMSClientSslTest extends JMSClientTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientSslTest.class);
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        SSLContext ctx = SSLContext.getInstance("TLS");
+        ctx.init(new KeyManager[0], new TrustManager[]{new 
DefaultTrustManager()}, new SecureRandom());
+        SSLContext.setDefault(ctx);
+    }
+
+    @Override
+    protected Connection createConnection(String clientId, boolean 
syncPublish, boolean useSsl) throws JMSException {
+        LOG.debug("JMSClientSslTest.createConnection called with clientId {} 
syncPublish {} useSsl {}", clientId, syncPublish, useSsl);
+        return super.createConnection(clientId, syncPublish, true);
+    }
+
+    @Override
+    protected int getBrokerPort() {
+        LOG.debug("JMSClientSslTest.getBrokerPort returning sslPort {}", 
sslPort);
+        return sslPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index be5a92b..1c6bc79 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -57,7 +57,6 @@ import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore
 public class JMSClientTest extends AmqpTestSupport {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientTest.class);
     @Rule public TestName name = new TestName();
@@ -353,7 +352,7 @@ public class JMSClientTest extends AmqpTestSupport {
         }
     }
 
-    @Test(timeout=30000)
+    @Test(timeout=90000)
     public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws 
Exception {
 
         Connection connection = createConnection();
@@ -377,7 +376,7 @@ public class JMSClientTest extends AmqpTestSupport {
         try {
             for (int i = 0; i < 10; ++i) {
                 consumer.receiveNoWait();
-                TimeUnit.SECONDS.sleep(1);
+                TimeUnit.MILLISECONDS.sleep(1000 + (i * 100));
             }
             fail("Should have thrown an IllegalStateException");
         } catch (Exception ex) {
@@ -385,7 +384,7 @@ public class JMSClientTest extends AmqpTestSupport {
         }
     }
 
-    @Test(timeout=30000)
+    @Test(timeout=60000)
     public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws 
Exception {
 
         Connection connection = createConnection();
@@ -408,7 +407,7 @@ public class JMSClientTest extends AmqpTestSupport {
 
         try {
             for (int i = 0; i < 10; ++i) {
-                consumer.receive(1000);
+                consumer.receive(1000 + (i * 100));
             }
             fail("Should have thrown an IllegalStateException");
         } catch (Exception ex) {
@@ -753,15 +752,15 @@ public class JMSClientTest extends AmqpTestSupport {
     }
 
     private Connection createConnection() throws JMSException {
-        return createConnection(name.toString(), false);
+        return createConnection(name.toString(), false, false);
     }
 
     private Connection createConnection(boolean syncPublish) throws 
JMSException {
-        return createConnection(name.toString(), syncPublish);
+        return createConnection(name.toString(), syncPublish, false);
     }
 
     private Connection createConnection(String clientId) throws JMSException {
-        return createConnection(clientId, false);
+        return createConnection(clientId, false, false);
     }
 
     /**
@@ -773,11 +772,11 @@ public class JMSClientTest extends AmqpTestSupport {
         return port;
     }
 
-    private Connection createConnection(String clientId, boolean syncPublish) 
throws JMSException {
+    protected Connection createConnection(String clientId, boolean 
syncPublish, boolean useSsl) throws JMSException {
 
         int brokerPort = getBrokerPort();
         LOG.debug("Creating connection on port {}", brokerPort);
-        final ConnectionFactoryImpl factory = new 
ConnectionFactoryImpl("localhost", brokerPort, "admin", "password");
+        final ConnectionFactoryImpl factory = new 
ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, 
useSsl);
 
         factory.setSyncPublish(syncPublish);
         factory.setTopicPrefix("topic://");

http://git-wip-us.apache.org/repos/asf/activemq/blob/2360fb85/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java
index d1b63d8..168b077 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsNioPlusSslTest.java
@@ -48,7 +48,6 @@ import 
org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Ignore
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
     //    TopicSessionTest.class,    // Hangs, see 
https://issues.apache.org/jira/browse/PROTON-154

Reply via email to