http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java new file mode 100644 index 0000000..0c20c0f --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java @@ -0,0 +1,184 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.proton.plug.test.minimalserver; + + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.GlobalEventExecutor; + +import org.proton.plug.AMQPServerConnectionContext; +import org.proton.plug.context.server.ProtonServerConnectionContextFactory; +import org.proton.plug.test.Constants; + +/** + * A Netty TCP Acceptor that supports SSL + * + * @author <a href="[email protected]">Andy Taylor</a> + * @author <a href="[email protected]">Tim Fox</a> + * @author <a href="[email protected]">Trustin Lee</a> + * @author <a href="[email protected]">Jeff Mesnil</a> + * @author <a href="[email protected]">Norman Maurer</a> + * @version $Rev$, $Date$ + */ +public class MinimalServer +{ + + static + { + // Disable resource leak detection for performance reasons by default + ResourceLeakDetector.setEnabled(false); + } + + private Class<? extends ServerChannel> channelClazz; + + private EventLoopGroup eventLoopGroup; + + private volatile ChannelGroup serverChannelGroup; + + private volatile ChannelGroup channelGroup; + + private ServerBootstrap bootstrap; + + private String host; + + private boolean sasl; + + // Constants.PORT is the default here + private int port; + + public synchronized void start(String host, int port, final boolean sasl) throws Exception + { + this.host = host; + this.port = port; + this.sasl = sasl; + + if (channelClazz != null) + { + // Already started + return; + } + + int threadsToUse = Runtime.getRuntime().availableProcessors() * 3; + channelClazz = NioServerSocketChannel.class; + eventLoopGroup = new NioEventLoopGroup(threadsToUse, new SimpleServerThreadFactory("simple-server", true, Thread.currentThread().getContextClassLoader())); + + bootstrap = new ServerBootstrap(); + bootstrap.group(eventLoopGroup); + bootstrap.channel(channelClazz); + + + ChannelInitializer<Channel> factory = new ChannelInitializer<Channel>() + { + @Override + public void initChannel(Channel channel) throws Exception + { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("amqp-handler", new ProtocolDecoder()); + } + }; + bootstrap.childHandler(factory); + + bootstrap.option(ChannelOption.SO_REUSEADDR, true). + childOption(ChannelOption.SO_REUSEADDR, true). + childOption(ChannelOption.SO_KEEPALIVE, true). +// childOption(ChannelOption.AUTO_READ, false). + childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + channelGroup = new DefaultChannelGroup("hornetq-accepted-channels", GlobalEventExecutor.INSTANCE); + + serverChannelGroup = new DefaultChannelGroup("hornetq-acceptor-channels", GlobalEventExecutor.INSTANCE); + + + SocketAddress address; + address = new InetSocketAddress(host, port); + Channel serverChannel = bootstrap.bind(address).syncUninterruptibly().channel(); + serverChannelGroup.add(serverChannel); + + } + + class ProtocolDecoder extends ByteToMessageDecoder + { + + AMQPServerConnectionContext connection; + + + public ProtocolDecoder() + { + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception + { + super.channelActive(ctx); + connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new MinimalConnectionSPI(ctx.channel())); + //ctx.read(); + } + + @Override + protected void decode(final ChannelHandlerContext ctx, ByteBuf byteIn, List<Object> out) throws Exception + { + connection.inputBuffer(byteIn); + ctx.flush(); +// if (connection.capacity() > 0) +// { +// ctx.read(); +// } + } + } + + public synchronized void stop() + { + serverChannelGroup.close().awaitUninterruptibly(); + ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly(); + } + + + public static void main(String[] arg) + { + MinimalServer server = new MinimalServer(); + try + { + server.start("127.0.0.1", Constants.PORT, true); + + + while (true) + { + Thread.sleep(360000000); + } + } + catch (Throwable e) + { + e.printStackTrace(); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java new file mode 100644 index 0000000..30a58bc --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java @@ -0,0 +1,219 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.test.minimalserver; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.AMQPSessionContext; +import org.proton.plug.context.ProtonPlugSender; +import org.proton.plug.context.server.ProtonServerSessionContext; +import org.proton.plug.SASLResult; +import org.proton.plug.util.ProtonServerMessage; + +/** + * @author Clebert Suconic + */ + +public class MinimalSessionSPI implements AMQPSessionCallback +{ + + private SASLResult result; + ProtonServerSessionContext session; + + @Override + public void init(AMQPSessionContext session, SASLResult result) + { + this.session = (ProtonServerSessionContext) session; + this.result = result; + } + + @Override + public void start() + { + } + + static AtomicInteger tempQueueGenerator = new AtomicInteger(0); + + public String tempQueueName() + { + return "TempQueueName" + tempQueueGenerator.incrementAndGet(); + } + + @Override + public Object createSender(ProtonPlugSender plugSender, String queue, String filer, boolean browserOnly) + { + Consumer consumer = new Consumer(DumbServer.getQueue(queue)); + return consumer; + } + + @Override + public void startSender(Object brokerConsumer) + { + ((Consumer) brokerConsumer).start(); + } + + @Override + public void createTemporaryQueue(String queueName) + { + + } + + @Override + public void onFlowConsumer(Object consumer, int credits) + { + } + + @Override + public boolean queueQuery(String queueName) + { + return true; + } + + @Override + public void closeSender(Object brokerConsumer) + { + ((Consumer) brokerConsumer).close(); + } + + @Override + public ProtonJMessage encodeMessage(Object message, int deliveryCount) + { + // We are storing internally as EncodedMessage on this minimalserver server + return (ProtonServerMessage) message; + } + + @Override + public Binary getCurrentTXID() + { + return new Binary(new byte[]{1}); + } + + @Override + public void commitCurrentTX() + { + } + + @Override + public void rollbackCurrentTX() + { + } + + @Override + public void close() + { + + } + + @Override + public void ack(Object brokerConsumer, Object message) + { + + } + + @Override + public void cancel(Object brokerConsumer, Object message, boolean updateCounts) + { + + } + + @Override + public void resumeDelivery(Object consumer) + { + System.out.println("Resume delivery!!!"); + ((Consumer) consumer).start(); + } + + @Override + public void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf buffer) + { + ProtonServerMessage serverMessage = new ProtonServerMessage(); + serverMessage.decode(buffer.nioBuffer()); + + BlockingDeque<Object> queue = DumbServer.getQueue(address); + queue.add(serverMessage); + } + + + class Consumer + { + final BlockingDeque<Object> queue; + + Consumer(BlockingDeque<Object> queue) + { + this.queue = queue; + } + + boolean running = false; + volatile Thread thread; + + public void close() + { + System.out.println("Closing!!!"); + running = false; + if (thread != null) + { + try + { + thread.join(); + } + catch (Throwable ignored) + { + } + } + + thread = null; + } + + public synchronized void start() + { + running = true; + if (thread == null) + { + System.out.println("Start!!!"); + thread = new Thread() + { + public void run() + { + try + { + while (running) + { + Object msg = queue.poll(1, TimeUnit.SECONDS); + + if (msg != null) + { + session.serverDelivery(msg, Consumer.this, 1); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }; + thread.start(); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java new file mode 100644 index 0000000..d33c235 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/SimpleServerThreadFactory.java @@ -0,0 +1,90 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.proton.plug.test.minimalserver; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A HornetQThreadFactory + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + */ +public final class SimpleServerThreadFactory implements ThreadFactory +{ + private final ThreadGroup group; + + private final AtomicInteger threadCount = new AtomicInteger(0); + + private final int threadPriority; + + private final boolean daemon; + + private final ClassLoader tccl; + + public SimpleServerThreadFactory(final String groupName, final boolean daemon, final ClassLoader tccl) + { + group = new ThreadGroup(groupName + "-" + System.identityHashCode(this)); + + this.threadPriority = Thread.NORM_PRIORITY; + + this.tccl = tccl; + + this.daemon = daemon; + } + + public Thread newThread(final Runnable command) + { + final Thread t; + // attach the thread to a group only if there is no security manager: + // when sandboxed, the code does not have the RuntimePermission modifyThreadGroup + if (System.getSecurityManager() == null) + { + t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (" + group.getName() + ")"); + } + else + { + t = new Thread(command, "Thread-" + threadCount.getAndIncrement()); + } + + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + t.setDaemon(daemon); + t.setPriority(threadPriority); + return null; + } + }); + + try + { + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + t.setContextClassLoader(tccl); + return null; + } + }); + } + catch (java.security.AccessControlException e) + { + e.printStackTrace(); + } + + return t; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java new file mode 100644 index 0000000..be615c4 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/sasl/PlainSASLTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.test.sasl; + +import org.junit.Assert; +import org.junit.Test; +import org.proton.plug.sasl.ClientSASLPlain; +import org.proton.plug.sasl.PlainSASLResult; +import org.proton.plug.sasl.ServerSASLPlain; + +/** + * @author Clebert Suconic + */ + +public class PlainSASLTest +{ + @Test + public void testPlain() + { + ClientSASLPlain plainSASL = new ClientSASLPlain("user-me", "password-secret"); + byte[] bytesResult = plainSASL.getBytes(); + + ServerSASLPlain serverSASLPlain = new ServerSASLPlain(); + PlainSASLResult result = (PlainSASLResult) serverSASLPlain.processSASL(bytesResult); + Assert.assertEquals("user-me", result.getUser()); + Assert.assertEquals("password-secret", result.getPassword()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java new file mode 100644 index 0000000..e1f0744 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/CreditsSemaphoreTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.test.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; +import org.proton.plug.util.CreditsSemaphore; + +/** + * @author Clebert Suconic + */ + +public class CreditsSemaphoreTest +{ + final CreditsSemaphore semaphore = new CreditsSemaphore(10); + + final AtomicInteger errors = new AtomicInteger(0); + + final AtomicInteger acquired = new AtomicInteger(0); + + final CountDownLatch waiting = new CountDownLatch(1); + + Thread thread = new Thread() + { + public void run() + { + try + { + for (int i = 0; i < 12; i++) + { + if (!semaphore.tryAcquire()) + { + waiting.countDown(); + semaphore.acquire(); + } + acquired.incrementAndGet(); + } + } + catch (Throwable e) + { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + + + @Test + public void testSetAndRelease() throws Exception + { + thread.start(); + + // 5 seconds would be an eternity here + Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS)); + + Assert.assertEquals(0, semaphore.getCredits()); + + long timeout = System.currentTimeMillis() + 1000; + while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) + { + Thread.sleep(10); + } + + Assert.assertTrue(semaphore.hasQueuedThreads()); + + semaphore.setCredits(2); + + thread.join(); + + Assert.assertEquals(12, acquired.get()); + + Assert.assertFalse(semaphore.hasQueuedThreads()); + } + + @Test + public void testDownAndUp() throws Exception + { + thread.start(); + + // 5 seconds would be an eternity here + Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS)); + + Assert.assertEquals(0, semaphore.getCredits()); + + long timeout = System.currentTimeMillis() + 1000; + while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) + { + Thread.sleep(10); + } + + Assert.assertTrue(semaphore.hasQueuedThreads()); + + semaphore.release(2); + + thread.join(); + + Assert.assertEquals(12, acquired.get()); + + Assert.assertFalse(semaphore.hasQueuedThreads()); + } + + + @Test + public void testStartedZeroedSetLater() throws Exception + { + semaphore.setCredits(0); + + thread.start(); + + // 5 seconds would be an eternity here + Assert.assertTrue(waiting.await(5, TimeUnit.SECONDS)); + + Assert.assertEquals(0, semaphore.getCredits()); + + long timeout = System.currentTimeMillis() + 1000; + while (!semaphore.hasQueuedThreads() && System.currentTimeMillis() < timeout) + { + Thread.sleep(10); + } + + Assert.assertTrue(semaphore.hasQueuedThreads()); + + Assert.assertEquals(0, acquired.get()); + + semaphore.setCredits(12); + + thread.join(); + + Assert.assertEquals(12, acquired.get()); + + Assert.assertFalse(semaphore.hasQueuedThreads()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java new file mode 100644 index 0000000..5d44c88 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/ReusableLatchTest.java @@ -0,0 +1,335 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.test.util; + +import java.util.concurrent.CountDownLatch; + +import org.junit.Assert; +import org.junit.Test; +import org.proton.plug.util.ReusableLatch; + +/** + * @author <a href="mailto:[email protected]">Clebert Suconic</a> + */ +public class ReusableLatchTest +{ + @Test + public void testLatchWithParameterizedDown() throws Exception + { + ReusableLatch latch = new ReusableLatch(1000); + + latch.countDown(5000); + + Assert.assertTrue(latch.await(1000)); + + + Assert.assertEquals(0, latch.getCount()); + } + + @Test + public void testLatchOnSingleThread() throws Exception + { + ReusableLatch latch = new ReusableLatch(); + + for (int i = 1; i <= 100; i++) + { + latch.countUp(); + Assert.assertEquals(i, latch.getCount()); + } + + for (int i = 100; i > 0; i--) + { + Assert.assertEquals(i, latch.getCount()); + latch.countDown(); + Assert.assertEquals(i - 1, latch.getCount()); + } + + latch.await(); + } + + /** + * This test will open numberOfThreads threads, and add numberOfAdds on the + * VariableLatch After those addthreads are finished, the latch count should + * be numberOfThreads * numberOfAdds Then it will open numberOfThreads + * threads again releasing numberOfAdds on the VariableLatch After those + * releaseThreads are finished, the latch count should be 0 And all the + * waiting threads should be finished also + * + * @throws Exception + */ + @Test + public void testLatchOnMultiThread() throws Exception + { + final ReusableLatch latch = new ReusableLatch(); + + latch.countUp(); // We hold at least one, so ThreadWaits won't go away + + final int numberOfThreads = 100; + final int numberOfAdds = 100; + + class ThreadWait extends Thread + { + private volatile boolean waiting = true; + + @Override + public void run() + { + try + { + if (!latch.await(5000)) + { + System.err.println("Latch timed out"); + } + } + catch (Exception e) + { + e.printStackTrace(); + } + waiting = false; + } + } + + class ThreadAdd extends Thread + { + private final CountDownLatch latchReady; + + private final CountDownLatch latchStart; + + ThreadAdd(final CountDownLatch latchReady, final CountDownLatch latchStart) + { + this.latchReady = latchReady; + this.latchStart = latchStart; + } + + @Override + public void run() + { + try + { + latchReady.countDown(); + // Everybody should start at the same time, to worse concurrency + // effects + latchStart.await(); + for (int i = 0; i < numberOfAdds; i++) + { + latch.countUp(); + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + + CountDownLatch latchReady = new CountDownLatch(numberOfThreads); + CountDownLatch latchStart = new CountDownLatch(1); + + ThreadAdd[] threadAdds = new ThreadAdd[numberOfThreads]; + ThreadWait[] waits = new ThreadWait[numberOfThreads]; + + for (int i = 0; i < numberOfThreads; i++) + { + threadAdds[i] = new ThreadAdd(latchReady, latchStart); + threadAdds[i].start(); + waits[i] = new ThreadWait(); + waits[i].start(); + } + + latchReady.await(); + latchStart.countDown(); + + for (int i = 0; i < numberOfThreads; i++) + { + threadAdds[i].join(); + } + + for (int i = 0; i < numberOfThreads; i++) + { + Assert.assertTrue(waits[i].waiting); + } + + Assert.assertEquals(numberOfThreads * numberOfAdds + 1, latch.getCount()); + + class ThreadDown extends Thread + { + private final CountDownLatch latchReady; + + private final CountDownLatch latchStart; + + ThreadDown(final CountDownLatch latchReady, final CountDownLatch latchStart) + { + this.latchReady = latchReady; + this.latchStart = latchStart; + } + + @Override + public void run() + { + try + { + latchReady.countDown(); + // Everybody should start at the same time, to worse concurrency + // effects + latchStart.await(); + for (int i = 0; i < numberOfAdds; i++) + { + latch.countDown(); + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + + latchReady = new CountDownLatch(numberOfThreads); + latchStart = new CountDownLatch(1); + + ThreadDown[] down = new ThreadDown[numberOfThreads]; + + for (int i = 0; i < numberOfThreads; i++) + { + down[i] = new ThreadDown(latchReady, latchStart); + down[i].start(); + } + + latchReady.await(); + latchStart.countDown(); + + for (int i = 0; i < numberOfThreads; i++) + { + down[i].join(); + } + + Assert.assertEquals(1, latch.getCount()); + + for (int i = 0; i < numberOfThreads; i++) + { + Assert.assertTrue(waits[i].waiting); + } + + latch.countDown(); + + for (int i = 0; i < numberOfThreads; i++) + { + waits[i].join(); + } + + Assert.assertEquals(0, latch.getCount()); + + for (int i = 0; i < numberOfThreads; i++) + { + Assert.assertFalse(waits[i].waiting); + } + } + + @Test + public void testReuseLatch() throws Exception + { + final ReusableLatch latch = new ReusableLatch(5); + for (int i = 0; i < 5; i++) + { + latch.countDown(); + } + + latch.countUp(); + + class ThreadWait extends Thread + { + private volatile boolean waiting = false; + + private volatile Exception e; + + private final CountDownLatch readyLatch = new CountDownLatch(1); + + @Override + public void run() + { + waiting = true; + readyLatch.countDown(); + try + { + if (!latch.await(1000)) + { + System.err.println("Latch timed out!"); + } + } + catch (Exception e) + { + e.printStackTrace(); + this.e = e; + } + waiting = false; + } + } + + ThreadWait t = new ThreadWait(); + t.start(); + + t.readyLatch.await(); + + Assert.assertEquals(true, t.waiting); + + latch.countDown(); + + t.join(); + + Assert.assertEquals(false, t.waiting); + + Assert.assertNull(t.e); + + latch.countUp(); + + t = new ThreadWait(); + t.start(); + + t.readyLatch.await(); + + Assert.assertEquals(true, t.waiting); + + latch.countDown(); + + t.join(); + + Assert.assertEquals(false, t.waiting); + + Assert.assertNull(t.e); + + Assert.assertTrue(latch.await(1000)); + + Assert.assertEquals(0, latch.getCount()); + + latch.countDown(); + + Assert.assertEquals(0, latch.getCount()); + + } + + @Test + public void testTimeout() throws Exception + { + ReusableLatch latch = new ReusableLatch(); + + latch.countUp(); + + long start = System.currentTimeMillis(); + Assert.assertFalse(latch.await(1000)); + long end = System.currentTimeMillis(); + + Assert.assertTrue("Timeout didn't work correctly", end - start >= 1000 && end - start < 2000); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java new file mode 100644 index 0000000..5832b1e --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/util/SimpleServerAbstractTest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.test.util; + +import org.proton.plug.test.AbstractJMSTest; +import org.proton.plug.test.Constants; +import org.proton.plug.test.invm.InVMTestConnector; +import org.proton.plug.test.minimalclient.Connector; +import org.proton.plug.test.minimalclient.SimpleAMQPConnector; +import org.proton.plug.test.minimalserver.DumbServer; +import org.proton.plug.test.minimalserver.MinimalServer; +import org.junit.After; +import org.junit.Before; + +/** + * @author Clebert Suconic + */ + +public class SimpleServerAbstractTest +{ + + protected final boolean useSASL; + protected final boolean useInVM; + protected MinimalServer server = new MinimalServer(); + + public SimpleServerAbstractTest(boolean useSASL, boolean useInVM) + { + this.useSASL = useSASL; + this.useInVM = useInVM; + } + + @Before + public void setUp() throws Exception + { + DumbServer.clear(); + AbstractJMSTest.forceGC(); + if (!useInVM) + { + server.start("127.0.0.1", Constants.PORT, useSASL); + } + + + } + + @After + public void tearDown() throws Exception + { + if (!useInVM) + { + server.stop(); + } + DumbServer.clear(); + } + + protected Connector newConnector() + { + if (useInVM) + { + return new InVMTestConnector(); + } + else + { + return new SimpleAMQPConnector(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java index 8321533..d515fa3 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java @@ -100,6 +100,8 @@ public class StompDecoder public static final byte C = (byte) 'C'; + public static final byte c = (byte) 'c'; + public static final byte D = (byte) 'D'; public static final byte E = (byte) 'E'; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java index e4d2637..a0d26c1 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java @@ -172,18 +172,18 @@ public class StompFrame this.val = val; } - public String getEscapedKey() + public String getEncodedKey() { - return escape(key); + return encode(key); } - public String getEscapedValue() + public String getEncodedValue() { - return escape(val); + return encode(val); } } - public String escape(String str) + public String encode(String str) { int len = str.length(); @@ -192,21 +192,35 @@ public class StompFrame for (int i = 0; i < len; i++) { char c = str.charAt(i); - if (c == '\n') + + // \n + if (c == (byte) 10) { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = 'n'; + buffer[iBuffer] = (byte) 92; + buffer[++iBuffer] = (byte) 110; } - else if (c == '\\') + + // \r + else if (c == (byte) 13) { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = '\\'; + buffer[iBuffer] = (byte) 92; + buffer[++iBuffer] = (byte) 114; } - else if (c == ':') + + // \ + else if (c == (byte) 92) { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = ':'; + buffer[iBuffer] = (byte) 92; + buffer[++iBuffer] = (byte) 92; } + + // : + else if (c == (byte) 58) + { + buffer[iBuffer] = (byte) 92; + buffer[++iBuffer] = (byte) 99; + } + else { buffer[iBuffer] = c; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java index a5e62fe..a8e6f3e 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java @@ -28,8 +28,8 @@ import org.hornetq.api.core.HornetQExceptionType; import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.journal.IOAsyncTask; import org.hornetq.core.postoffice.BindingType; import org.hornetq.core.remoting.impl.netty.NettyServerConnection; @@ -42,6 +42,7 @@ import org.hornetq.core.server.management.ManagementService; import org.hornetq.core.server.management.Notification; import org.hornetq.core.server.management.NotificationListener; import org.hornetq.spi.core.protocol.ConnectionEntry; +import org.hornetq.spi.core.protocol.MessageConverter; import org.hornetq.spi.core.protocol.ProtocolManager; import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.Acceptor; @@ -92,6 +93,12 @@ class StompProtocolManager implements ProtocolManager, NotificationListener } } + @Override + public MessageConverter getConverter() + { + return null; + } + // ProtocolManager implementation -------------------------------- public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection) @@ -236,7 +243,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener false, false, null, - stompSession); + stompSession, null); stompSession.setServerSession(session); sessions.put(connection.getID(), stompSession); } @@ -261,7 +268,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener false, false, null, - stompSession); + stompSession, null); stompSession.setServerSession(session); transactedSessions.put(txID, stompSession); } @@ -365,7 +372,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener public ServerMessageImpl createServerMessage() { - return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512); + return new ServerMessageImpl(server.getStorageManager().generateID(), 512); } public void commitTransaction(StompConnection connection, String txID) throws Exception @@ -402,7 +409,7 @@ class StompProtocolManager implements ProtocolManager, NotificationListener throw new HornetQStompException("There already is a subscription for: " + subscriptionID + ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); } - long consumerID = server.getStorageManager().generateUniqueID(); + long consumerID = server.getStorageManager().generateID(); String clientID = (connection.getClientID() != null) ? connection.getClientID() : null; stompSession.addSubscription(consumerID, subscriptionID, @@ -450,7 +457,10 @@ class StompProtocolManager implements ProtocolManager, NotificationListener @Override public void onNotification(Notification notification) { - NotificationType type = notification.getType(); + if (!(notification.getType() instanceof CoreNotificationType)) return; + + CoreNotificationType type = (CoreNotificationType) notification.getType(); + TypedProperties props = notification.getProperties(); switch (type) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java index 9109541..afc8b28 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java @@ -30,6 +30,7 @@ import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl; import org.hornetq.core.remoting.impl.netty.TransportConstants; import org.hornetq.core.server.LargeServerMessage; import org.hornetq.core.server.QueueQueryResult; +import org.hornetq.core.server.ServerConsumer; import org.hornetq.core.server.ServerMessage; import org.hornetq.core.server.ServerSession; import org.hornetq.core.server.impl.ServerMessageImpl; @@ -86,6 +87,12 @@ public class StompSession implements SessionCallback return session; } + @Override + public boolean hasCredits(ServerConsumer consumerID) + { + return true; + } + public void sendProducerCreditsMessage(int credits, SimpleString address) { } @@ -94,13 +101,13 @@ public class StompSession implements SessionCallback { } - public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount) + public int sendMessage(ServerMessage serverMessage, ServerConsumer consumer, int deliveryCount) { LargeServerMessageImpl largeMessage = null; ServerMessage newServerMessage = serverMessage; try { - StompSubscription subscription = subscriptions.get(consumerID); + StompSubscription subscription = subscriptions.get(consumer.getID()); StompFrame frame = null; if (serverMessage.isLargeMessage()) { @@ -146,13 +153,13 @@ public class StompSession implements SessionCallback if (manager.send(connection, frame)) { //we ack and commit only if the send is successful - session.acknowledge(consumerID, newServerMessage.getMessageID()); + session.acknowledge(consumer.getID(), newServerMessage.getMessageID()); session.commit(); } } else { - messagesToAck.put(newServerMessage.getMessageID(), new Pair<Long, Integer>(consumerID, length)); + messagesToAck.put(newServerMessage.getMessageID(), new Pair<Long, Integer>(consumer.getID(), length)); // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added! manager.send(connection, frame); } @@ -174,12 +181,12 @@ public class StompSession implements SessionCallback } - public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse) + public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) { return 0; } - public int sendLargeMessage(ServerMessage msg, long consumerID, long bodySize, int deliveryCount) + public int sendLargeMessage(ServerMessage msg, ServerConsumer consumer, long bodySize, int deliveryCount) { return 0; } @@ -199,9 +206,9 @@ public class StompSession implements SessionCallback } @Override - public void disconnect(long consumerId, String queueName) + public void disconnect(ServerConsumer consumerId, String queueName) { - StompSubscription stompSubscription = subscriptions.remove(consumerId); + StompSubscription stompSubscription = subscriptions.remove(consumerId.getID()); if (stompSubscription != null) { StompFrame frame = connection.getFrameHandler().createStompFrame(StompCommands.ERROR.toString()); @@ -380,7 +387,7 @@ public class StompSession implements SessionCallback } StorageManager storageManager = ((ServerSessionImpl) session).getStorageManager(); - long id = storageManager.generateUniqueID(); + long id = storageManager.generateID(); LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message); byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - MessageImpl.BODY_OFFSET]; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java index 483924d..07b9107 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -730,24 +730,15 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements } case HEADER_SEPARATOR: { - if (isEscaping) + if (inHeaderName) { - //a colon - holder.append(b); - isEscaping = false; - } - else - { - if (inHeaderName) - { - headerName = holder.getString(); + headerName = holder.getString(); - holder.reset(); + holder.reset(); - inHeaderName = false; + inHeaderName = false; - headerValueWhitespace = true; - } + headerValueWhitespace = true; } whiteSpaceOnly = false; @@ -767,6 +758,19 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements } break; } + case StompDecoder.c: + { + if (isEscaping) + { + holder.append(StompDecoder.HEADER_SEPARATOR); + isEscaping = false; + } + else + { + holder.append(b); + } + break; + } case StompDecoder.NEW_LINE: { if (whiteSpaceOnly) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java index 9bc5b10..2cfc426 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java @@ -62,9 +62,9 @@ public class StompFrameV11 extends StompFrame // Output the headers. for (Header h : allHeaders) { - head.append(h.getEscapedKey()); + head.append(h.getEncodedKey()); head.append(Stomp.Headers.SEPARATOR); - head.append(h.getEscapedValue()); + head.append(h.getEncodedValue()); head.append(Stomp.NEWLINE); } // Add a newline to separate the headers from the content. http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java index 36a5dbe..3ca1701 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -166,24 +166,15 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE } case HEADER_SEPARATOR: { - if (isEscaping) + if (inHeaderName) { - //a colon - holder.append(b); - isEscaping = false; - } - else - { - if (inHeaderName) - { - headerName = holder.getString(); + headerName = holder.getString(); - holder.reset(); + holder.reset(); - inHeaderName = false; + inHeaderName = false; - headerValueWhitespace = true; - } + headerValueWhitespace = true; } whiteSpaceOnly = false; @@ -225,6 +216,19 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 implements FrameE nextEOLChar = true; break; } + case StompDecoder.c: + { + if (isEscaping) + { + holder.append(StompDecoder.HEADER_SEPARATOR); + isEscaping = false; + } + else + { + holder.append(b); + } + break; + } case NEW_LINE: { nextEOLChar = false; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java index 4dafa0c..2be74f1 100644 --- a/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java +++ b/hornetq-protocols/hornetq-stomp-protocol/src/main/java/org/hornetq/core/protocol/stomp/v12/StompFrameV12.java @@ -62,9 +62,9 @@ public class StompFrameV12 extends StompFrame // Output the headers. for (Header h : allHeaders) { - head.append(h.getEscapedKey()); + head.append(h.getEncodedKey()); head.append(Stomp.Headers.SEPARATOR); - head.append(h.getEscapedValue()); + head.append(h.getEncodedValue()); head.append(Stomp.NEWLINE); } @@ -104,49 +104,4 @@ public class StompFrameV12 extends StompFrame allHeaders.add(new Header(key, val)); } } - - @Override - public String escape(String str) - { - int len = str.length(); - - char[] buffer = new char[2 * len]; - int iBuffer = 0; - for (int i = 0; i < len; i++) - { - char c = str.charAt(i); - if (c == '\r') - { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = 'r'; - } - else if (c == '\n') - { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = 'n'; - } - else if (c == '\\') - { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = '\\'; - } - else if (c == ':') - { - buffer[iBuffer++] = '\\'; - buffer[iBuffer] = ':'; - } - else - { - buffer[iBuffer] = c; - } - iBuffer++; - } - - char[] total = new char[iBuffer]; - System.arraycopy(buffer, 0, total, 0, iBuffer); - - return new String(total); - } - - } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-protocols/pom.xml b/hornetq-protocols/pom.xml index 54e7012..1ecde53 100644 --- a/hornetq-protocols/pom.xml +++ b/hornetq-protocols/pom.xml @@ -19,6 +19,8 @@ <modules> <module>hornetq-amqp-protocol</module> <module>hornetq-stomp-protocol</module> + <module>hornetq-openwire-protocol</module> + <module>hornetq-proton-plug</module> </modules> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-ra/pom.xml b/hornetq-ra/pom.xml index fc9ad51..561af3e 100644 --- a/hornetq-ra/pom.xml +++ b/hornetq-ra/pom.xml @@ -52,6 +52,21 @@ <artifactId>jboss-jca-api</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.jboss.ironjacamar</groupId> + <artifactId>ironjacamar-core-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.jboss.spec.javax.transaction</groupId> + <artifactId>jboss-transaction-api_1.2_spec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.jboss</groupId> + <artifactId>jboss-transaction-spi</artifactId> + <scope>provided</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java index b51c388..055b3e1 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAManagedConnection.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import org.hornetq.core.client.impl.ClientSessionInternal; import org.hornetq.jms.client.HornetQConnection; import org.hornetq.jms.client.HornetQConnectionFactory; import org.hornetq.jms.client.HornetQXAConnection; @@ -51,6 +52,7 @@ import org.hornetq.jms.client.HornetQXAConnection; * * @author <a href="mailto:[email protected]">Adrian Brock</a> * @author <a href="mailto:[email protected]">Jesper Pedersen</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public final class HornetQRAManagedConnection implements ManagedConnection, ExceptionListener { @@ -292,7 +294,7 @@ public final class HornetQRAManagedConnection implements ManagedConnection, Exce try { /** - * (xa|nonXA)Session.close() may NOT be called BEFORE connection.close(). + * (xa|nonXA)Session.close() may NOT be called BEFORE connection.close() * <p> * If the ClientSessionFactory is trying to fail-over or reconnect with -1 attempts, and * one calls session.close() it may effectively dead-lock. @@ -522,12 +524,14 @@ public final class HornetQRAManagedConnection implements ManagedConnection, Exce } // - // Spec says a mc must allways return the same XA resource, + // Spec says a mc must always return the same XA resource, // so we cache it. // if (xaResource == null) { - xaResource = new HornetQRAXAResource(this, xaSession.getXAResource()); + ClientSessionInternal csi = (ClientSessionInternal) xaSession.getXAResource(); + HornetQRAXAResource hqXAResource = new HornetQRAXAResource(this, xaSession.getXAResource()); + xaResource = new HornetQXAResourceWrapper(hqXAResource, ra.getJndiName(), csi.getNodeId()); } if (HornetQRAManagedConnection.trace) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java index d03f602..e4ace2b 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASession.java @@ -49,12 +49,16 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import org.hornetq.core.client.impl.ClientSessionFactoryInternal; +import org.hornetq.jms.client.HornetQSession; + /** * A joint interface for JMS sessions * * @author <a href="mailto:[email protected]">Adrian Brock</a> * @author <a href="mailto:[email protected]">Jesper Pedersen</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public final class HornetQRASession implements QueueSession, TopicSession, XAQueueSession, XATopicSession { @@ -1273,6 +1277,18 @@ public final class HornetQRASession implements QueueSession, TopicSession, XAQue } /** + * Returns the ID of the Node that this session is associated with. + * + * @return Node ID + */ + public String getNodeId() throws JMSException + { + HornetQSession session = (HornetQSession) getSessionInternal(); + ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) session.getCoreSession().getSessionFactory(); + return factory.getLiveNodeId(); + } + + /** * Get the session * * @return The session http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java index 80493bc..58a88a4 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRASessionFactoryImpl.java @@ -923,10 +923,10 @@ public final class HornetQRASessionFactoryImpl extends HornetQConnectionForConte break; //The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used. case Session.CLIENT_ACKNOWLEDGE: - throw HornetQRABundle.BUNDLE.invalidSessionTransactedModeRuntime(); + throw HornetQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime(); //same with this although the spec doesn't explicitly say case Session.SESSION_TRANSACTED: - throw HornetQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime(); + throw HornetQRABundle.BUNDLE.invalidSessionTransactedModeRuntime(); default: throw HornetQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java index 31cbe75..053b4f2 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQRAXAResource.java @@ -18,6 +18,7 @@ import javax.transaction.xa.Xid; import org.hornetq.api.core.HornetQException; import org.hornetq.core.client.impl.ClientSessionInternal; +import org.hornetq.core.client.impl.HornetQXAResource; /** * HornetQXAResource. @@ -25,7 +26,7 @@ import org.hornetq.core.client.impl.ClientSessionInternal; * @author <a href="[email protected]">Adrian Brock</a> * @author <a href="[email protected]">Jesper Pedersen</a> */ -public class HornetQRAXAResource implements XAResource +public class HornetQRAXAResource implements HornetQXAResource { /** Trace enabled */ private static boolean trace = HornetQRALogger.LOGGER.isTraceEnabled(); @@ -247,4 +248,10 @@ public class HornetQRAXAResource implements XAResource return xaResource.setTransactionTimeout(seconds); } + + @Override + public XAResource getResource() + { + return xaResource; + } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java index c6399ac..d72ee44 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQResourceAdapter.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.hornetq.api.core.BroadcastEndpointFactoryConfiguration; import org.hornetq.api.core.DiscoveryGroupConfiguration; @@ -56,12 +58,18 @@ import org.jgroups.JChannel; * @author <a href="[email protected]">Adrian Brock</a> * @author <a href="[email protected]">Jesper Pedersen</a> * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class HornetQResourceAdapter implements ResourceAdapter, Serializable { private static final long serialVersionUID = 4756893709825838770L; /** + * The Name of the product that this resource adapter represents. + */ + public static final String PRODUCT_NAME = "HornetQ"; + + /** * Trace enabled */ private static boolean trace = HornetQRALogger.LOGGER.isTraceEnabled(); @@ -111,6 +119,8 @@ public class HornetQResourceAdapter implements ResourceAdapter, Serializable private final List<HornetQRAManagedConnectionFactory> managedConnectionFactories = new ArrayList<HornetQRAManagedConnectionFactory>(); + private String entries; + /** * Constructor */ @@ -371,6 +381,29 @@ public class HornetQResourceAdapter implements ResourceAdapter, Serializable this.raProperties.setHA(ha); } + public String getEntries() + { + return entries; + } + + public String getJndiName() + { + if (!(entries == null || entries.isEmpty())) + { + Matcher m = Pattern.compile("\"(.*?)\"").matcher(entries); + if (m.find()) + { + return m.group(1); + } + } + return null; + } + + public void setEntries(String entries) + { + this.entries = entries; + } + /** * Get the discovery group name * @@ -1890,7 +1923,11 @@ public class HornetQResourceAdapter implements ResourceAdapter, Serializable String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration(discoveryAddress, discoveryPort, localBindAddress, -1); + endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration() + .setGroupAddress(discoveryAddress) + .setGroupPort(discoveryPort) + .setLocalBindAddress(localBindAddress) + .setLocalBindPort(-1); } else if (jgroupsFileName != null) { @@ -1911,7 +1948,10 @@ public class HornetQResourceAdapter implements ResourceAdapter, Serializable initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; } - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(refreshTimeout, initialTimeout, endpointFactoryConfiguration); + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration() + .setRefreshTimeout(refreshTimeout) + .setDiscoveryInitialWaitTimeout(initialTimeout) + .setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration); if (HornetQRALogger.LOGGER.isDebugEnabled()) { @@ -2011,7 +2051,11 @@ public class HornetQResourceAdapter implements ResourceAdapter, Serializable String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress() : raProperties.getDiscoveryLocalBindAddress(); - endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration(discoveryAddress, discoveryPort, localBindAddress, -1); + endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration() + .setGroupAddress(discoveryAddress) + .setGroupPort(discoveryPort) + .setLocalBindAddress(localBindAddress) + .setLocalBindPort(-1); } else if (jgroupsFileName != null) { @@ -2045,7 +2089,10 @@ public class HornetQResourceAdapter implements ResourceAdapter, Serializable { initialTimeout = HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT; } - DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(refreshTimeout, initialTimeout, endpointFactoryConfiguration); + DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration() + .setRefreshTimeout(refreshTimeout) + .setDiscoveryInitialWaitTimeout(initialTimeout) + .setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration); groupConfiguration.setRefreshTimeout(refreshTimeout); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java new file mode 100644 index 0000000..043f35b --- /dev/null +++ b/hornetq-ra/src/main/java/org/hornetq/ra/HornetQXAResourceWrapper.java @@ -0,0 +1,143 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.ra; + +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.hornetq.core.client.impl.HornetQXAResource; +import org.hornetq.utils.VersionLoader; + +/** + * @author <a href="mailto:[email protected]">Martyn Taylor</a> + * + * Wraps XAResource with org.jboss.tm.XAResourceWrapper. This adds extra meta-data to to the XAResource used by + * Transaction Manager for recovery scenarios. + */ + +public class HornetQXAResourceWrapper implements org.jboss.tm.XAResourceWrapper, org.jboss.jca.core.spi.transaction.xa.XAResourceWrapper, HornetQXAResource +{ + private final XAResource xaResource; + + // The EIS Name + private final String productName; + + // The EIS Version + private final String productVersion; + + // A composite of NodeID + JNDIName that allows adminstrator looking at an XAResource to determine it's origin. + private final String jndiNameNodeId; + + /** + * Creates a new XAResourceWrapper. PRODUCT_NAME, productVersion and jndiName are useful for log output in the + * Transaction Manager. For HornetQ only the resourceManagerID is required to allow Transaction Manager to recover + * from relevant recovery scenarios. + * + * @param xaResource + * @param jndiName + */ + public HornetQXAResourceWrapper(XAResource xaResource, String jndiName, String nodeId) + { + this.xaResource = xaResource; + this.productName = HornetQResourceAdapter.PRODUCT_NAME; + this.productVersion = VersionLoader.getVersion().getFullVersion(); + this.jndiNameNodeId = jndiName + " NodeId:" + nodeId; + } + + @Override + public XAResource getResource() + { + return xaResource; + } + + @Override + public String getProductName() + { + return productName; + } + + @Override + public String getProductVersion() + { + return productVersion; + } + + @Override + public String getJndiName() + { + return jndiNameNodeId; + } + + @Override + public void commit(Xid xid, boolean b) throws XAException + { + getResource().commit(xid, b); + } + + @Override + public void end(Xid xid, int i) throws XAException + { + getResource().end(xid, i); + } + + @Override + public void forget(Xid xid) throws XAException + { + getResource().forget(xid); + } + + @Override + public int getTransactionTimeout() throws XAException + { + return getResource().getTransactionTimeout(); + } + + @Override + public boolean isSameRM(XAResource xaResource) throws XAException + { + return getResource().isSameRM(xaResource); + } + + @Override + public int prepare(Xid xid) throws XAException + { + return getResource().prepare(xid); + } + + @Override + public Xid[] recover(int i) throws XAException + { + return getResource().recover(i); + } + + @Override + public void rollback(Xid xid) throws XAException + { + getResource().rollback(xid); + } + + @Override + public boolean setTransactionTimeout(int i) throws XAException + { + return getResource().setTransactionTimeout(i); + } + + @Override + public void start(Xid xid, int i) throws XAException + { + getResource().start(xid, i); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java index d0a17a4..d75be85 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivation.java @@ -46,6 +46,7 @@ import org.hornetq.ra.HornetQRAConnectionFactory; import org.hornetq.ra.HornetQRALogger; import org.hornetq.ra.HornetQRaUtils; import org.hornetq.ra.HornetQResourceAdapter; +import org.hornetq.utils.FutureLatch; import org.hornetq.utils.SensitiveDataCodec; /** @@ -401,9 +402,33 @@ public class HornetQActivation handlers.clear(); + FutureLatch future = new FutureLatch(handlersCopy.length); + List<Thread> interruptThreads = new ArrayList<>(); for (HornetQMessageHandler handler : handlersCopy) { - handler.interruptConsumer(); + Thread thread = handler.interruptConsumer(future); + if (thread != null) + { + interruptThreads.add(thread); + } + } + + //wait for all the consumers to complete any onmessage calls + boolean stuckThreads = !future.await(factory.getCallTimeout()); + //if any are stuck then we need to interrupt them + if (stuckThreads) + { + for (Thread interruptThread : interruptThreads) + { + try + { + interruptThread.interrupt(); + } + catch (Exception e) + { + //ok + } + } } Thread threadTearDown = new Thread("TearDown/HornetQActivation") http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java index ed40b94..f9ce2f2 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQActivationSpec.java @@ -12,13 +12,20 @@ */ package org.hornetq.ra.inflow; +import javax.jms.Queue; import javax.jms.Session; +import javax.jms.Topic; import javax.resource.ResourceException; import javax.resource.spi.ActivationSpec; import javax.resource.spi.InvalidPropertyException; import javax.resource.spi.ResourceAdapter; +import java.beans.IntrospectionException; +import java.beans.PropertyDescriptor; import java.io.Serializable; +import java.util.ArrayList; import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; import org.hornetq.ra.ConnectionFactoryProperties; import org.hornetq.ra.HornetQRALogger; @@ -773,9 +780,47 @@ public class HornetQActivationSpec extends ConnectionFactoryProperties implement HornetQRALogger.LOGGER.trace("validate()"); } - if (destination == null || destination.trim().equals("")) + List<String> errorMessages = new ArrayList<String>(); + List<PropertyDescriptor> propsNotSet = new ArrayList<PropertyDescriptor>(); + + try + { + if (destination == null || destination.trim().equals("")) + { + propsNotSet.add(new PropertyDescriptor("destination", HornetQActivationSpec.class)); + errorMessages.add("Destination is mandatory."); + } + + if (destinationType != null && !Topic.class.getName().equals(destinationType) && !Queue.class.getName().equals(destinationType)) + { + propsNotSet.add(new PropertyDescriptor("destinationType", HornetQActivationSpec.class)); + errorMessages.add("If set, the destinationType must be either 'javax.jms.Topic' or 'javax.jms.Queue'."); + } + + if ((destinationType == null || destinationType.length() == 0 || Topic.class.getName().equals(destinationType)) && isSubscriptionDurable() && (subscriptionName == null || subscriptionName.length() == 0)) + { + propsNotSet.add(new PropertyDescriptor("subscriptionName", HornetQActivationSpec.class)); + errorMessages.add("If subscription is durable then subscription name must be specified."); + } + } + catch (IntrospectionException e) + { + e.printStackTrace(); + } + + if (propsNotSet.size() > 0) { - throw new InvalidPropertyException("Destination is mandatory"); + StringBuffer b = new StringBuffer(); + b.append("Invalid settings:"); + for (Iterator<String> iter = errorMessages.iterator(); iter.hasNext();) + { + b.append(" "); + b.append(iter.next()); + } + InvalidPropertyException e = new InvalidPropertyException(b.toString()); + final PropertyDescriptor[] descriptors = propsNotSet.toArray(new PropertyDescriptor[propsNotSet.size()]); + e.setInvalidPropertyDescriptors(descriptors); + throw e; } } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java index 82b811d..eeb5b06 100644 --- a/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java +++ b/hornetq-ra/src/main/java/org/hornetq/ra/inflow/HornetQMessageHandler.java @@ -28,10 +28,14 @@ import org.hornetq.api.core.client.ClientSession.QueueQuery; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.MessageHandler; import org.hornetq.core.client.impl.ClientConsumerInternal; +import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ClientSessionInternal; import org.hornetq.jms.client.HornetQDestination; import org.hornetq.jms.client.HornetQMessage; import org.hornetq.ra.HornetQRALogger; +import org.hornetq.ra.HornetQResourceAdapter; +import org.hornetq.ra.HornetQXAResourceWrapper; +import org.hornetq.utils.FutureLatch; /** * The message handler @@ -39,6 +43,7 @@ import org.hornetq.ra.HornetQRALogger; * @author <a href="[email protected]">Adrian Brock</a> * @author <a href="mailto:[email protected]">Jesper Pedersen</a> * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @author <a href="mailto:[email protected]">Martyn Taylor</a> */ public class HornetQMessageHandler implements MessageHandler { @@ -99,11 +104,9 @@ public class HornetQMessageHandler implements MessageHandler SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector); if (activation.isTopic() && spec.isSubscriptionDurable()) { - String subscriptionName = spec.getSubscriptionName(); - String clientID = spec.getClientID(); - - SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, clientID, - subscriptionName)); + SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, + spec.getClientID(), + spec.getSubscriptionName())); QueueQuery subResponse = session.queueQuery(queueName); @@ -188,7 +191,10 @@ public class HornetQMessageHandler implements MessageHandler transacted = activation.isDeliveryTransacted(); if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx()) { - endpoint = endpointFactory.createEndpoint(session); + XAResource xaResource = new HornetQXAResourceWrapper(session, + ((HornetQResourceAdapter) spec.getResourceAdapter()).getJndiName(), + ((ClientSessionFactoryInternal) cf).getLiveNodeId()); + endpoint = endpointFactory.createEndpoint(xaResource); useXA = true; } else @@ -204,19 +210,20 @@ public class HornetQMessageHandler implements MessageHandler return useXA ? session : null; } - public void interruptConsumer() + public Thread interruptConsumer(FutureLatch future) { try { if (consumer != null) { - consumer.interruptHandlers(); + return consumer.prepareForClose(future); } } catch (Throwable e) { HornetQRALogger.LOGGER.warn("Error interrupting handler on endpoint " + endpoint + " handler=" + consumer); } + return null; } /** http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java ---------------------------------------------------------------------- diff --git a/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java b/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java index b51bd8b..aa6aff1 100644 --- a/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java +++ b/hornetq-rest/src/test/java/org/hornetq/rest/test/Embedded.java @@ -78,10 +78,10 @@ public class Embedded System.out.println("\nStarting Embedded"); if (hornetqServer == null) { - Configuration configuration = new ConfigurationImpl(); - configuration.setPersistenceEnabled(false); - configuration.setSecurityEnabled(false); - configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration configuration = new ConfigurationImpl() + .setPersistenceEnabled(false) + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); hornetqServer = HornetQServers.newHornetQServer(configuration); hornetqServer.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java ---------------------------------------------------------------------- diff --git a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java index dd1111a..bbf4ba6 100644 --- a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java +++ b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushQueueConsumerTest.java @@ -46,10 +46,10 @@ public class PersistentPushQueueConsumerTest public static void startup() throws Exception { - Configuration configuration = new ConfigurationImpl(); - configuration.setPersistenceEnabled(false); - configuration.setSecurityEnabled(false); - configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration configuration = new ConfigurationImpl() + .setPersistenceEnabled(false) + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); hornetqServer = HornetQServers.newHornetQServer(configuration); hornetqServer.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java ---------------------------------------------------------------------- diff --git a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java index e21fdda..135286a 100644 --- a/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java +++ b/hornetq-rest/src/test/java/org/hornetq/rest/test/PersistentPushTopicConsumerTest.java @@ -57,10 +57,10 @@ public class PersistentPushTopicConsumerTest @BeforeClass public static void setup() throws Exception { - Configuration configuration = new ConfigurationImpl(); - configuration.setPersistenceEnabled(false); - configuration.setSecurityEnabled(false); - configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration configuration = new ConfigurationImpl() + .setPersistenceEnabled(false) + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); server = HornetQServers.newHornetQServer(configuration); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java ---------------------------------------------------------------------- diff --git a/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java b/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java index 21a5dae..a3eed8c 100644 --- a/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java +++ b/hornetq-rest/src/test/java/org/hornetq/rest/test/RawAckTest.java @@ -53,10 +53,10 @@ public class RawAckTest @BeforeClass public static void setup() throws Exception { - Configuration configuration = new ConfigurationImpl(); - configuration.setPersistenceEnabled(false); - configuration.setSecurityEnabled(false); - configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration configuration = new ConfigurationImpl() + .setPersistenceEnabled(false) + .setSecurityEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); hornetqServer = HornetQServers.newHornetQServer(configuration); hornetqServer.start();
