Repository: activemq Updated Branches: refs/heads/master 10c47d69d -> 72839b78a
http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java new file mode 100644 index 0000000..7aa8c62 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.SocketFactory; + +import org.apache.activemq.transport.tcp.TcpBufferedInputStream; +import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.InetAddressUtil; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple TCP based transport used by the client. + */ +public class ClientTcpTransport implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ClientTcpTransport.class); + + public interface TransportListener { + + /** + * Called when new incoming data has become available. + * + * @param incoming + * the next incoming packet of data. + */ + void onData(Buffer incoming); + + /** + * Called if the connection state becomes closed. + */ + void onTransportClosed(); + + /** + * Called when an error occurs during normal Transport operations. + * + * @param cause + * the error that triggered this event. + */ + void onTransportError(Throwable cause); + + } + + private final URI remoteLocation; + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>(); + + private final Socket socket; + private DataOutputStream dataOut; + private DataInputStream dataIn; + private Thread runner; + private TransportListener listener; + + private int socketBufferSize = 64 * 1024; + private int soTimeout = 0; + private int soLinger = Integer.MIN_VALUE; + private Boolean keepAlive; + private Boolean tcpNoDelay = true; + private boolean useLocalHost = false; + private int ioBufferSize = 8 * 1024; + + /** + * Create a new instance of the transport. + * + * @param listener + * The TransportListener that will receive data from this Transport instance. + * @param remoteLocation + * The remote location where this transport should connection to. + */ + public ClientTcpTransport(URI remoteLocation) { + this.remoteLocation = remoteLocation; + + Socket temp = null; + try { + temp = createSocketFactory().createSocket(); + } catch (IOException e) { + connectionError.set(e); + } + + this.socket = temp; + } + + public void connect() throws IOException { + if (connectionError.get() != null) { + throw IOExceptionSupport.create(connectionError.get()); + } + + if (listener == null) { + throw new IllegalStateException("Cannot connect until a listener has been set."); + } + + if (socket == null) { + throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); + } + + InetSocketAddress remoteAddress = null; + + if (remoteLocation != null) { + String host = resolveHostName(remoteLocation.getHost()); + remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); + } + + socket.connect(remoteAddress); + + connected.set(true); + + initialiseSocket(socket); + initializeStreams(); + + runner = new Thread(null, this, "ClientTcpTransport: " + toString()); + runner.setDaemon(false); + runner.start(); + } + + public void close() { + if (closed.compareAndSet(false, true)) { + if (socket == null) { + return; + } + + // Closing the streams flush the sockets before closing.. if the socket + // is hung.. then this hangs the close so we perform an asynchronous close + // by default which will timeout if the close doesn't happen after a delay. + final CountDownLatch latch = new CountDownLatch(1); + + final ExecutorService closer = Executors.newSingleThreadExecutor(); + closer.execute(new Runnable() { + @Override + public void run() { + LOG.trace("Closing socket {}", socket); + try { + socket.close(); + LOG.debug("Closed socket {}", socket); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); + } + } finally { + latch.countDown(); + } + } + }); + + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + closer.shutdownNow(); + } + } + } + + public void send(ByteBuffer output) throws IOException { + checkConnected(); + LOG.trace("Client Transport sending packet of size: {}", output.remaining()); + WritableByteChannel channel = Channels.newChannel(dataOut); + channel.write(output); + dataOut.flush(); + } + + public void send(Buffer output) throws IOException { + checkConnected(); + send(output.toByteBuffer()); + } + + public URI getRemoteURI() { + return this.remoteLocation; + } + + public boolean isConnected() { + return this.connected.get(); + } + + public TransportListener getTransportListener() { + return this.listener; + } + + public void setTransportListener(TransportListener listener) { + if (listener == null) { + throw new IllegalArgumentException("Listener cannot be set to null"); + } + + this.listener = listener; + } + + public int getSocketBufferSize() { + return socketBufferSize; + } + + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(Boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(Boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public boolean isUseLocalHost() { + return useLocalHost; + } + + public void setUseLocalHost(boolean useLocalHost) { + this.useLocalHost = useLocalHost; + } + + public int getIoBufferSize() { + return ioBufferSize; + } + + public void setIoBufferSize(int ioBufferSize) { + this.ioBufferSize = ioBufferSize; + } + + //---------- Transport internal implementation ---------------------------// + + @Override + public void run() { + LOG.trace("TCP consumer thread for {} starting", this); + try { + while (isConnected()) { + doRun(); + } + } catch (IOException e) { + connectionError.set(e); + onException(e); + } catch (Throwable e) { + IOException ioe = new IOException("Unexpected error occured: " + e); + connectionError.set(ioe); + ioe.initCause(e); + onException(ioe); + } + } + + protected void doRun() throws IOException { + int size = dataIn.available(); + if (size <= 0) { + try { + TimeUnit.NANOSECONDS.sleep(1); + } catch (InterruptedException e) { + } + return; + } + + byte[] buffer = new byte[size]; + dataIn.readFully(buffer); + Buffer incoming = new Buffer(buffer); + listener.onData(incoming); + } + + /** + * Passes any IO exceptions into the transport listener + */ + public void onException(IOException e) { + if (listener != null) { + try { + listener.onTransportError(e); + } catch (RuntimeException e2) { + LOG.debug("Unexpected runtime exception: {}", e2.getMessage(), e2); + } + } + } + + protected SocketFactory createSocketFactory() throws IOException { + return SocketFactory.getDefault(); + } + + protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { + try { + sock.setReceiveBufferSize(socketBufferSize); + sock.setSendBufferSize(socketBufferSize); + } catch (SocketException se) { + LOG.warn("Cannot set socket buffer size = {}", socketBufferSize); + LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se); + } + + sock.setSoTimeout(soTimeout); + + if (keepAlive != null) { + sock.setKeepAlive(keepAlive.booleanValue()); + } + + if (soLinger > -1) { + sock.setSoLinger(true, soLinger); + } else if (soLinger == -1) { + sock.setSoLinger(false, 0); + } + + if (tcpNoDelay != null) { + sock.setTcpNoDelay(tcpNoDelay.booleanValue()); + } + } + + protected void initializeStreams() throws IOException { + try { + TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); + this.dataIn = new DataInputStream(buffIn); + TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); + this.dataOut = new DataOutputStream(outputStream); + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + } + + protected String resolveHostName(String host) throws UnknownHostException { + if (isUseLocalHost()) { + String localName = InetAddressUtil.getLocalHostName(); + if (localName != null && localName.equals(host)) { + return "localhost"; + } + } + return host; + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java new file mode 100644 index 0000000..b67b305 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.util.EnumSet; +import java.util.Map; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; + +/** + * Unmodifiable Connection wrapper used to prevent test code from accidentally + * modifying Connection state. + */ +public class UnmodifiableConnection implements Connection { + + private final Connection connection; + + public UnmodifiableConnection(Connection connection) { + this.connection = connection; + } + + @Override + public EndpointState getLocalState() { + return connection.getLocalState(); + } + + @Override + public EndpointState getRemoteState() { + return connection.getRemoteState(); + } + + @Override + public ErrorCondition getCondition() { + return connection.getCondition(); + } + + @Override + public void setCondition(ErrorCondition condition) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public ErrorCondition getRemoteCondition() { + return connection.getRemoteCondition(); + } + + @Override + public void free() { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public void open() { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public Session session() { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public Session sessionHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) { + Session head = connection.sessionHead(local, remote); + if (head != null) { + head = new UnmodifiableSession(head); + } + + return head; + } + + @Override + public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) { + // TODO - If implemented this method should return an unmodifiable link isntance. + return null; + } + + @Override + public Delivery getWorkHead() { + // TODO - If implemented this method should return an unmodifiable delivery isntance. + return null; + } + + @Override + public void setContainer(String container) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public void setHostname(String hostname) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public String getHostname() { + return connection.getHostname(); + } + + @Override + public String getRemoteContainer() { + return connection.getRemoteContainer(); + } + + @Override + public String getRemoteHostname() { + return connection.getRemoteHostname(); + } + + @Override + public void setOfferedCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public void setDesiredCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() { + return connection.getRemoteOfferedCapabilities(); + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() { + return connection.getRemoteDesiredCapabilities(); + } + + @Override + public Map<Symbol, Object> getRemoteProperties() { + return connection.getRemoteProperties(); + } + + @Override + public void setProperties(Map<Symbol, Object> properties) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public Object getContext() { + return connection.getContext(); + } + + @Override + public void setContext(Object context) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } + + @Override + public void collect(Collector collector) { + throw new UnsupportedOperationException("Cannot alter the Connection"); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java new file mode 100644 index 0000000..fd99665 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableDelivery.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; + +/** + * Unmodifiable Delivery wrapper used to prevent test code from accidentally + * modifying Delivery state. + */ +public class UnmodifiableDelivery implements Delivery { + + private final Delivery delivery; + + public UnmodifiableDelivery(Delivery delivery) { + this.delivery = delivery; + } + + @Override + public byte[] getTag() { + return delivery.getTag(); + } + + @Override + public Link getLink() { + if (delivery.getLink() instanceof Sender) { + return new UnmodifiableSender((Sender) delivery.getLink()); + } else if (delivery.getLink() instanceof Receiver) { + return new UnmodifiableReceiver((Receiver) delivery.getLink()); + } else { + throw new IllegalStateException("Delivery has unknown link type"); + } + } + + @Override + public DeliveryState getLocalState() { + return delivery.getLocalState(); + } + + @Override + public DeliveryState getRemoteState() { + return delivery.getRemoteState(); + } + + @Override + public int getMessageFormat() { + return delivery.getMessageFormat(); + } + + @Override + public void disposition(DeliveryState state) { + throw new UnsupportedOperationException("Cannot alter the Delivery state"); + } + + @Override + public void settle() { + throw new UnsupportedOperationException("Cannot alter the Delivery state"); + } + + @Override + public boolean isSettled() { + return delivery.isSettled(); + } + + @Override + public boolean remotelySettled() { + return delivery.remotelySettled(); + } + + @Override + public void free() { + throw new UnsupportedOperationException("Cannot alter the Delivery state"); + } + + @Override + public Delivery getWorkNext() { + return new UnmodifiableDelivery(delivery.getWorkNext()); + } + + @Override + public Delivery next() { + return new UnmodifiableDelivery(delivery.next()); + } + + @Override + public boolean isWritable() { + return delivery.isWritable(); + } + + @Override + public boolean isReadable() { + return delivery.isReadable(); + } + + @Override + public void setContext(Object o) { + throw new UnsupportedOperationException("Cannot alter the Delivery state"); + } + + @Override + public Object getContext() { + return delivery.getContext(); + } + + @Override + public boolean isUpdated() { + return delivery.isUpdated(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("Cannot alter the Delivery state"); + } + + @Override + public boolean isPartial() { + return delivery.isPartial(); + } + + @Override + public int pending() { + return delivery.pending(); + } + + @Override + public boolean isBuffered() { + return delivery.isBuffered(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java new file mode 100644 index 0000000..70665c0 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.util.EnumSet; + +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.amqp.transport.Source; +import org.apache.qpid.proton.amqp.transport.Target; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; + +/** + * Unmodifiable Session wrapper used to prevent test code from accidentally + * modifying Session state. + */ +public class UnmodifiableLink implements Link { + + private final Link link; + + public UnmodifiableLink(Link link) { + this.link = link; + } + + @Override + public EndpointState getLocalState() { + return link.getLocalState(); + } + + @Override + public EndpointState getRemoteState() { + return link.getRemoteState(); + } + + @Override + public ErrorCondition getCondition() { + return link.getCondition(); + } + + @Override + public void setCondition(ErrorCondition condition) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public ErrorCondition getRemoteCondition() { + return link.getRemoteCondition(); + } + + @Override + public void free() { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void open() { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setContext(Object o) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public Object getContext() { + return link.getContext(); + } + + @Override + public String getName() { + return link.getName(); + } + + @Override + public Delivery delivery(byte[] tag) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public Delivery delivery(byte[] tag, int offset, int length) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public Delivery head() { + return new UnmodifiableDelivery(link.head()); + } + + @Override + public Delivery current() { + return new UnmodifiableDelivery(link.current()); + } + + @Override + public boolean advance() { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public Source getSource() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Target getTarget() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setSource(Source address) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setTarget(Target address) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public Source getRemoteSource() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Target getRemoteTarget() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) { + Link next = link.next(local, remote); + + if (next != null) { + if (next instanceof Sender) { + next = new UnmodifiableSender((Sender) next); + } else { + next = new UnmodifiableReceiver((Receiver) next); + } + } + + return next; + } + + @Override + public int getCredit() { + return link.getCredit(); + } + + @Override + public int getQueued() { + return link.getQueued(); + } + + @Override + public int getUnsettled() { + return link.getUnsettled(); + } + + @Override + public Session getSession() { + return new UnmodifiableSession(link.getSession()); + } + + @Override + public SenderSettleMode getSenderSettleMode() { + return link.getSenderSettleMode(); + } + + @Override + public void setSenderSettleMode(SenderSettleMode senderSettleMode) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public SenderSettleMode getRemoteSenderSettleMode() { + return link.getRemoteSenderSettleMode(); + } + + @Override + public ReceiverSettleMode getReceiverSettleMode() { + return link.getReceiverSettleMode(); + } + + @Override + public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public ReceiverSettleMode getRemoteReceiverSettleMode() { + return link.getRemoteReceiverSettleMode(); + } + + @Override + public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public int drained() { + return link.drained(); // TODO - Is this a mutating call? + } + + @Override + public int getRemoteCredit() { + return link.getRemoteCredit(); + } + + @Override + public boolean getDrain() { + return link.getDrain(); + } + + @Override + public void detach() { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java new file mode 100644 index 0000000..1b07ed0 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import org.apache.qpid.proton.engine.Receiver; + +/** + * Unmodifiable Receiver wrapper used to prevent test code from accidentally + * modifying Receiver state. + */ +public class UnmodifiableReceiver extends UnmodifiableLink implements Receiver { + + private final Receiver receiver; + + public UnmodifiableReceiver(Receiver receiver) { + super(receiver); + + this.receiver = receiver; + } + + @Override + public void flow(int credits) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public int recv(byte[] bytes, int offset, int size) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void drain(int credit) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public boolean draining() { + return receiver.draining(); + } + + @Override + public void setDrain(boolean drain) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java new file mode 100644 index 0000000..1517a93 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import org.apache.qpid.proton.engine.Sender; + +/** + * Unmodifiable Sender wrapper used to prevent test code from accidentally + * modifying Sender state. + */ +public class UnmodifiableSender extends UnmodifiableLink implements Sender { + + public UnmodifiableSender(Sender sender) { + super(sender); + } + + @Override + public void offer(int credits) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public int send(byte[] bytes, int offset, int length) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void abort() { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java new file mode 100644 index 0000000..6a73e0f --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.util.EnumSet; + +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; + +/** + * Unmodifiable Session wrapper used to prevent test code from accidentally + * modifying Session state. + */ +public class UnmodifiableSession implements Session { + + private final Session session; + + public UnmodifiableSession(Session session) { + this.session = session; + } + + @Override + public EndpointState getLocalState() { + return session.getLocalState(); + } + + @Override + public EndpointState getRemoteState() { + return session.getRemoteState(); + } + + @Override + public ErrorCondition getCondition() { + return session.getCondition(); + } + + @Override + public void setCondition(ErrorCondition condition) { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public ErrorCondition getRemoteCondition() { + return session.getRemoteCondition(); + } + + @Override + public void free() { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public void open() { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public void close() { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public void setContext(Object o) { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public Object getContext() { + return session.getContext(); + } + + @Override + public Sender sender(String name) { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public Receiver receiver(String name) { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) { + Session next = session.next(local, remote); + if (next != null) { + next = new UnmodifiableSession(next); + } + + return next; + } + + @Override + public Connection getConnection() { + return new UnmodifiableConnection(session.getConnection()); + } + + @Override + public int getIncomingCapacity() { + return session.getIncomingCapacity(); + } + + @Override + public void setIncomingCapacity(int bytes) { + throw new UnsupportedOperationException("Cannot alter the Session"); + } + + @Override + public int getIncomingBytes() { + return session.getIncomingBytes(); + } + + @Override + public int getOutgoingBytes() { + return session.getOutgoingBytes(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java new file mode 100644 index 0000000..bb2d983 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/WrappedAsyncResult.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +/** + * Base class used to wrap one AsyncResult with another. + */ +public abstract class WrappedAsyncResult implements AsyncResult { + + protected final AsyncResult wrapped; + + /** + * Create a new WrappedAsyncResult for the target AsyncResult + */ + public WrappedAsyncResult(AsyncResult wrapped) { + this.wrapped = wrapped; + } + + @Override + public void onFailure(Throwable result) { + if (wrapped != null) { + wrapped.onFailure(result); + } + } + + @Override + public void onSuccess() { + if (wrapped != null) { + wrapped.onSuccess(); + } + } + + @Override + public boolean isComplete() { + if (wrapped != null) { + return wrapped.isComplete(); + } + + return false; + } + + public AsyncResult getWrappedRequest() { + return wrapped; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java new file mode 100644 index 0000000..a35709d --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; +import static org.apache.activemq.transport.amqp.AmqpSupport.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Map; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpStateInspector; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; + +/** + * Test broker handling of AMQP connections with various configurations. + */ +public class AmqpConnectionsTest extends AmqpClientTestSupport { + + private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); + private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); + private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + + @Test(timeout = 60000) + public void testCanConnect() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.connect(); + assertNotNull(connection); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection.close(); + + assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); + } + + @Test(timeout = 60000) + public void testConnectionCarriesExpectedCapabilities() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setStateInspector(new AmqpStateInspector() { + + @Override + public void inspectOpenedResource(Connection connection) { + + Symbol[] offered = connection.getRemoteOfferedCapabilities(); + if (!contains(offered, ANONYMOUS_RELAY)) { + markAsInvalid("Broker did not indicate it support anonymous relay"); + } + + Map<Symbol, Object> properties = connection.getRemoteProperties(); + if (!properties.containsKey(QUEUE_PREFIX)) { + markAsInvalid("Broker did not send a queue prefix value"); + } + + if (!properties.containsKey(TOPIC_PREFIX)) { + markAsInvalid("Broker did not send a queue prefix value"); + } + } + }); + + AmqpConnection connection = client.connect(); + assertNotNull(connection); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection.close(); + + assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); + } + + @Test(timeout = 60000) + public void testCanConnectWithDifferentContainerIds() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection1 = client.createConnection(); + AmqpConnection connection2 = client.createConnection(); + + connection1.setContainerId(getTestName() + "-Client:1"); + connection2.setContainerId(getTestName() + "-Client:2"); + + connection1.connect(); + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection2.connect(); + assertEquals(2, getProxyToBroker().getCurrentConnectionsCount()); + + connection1.close(); + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection2.close(); + assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); + } + + @Test(timeout = 60000) + public void testCannotConnectWithSameContainerId() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection1 = client.createConnection(); + AmqpConnection connection2 = client.createConnection(); + + connection1.setContainerId(getTestName()); + connection2.setContainerId(getTestName()); + + connection1.connect(); + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection2.setStateInspector(new AmqpStateInspector() { + + @Override + public void inspectOpenedResource(Connection connection) { + if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + markAsInvalid("Broker did not set connection establishment failed property"); + } + } + + @Override + public void inspectClosedResource(Connection connection) { + ErrorCondition remoteError = connection.getRemoteCondition(); + if (remoteError == null) { + markAsInvalid("Broker dd not add error condition for duplicate client ID"); + } + + if (!remoteError.getCondition().equals(AmqpError.INVALID_FIELD)) { + markAsInvalid("Broker dd not set condition to " + AmqpError.INVALID_FIELD); + } + } + }); + + try { + connection2.connect(); + //fail("Should not be able to connect with same container Id."); + } catch (Exception ex) { + LOG.info("Second connection with same container Id failed as expected."); + } + + connection2.getStateInspector().assertIfStateChecksFailed(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection1.close(); + assertEquals(0, getProxyToBroker().getCurrentConnectionsCount()); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java new file mode 100644 index 0000000..1245811 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.util.Wait; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverTest extends AmqpClientTestSupport { + + @Override + protected boolean isUseOpenWireConnector() { + return true; + } + + @Test(timeout = 60000) + public void testCreateQueueReceiver() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + assertEquals(1, brokerService.getAdminView().getQueues().length); + assertNotNull(getProxyToQueue(getTestName())); + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + receiver.close(); + assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateTopicReceiver() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getTopics().length); + + AmqpReceiver receiver = session.createReceiver("topic://" + getTestName()); + + assertEquals(1, brokerService.getAdminView().getTopics().length); + assertNotNull(getProxyToTopic(getTestName())); + assertEquals(1, brokerService.getAdminView().getTopicSubscribers().length); + receiver.close(); + assertEquals(0, brokerService.getAdminView().getTopicSubscribers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testQueueReceiverReadMessage() throws Exception { + sendMessages(getTestName(), 1, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + receiver.flow(1); + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + receiver.close(); + + assertEquals(1, queueView.getQueueSize()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { + int MSG_COUNT = 4; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + receiver1.flow(2); + assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); + assertNotNull(receiver1.receive(5, TimeUnit.SECONDS)); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + + assertEquals(2, brokerService.getAdminView().getQueueSubscribers().length); + + receiver2.flow(2); + assertNotNull(receiver2.receive(5, TimeUnit.SECONDS)); + assertNotNull(receiver2.receive(5, TimeUnit.SECONDS)); + + assertEquals(MSG_COUNT, queueView.getDispatchCount()); + assertEquals(0, queueView.getDequeueCount()); + + receiver1.close(); + receiver2.close(); + + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + connection.close(); + } + + @Ignore("Fails due to issues with accept and no credit") + @Test(timeout = 60000) + public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { + int MSG_COUNT = 4; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + receiver1.flow(2); + AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + message = receiver1.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getDequeueCount() == 2; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + + assertEquals(2, brokerService.getAdminView().getQueueSubscribers().length); + + receiver2.flow(2); + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertEquals(MSG_COUNT, queueView.getDispatchCount()); + assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getDequeueCount() == 4; + } + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10))); + + receiver1.close(); + receiver2.close(); + + assertEquals(0, queueView.getQueueSize()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName()); + + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + receiver1.flow(20); + + assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getInFlightCount() >= 2; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); + + receiver1.close(); + + AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName()); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + + receiver2.flow(MSG_COUNT * 2); + AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + message = receiver2.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + + assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getDequeueCount() == 2; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); + + receiver2.close(); + + assertEquals(MSG_COUNT - 2, queueView.getQueueSize()); + + connection.close(); + } + + @Ignore("Test fails currently due to improper implementation of drain.") + @Test(timeout = 60000) + public void testReceiverCanDrainMessages() throws Exception { + int MSG_COUNT = 20; + sendMessages(getTestName(), MSG_COUNT, false); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + assertEquals(0, queueView.getDispatchCount()); + + receiver.drain(MSG_COUNT); + for (int i = 0; i < MSG_COUNT; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + } + receiver.close(); + + assertEquals(0, queueView.getQueueSize()); + + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java new file mode 100644 index 0000000..3f6a454 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Test broker behavior when creating AMQP senders + */ +public class AmqpSenderTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testCreateQueueSender() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getQueues().length); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + assertEquals(1, brokerService.getAdminView().getQueues().length); + assertNotNull(getProxyToQueue(getTestName())); + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + sender.close(); + assertEquals(0, brokerService.getAdminView().getQueueProducers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateTopicSender() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + assertEquals(0, brokerService.getAdminView().getTopics().length); + + AmqpSender sender = session.createSender("topic://" + getTestName()); + + assertEquals(1, brokerService.getAdminView().getTopics().length); + assertNotNull(getProxyToTopic(getTestName())); + assertEquals(1, brokerService.getAdminView().getTopicProducers().length); + sender.close(); + assertEquals(0, brokerService.getAdminView().getTopicProducers().length); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageToQueue() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + AmqpMessage message = new AmqpMessage(); + + message.setText("Test-Message"); + + sender.send(message); + + QueueViewMBean queue = getProxyToQueue(getTestName()); + + assertEquals(1, queue.getQueueSize()); + + sender.close(); + connection.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/72839b78/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java new file mode 100644 index 0000000..b8f456f --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSessionTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertNotNull; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +/** + * Test for creation and configuration of AMQP sessions. + */ +public class AmqpSessionTest extends AmqpClientTestSupport { + + @Test + public void testCreateSession() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + assertNotNull(session); + connection.close(); + } +}
