/**
 * Quality-of-service levels.
 *
 * <p>The constant names follow the conventional messaging delivery guarantees. Nothing in this
 * type defines how a level is applied; that is up to the code that consumes it.
 *
 * <p>The declaration order is part of the type's contract ({@link #ordinal()},
 * {@link #values()}) and must not be changed.
 *
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
public enum QoS {
    /** A message may be lost but is never delivered more than once. */
    AT_MOST_ONCE,
    /** A message is never lost but may be delivered more than once. */
    AT_LEAST_ONCE,
    /** A message is delivered one time only. */
    EXACTLY_ONCE
}
Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/api/TransportState.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Life-cycle states of an {@code AmqpTransport}.
 *
 * <p>The declaration order is part of the type's contract ({@link #ordinal()},
 * {@link #values()}) and must not be changed.
 *
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
public enum TransportState {
    /** Initial state of a freshly constructed transport. */
    CREATED,
    /** An outbound connection attempt has been started but the socket is not yet connected. */
    CONNECTING,
    /** The underlying transport is connected (or was accepted from a server socket). */
    CONNECTED,
    /** A disconnect was requested and the underlying transport is being stopped. */
    DISCONNECTING,
    /** The underlying transport has been stopped. */
    DISCONNECTED
}
*/ -package org.apache.activemq.apollo.amqp.hawtdispatch; +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; import org.fusesource.hawtbuf.Buffer; Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java&r1=1406782&r2=1408852&rev=1408852&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpListener.java (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpListener.java Tue Nov 13 17:41:01 2012 @@ -15,9 +15,10 @@ * limitations under the License. 
*/ -package org.apache.activemq.apollo.amqp.hawtdispatch; +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.impl.EndpointImpl; import org.apache.qpid.proton.engine.impl.TransportImpl; import org.apache.qpid.proton.type.messaging.Accepted; import org.fusesource.hawtdispatch.Task; @@ -29,7 +30,7 @@ import java.io.IOException; */ public class AmqpListener { - public Sasl processSaslConnect(TransportImpl protonTransport) { + public Sasl processSaslConnect(TransportImpl transport) { return null; } @@ -37,46 +38,25 @@ public class AmqpListener { return sasl; } - public void processConnectionOpen(Connection conn, Task onComplete) { - conn.open(); - onComplete.run(); - } - public void processConnectionClose(Connection conn, Task onComplete){ - conn.close(); - onComplete.run(); - } - - public void proccessSessionOpen(Session session, Task onComplete){ - session.open(); - onComplete.run(); - } - public void processSessionClose(Session session, Task onComplete){ - session.close(); + public void processRemoteOpen(Endpoint endpoint, Task onComplete) { + ((EndpointImpl)endpoint).setLocalError(new EndpointError("error", "Not supported")); + endpoint.close(); onComplete.run(); } - public void processSenderOpen(Sender sender, Task onComplete) { - sender.close(); - onComplete.run(); - } - public void processSenderClose(Sender sender, Task onComplete){ - sender.close(); + public void processRemoteClose(Endpoint endpoint, Task onComplete) { + endpoint.close(); onComplete.run(); } - public void processReceiverOpen(Receiver receiver, Task onComplete) { - receiver.open(); - onComplete.run(); - } - public void processReceiverClose(Receiver receiver, Task onComplete) { - receiver.close(); - onComplete.run(); + public void processDelivery(Delivery delivery){ } - public void processDelivery(Receiver receiver, Delivery delivery){ + public void processTransportConnected() { } - public void 
processDelivery(Sender sender, Delivery delivery) { + public void processTransportFailure(IOException e) { + this.processFailure(e); } public void processFailure(Throwable e) { @@ -86,10 +66,4 @@ public class AmqpListener { public void processRefill() { } - public void processTransportConnected() { - } - - public void processTransportFailure(IOException e) { - e.printStackTrace(); - } } Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java&r1=1406782&r2=1408852&rev=1408852&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/AmqpProtocolCodec.java (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpProtocolCodec.java Tue Nov 13 17:41:01 2012 @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.activemq.apollo.amqp.hawtdispatch; +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec; @@ -29,6 +29,8 @@ import java.io.IOException; */ public class AmqpProtocolCodec extends AbstractProtocolCodec { + int maxFrameSize = 4*1024*1024; + @Override protected void encode(Object object) throws IOException { nextWriteBuffer.write((Buffer) object); @@ -55,8 +57,12 @@ public class AmqpProtocolCodec extends A if (sizeBytes != null) { int size = sizeBytes.bigEndianEditor().readInt(); if (size < 8) { - throw new IOException(String.format("specified frame size %d smaller than minimum frame size", size)); + throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size)); + } + if( size > maxFrameSize ) { + throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size)); } + // TODO: check frame min and max size.. nextDecodeAction = readFrame(size); return nextDecodeAction.apply(); Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/AmqpTransport.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,574 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; + +import org.apache.activemq.apollo.amqp.hawtdispatch.api.AmqpConnectOptions; +import org.apache.activemq.apollo.amqp.hawtdispatch.api.Callback; +import org.apache.activemq.apollo.amqp.hawtdispatch.api.ChainedCallback; +import org.apache.activemq.apollo.amqp.hawtdispatch.api.TransportState; +import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.impl.ConnectionImpl; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.apache.qpid.proton.framing.TransportFrame; +import org.apache.qpid.proton.type.transport.Flow; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.hawtdispatch.*; +import org.fusesource.hawtdispatch.transport.DefaultTransportListener; +import org.fusesource.hawtdispatch.transport.SslTransport; +import org.fusesource.hawtdispatch.transport.TcpTransport; +import org.fusesource.hawtdispatch.transport.Transport; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedList; + +import static 
org.apache.activemq.apollo.amqp.hawtdispatch.api.TransportState.*; +import static org.fusesource.hawtdispatch.Dispatch.NOOP; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpTransport extends WatchBase { + + private TransportState state = CREATED; + + final DispatchQueue queue; + final ConnectionImpl connection = new ConnectionImpl(); + Transport hawtdispatchTransport; + TransportImpl protonTransport; + Throwable failure; + CustomDispatchSource<Defer,LinkedList<Defer>> defers; + + public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class); + + private AmqpTransport(DispatchQueue queue) { + this.queue = queue; + defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue); + defers.setEventHandler(new Task(){ + public void run() { + for( Defer defer: defers.getData() ) { + assert defer.defered = true; + defer.defered = false; + defer.run(); + } + } + }); + defers.resume(); + } + + static public AmqpTransport connect(AmqpConnectOptions options) { + AmqpConnectOptions opts = options.clone(); + if( opts.getDispatchQueue() == null ) { + opts.setDispatchQueue(Dispatch.createQueue()); + } + if( opts.getBlockingExecutor() == null ) { + opts.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool()); + } + return new AmqpTransport(opts.getDispatchQueue()).connecting(opts); + } + + private AmqpTransport connecting(final AmqpConnectOptions options) { + assert state == CREATED; + try { + state = CONNECTING; + if( options.getLocalContainerId()!=null ) { + connection.setLocalContainerId(options.getLocalContainerId()); + } + if( options.getRemoteContainerId()!=null ) { + connection.setContainer(options.getRemoteContainerId()); + } + connection.setHostname(options.getHost().getHost()); + Callback<Void> onConnect = new Callback<Void>() { + @Override + public void onSuccess(Void value) { + if( state == CONNECTED ) { + hawtdispatchTransport.setTransportListener(new 
AmqpTransportListener()); + fireWatches(); + } + } + + @Override + public void onFailure(Throwable value) { + if( state == CONNECTED ) { + failure = value; + disconnect(); + fireWatches(); + } + } + }; + if( options.getUser()!=null ) { + onConnect = new SaslClientHandler(options, onConnect); + } + createTransport(options, onConnect); + } catch (Throwable e) { + failure = e; + } + fireWatches(); + return this; + } + + public TransportState getState() { + return state; + } + + /** + * Creates and start a transport to the AMQP server. Passes it to the onConnect + * once the transport is connected. + * + * @param onConnect + * @throws Exception + */ + void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception { + final TcpTransport transport; + if( options.getSslContext() !=null ) { + SslTransport ssl = new SslTransport(); + ssl.setSSLContext(options.getSslContext()); + transport = ssl; + } else { + transport = new TcpTransport(); + } + + URI host = options.getHost(); + if( host.getPort() == -1 ) { + if( options.getSslContext()!=null ) { + host = new URI(host.getScheme()+"://"+host.getHost()+":5672"); + } else { + host = new URI(host.getScheme()+"://"+host.getHost()+":5671"); + } + } + + + transport.setBlockingExecutor(options.getBlockingExecutor()); + transport.setDispatchQueue(options.getDispatchQueue()); + + transport.setMaxReadRate(options.getMaxReadRate()); + transport.setMaxWriteRate(options.getMaxWriteRate()); + transport.setReceiveBufferSize(options.getReceiveBufferSize()); + transport.setSendBufferSize(options.getSendBufferSize()); + transport.setTrafficClass(options.getTrafficClass()); + transport.setUseLocalHost(options.isUseLocalHost()); + transport.connecting(host, options.getLocalAddress()); + + transport.setTransportListener(new DefaultTransportListener(){ + public void onTransportConnected() { + if(state==CONNECTING) { + state = CONNECTED; + onConnect.onSuccess(null); + transport.resumeRead(); + } + } + + public 
void onTransportFailure(final IOException error) { + if(state==CONNECTING) { + onConnect.onFailure(error); + } + } + + }); + transport.connecting(host, options.getLocalAddress()); + bind(transport); + transport.start(NOOP); + } + + class SaslClientHandler extends ChainedCallback<Void, Void> { + + private final AmqpConnectOptions options; + + public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) { + super(next); + this.options = options; + } + + public void onSuccess(final Void value) { + final Sasl s = protonTransport.sasl(); + s.client(); + pumpOut(); + hawtdispatchTransport.setTransportListener(new AmqpTransportListener() { + + Sasl sasl = s; + + @Override + void process() { + if (sasl != null) { + sasl = processSaslEvent(sasl); + if (sasl == null) { + // once sasl handshake is done.. we need to read the protocol header again. + ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader(); + } + } + } + + @Override + public void onTransportFailure(IOException error) { + next.onFailure(error); + } + + @Override + void onFailure(Throwable error) { + next.onFailure(error); + } + + boolean authSent = false; + + private Sasl processSaslEvent(Sasl sasl) { + if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) { + next.onSuccess(null); + return null; + } + HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms())); + if (!authSent && !mechanisims.isEmpty()) { + if (!mechanisims.contains("PLAIN")) { + next.onFailure(Support.illegalState("Remote does not support plain password authentication.")); + return null; + } + authSent = true; + DataByteArrayOutputStream os = new DataByteArrayOutputStream(); + try { + os.write(new UTF8Buffer(options.getUser())); + os.writeByte(0); + if (options.getPassword() != null) { + os.write(new UTF8Buffer(options.getPassword())); + os.writeByte(0); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + Buffer buffer = os.toBuffer(); + 
sasl.setMechanisms(new String[]{"PLAIN"}); + sasl.send(buffer.data, buffer.offset, buffer.length); + } + return sasl; + } + }); + } + } + + class SaslServerListener extends AmqpTransportListener { + Sasl sasl; + + @Override + public void onTransportCommand(Object command) { + try { + if (command.getClass() == AmqpHeader.class) { + AmqpHeader header = (AmqpHeader)command; + switch( header.getProtocolId() ) { + case 3: // Client will be using SASL for auth.. + if( listener!=null ) { + sasl = listener.processSaslConnect(protonTransport); + break; + } + default: + AmqpTransportListener listener = new AmqpTransportListener(); + hawtdispatchTransport.setTransportListener(listener); + listener.onTransportCommand(command); + return; + } + command = header.getBuffer(); + } + } catch (Exception e) { + onFailure(e); + } + super.onTransportCommand(command); + } + + @Override + void process() { + if (sasl != null) { + sasl = listener.processSaslEvent(sasl); + } + if (sasl == null) { + // once sasl handshake is done.. we need to read the protocol header again. 
+ ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader(); + hawtdispatchTransport.setTransportListener(new AmqpTransportListener()); + } + } + } + + static public AmqpTransport accept(Transport transport) { + return new AmqpTransport(transport.getDispatchQueue()).accepted(transport); + } + + private AmqpTransport accepted(final Transport transport) { + state = CONNECTED; + bind(transport); + hawtdispatchTransport.setTransportListener(new SaslServerListener()); + return this; + } + + private void bind(final Transport transport) { + this.hawtdispatchTransport = transport; + this.protonTransport = new TransportImpl(); + this.protonTransport.bind(connection); + if( transport.getProtocolCodec()==null ) { + try { + transport.setProtocolCodec(new AmqpProtocolCodec()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public void defer(Defer defer) { + if( !defer.defered ) { + defer.defered = true; + defers.merge(defer); + } + } + + public void pumpOut() { + assertExecuting(); + defer(deferedPumpOut); + } + + private Defer deferedPumpOut = new Defer() { + public void run() { + doPumpOut(); + } + }; + + private void doPumpOut() { + switch(state) { + case CONNECTING: + case CONNECTED: + break; + default: + return; + } + + int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize(); + byte data[] = new byte[size]; + boolean done = false; + int pumped = 0; + while( !done && !hawtdispatchTransport.full() ) { + int count = protonTransport.output(data, 0, size); + if( count > 0 ) { + pumped += count; + boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count)); + assert accepted: "Should be accepted since the transport was not full"; + } else { + done = true; + } + } + if( pumped > 0 && !hawtdispatchTransport.full() ) { + listener.processRefill(); + } + } + + public Sasl sasl; + public void fireListenerEvents() { + fireWatches(); + + if( sasl!=null ) { + sasl = listener.processSaslEvent(sasl); + 
if( sasl==null ) { + // once sasl handshake is done.. we need to read the protocol header again. + ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader(); + } + } + + context(connection).fireListenerEvents(listener); + + Session session = connection.sessionHead(ALL_SET, ALL_SET); + while(session != null) + { + context(session).fireListenerEvents(listener); + session = session.next(ALL_SET, ALL_SET); + } + + Link link = connection.linkHead(ALL_SET, ALL_SET); + while(link != null) + { + context(link).fireListenerEvents(listener); + link = link.next(ALL_SET, ALL_SET); + } + + Delivery delivery = connection.getWorkHead(); + while(delivery != null) + { + listener.processDelivery(delivery); + delivery = delivery.getWorkNext(); + } + + listener.processRefill(); + } + + + public ConnectionImpl connection() { + return connection; + } + + AmqpListener listener = new AmqpListener(); + public AmqpListener getListener() { + return listener; + } + + public void setListener(AmqpListener listener) { + this.listener = listener; + } + + public EndpointContext context(Endpoint endpoint) { + EndpointContext context = (EndpointContext) endpoint.getContext(); + if( context == null ) { + context = new EndpointContext(this, endpoint); + endpoint.setContext(context); + } + return context; + } + + class AmqpTransportListener extends DefaultTransportListener { + + @Override + public void onTransportConnected() { + if( listener!=null ) { + listener.processTransportConnected(); + } + } + + @Override + public void onRefill() { + if( listener!=null ) { + listener.processRefill(); + } + } + + @Override + public void onTransportCommand(Object command) { + if( state != CONNECTED ) { + return; + } + try { + Buffer buffer; + if (command.getClass() == AmqpHeader.class) { + buffer = ((AmqpHeader) command).getBuffer(); + } else { + buffer = (Buffer) command; + } + protonTransport.input(buffer.data, buffer.offset, buffer.length); + process(); + pumpOut(); + } catch 
(Exception e) { + onFailure(e); + } + } + + void process() { + fireListenerEvents(); + } + + @Override + public void onTransportFailure(IOException error) { + if( state!=CONNECTED ) { + failure = error; + listener.processTransportFailure(error); + fireWatches(); + } + } + + void onFailure(Throwable error) { + if( listener!=null ) { + failure = error; + listener.processFailure(error); + fireWatches(); + } + } + } + + public void disconnect() { + assertExecuting(); + if( state == CONNECTING || state==CONNECTED) { + state = DISCONNECTING; + if( hawtdispatchTransport!=null ) { + hawtdispatchTransport.stop(new Task(){ + public void run() { + state = DISCONNECTED; + hawtdispatchTransport = null; + protonTransport = null; + } + }); + } + } + } + + public DispatchQueue queue() { + return queue; + } + + public void assertExecuting() { + queue().assertExecuting(); + } + + public void onTransportConnected(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( failure !=null ) { + cb.onFailure(failure); + return true; + } + if( state!=CONNECTING ) { + cb.onSuccess(null); + return true; + } + return false; + } + }); + } + + public void onTransportDisconnected(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( state!=DISCONNECTED ) { + cb.onSuccess(null); + return true; + } + return false; + } + }); + } + + public void onTransportFailure(final Callback<Throwable> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( failure!=null ) { + cb.onSuccess(failure); + return true; + } + return false; + } + }); + } + + public Throwable getFailure() { + return failure; + } + + public void setProtocolTracer(ProtocolTracer protocolTracer) { + protonTransport.setProtocolTracer(protocolTracer); + } + + public ProtocolTracer getProtocolTracer() { + return protonTransport.getProtocolTracer(); + } +} Added: 
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Defer.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; + +import org.fusesource.hawtdispatch.Task; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class Defer extends Task { + boolean defered; +} Copied: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java (from r1406782, activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java?p2=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java&p1=activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java&r1=1406782&r2=1408852&rev=1408852&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/DroppingWritableBuffer.java (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/DroppingWritableBuffer.java Tue Nov 13 17:41:01 2012 @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.activemq.apollo.amqp.hawtdispatch; +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; import org.apache.qpid.proton.codec.WritableBuffer; Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/EndpointContext.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; + +import org.apache.activemq.apollo.amqp.hawtdispatch.api.Callback; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointError; +import org.apache.qpid.proton.engine.EndpointState; +import org.fusesource.hawtdispatch.Task; + +import java.util.LinkedList; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public class EndpointContext { + + private final AmqpTransport transport; + private final Endpoint endpoint; + private Object attachment; + boolean listenerProcessing; + + public EndpointContext(AmqpTransport transport, Endpoint endpoint) { + this.transport = transport; + this.endpoint = endpoint; + } + + class ProcessedTask extends Task { + @Override + public void run() { + transport.assertExecuting(); + listenerProcessing = false; + transport.pumpOut(); + } + } + + public void fireListenerEvents(AmqpListener listener) { + if( listener!=null && !listenerProcessing ) { + if( endpoint.getLocalState() == EndpointState.UNINITIALIZED && + endpoint.getRemoteState() != EndpointState.UNINITIALIZED ) { + listenerProcessing = true; + listener.processRemoteOpen(endpoint, new ProcessedTask()); + } else if( endpoint.getLocalState() == EndpointState.ACTIVE && + endpoint.getRemoteState() == EndpointState.CLOSED ) { + listenerProcessing = true; + listener.processRemoteClose(endpoint, new ProcessedTask()); + } + } + if( attachment !=null && attachment instanceof Task ) { + ((Task) attachment).run(); + } + } + + public Object getAttachment() { + return attachment; + } + + public <T> T getAttachment(Class<T> clazz) { + return clazz.cast(getAttachment()); + } + + public void setAttachment(Object attachment) { + this.attachment = attachment; + } +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Support.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; + +/** + * Static helpers that build {@code IllegalStateException} instances with fixed messages. Every helper returns the exception; none of them throws it. + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class Support { + + /** Creates an IllegalStateException carrying {@code msg}. NOTE(review): the Throwable constructor already records the stack trace, so the extra fillInStackTrace() call (and the cast it forces) looks redundant - confirm before removing. */ public static IllegalStateException illegalState(String msg) { + return (IllegalStateException) new IllegalStateException(msg).fillInStackTrace(); + } + + /** Returns an IllegalStateException with the message "Unhandled event.". */ public static IllegalStateException createUnhandledEventError() { + return illegalState("Unhandled event."); + } + + /** Returns an IllegalStateException reporting that no connection listener is set for a received message. */ public static IllegalStateException createListenerNotSetError() { + return illegalState("No connection listener set to handle message received from the server."); + } + + /** Returns an IllegalStateException with the message "Disconnected". */ public static IllegalStateException createDisconnectedError() { + return illegalState("Disconnected"); + } + +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/Watch.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; + +/** +* A check that is evaluated through {@link #execute()} and reports whether it has been triggered. +* +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public abstract class Watch { + /** Evaluates the watch. @return true if the watch has been triggered */ + public abstract boolean execute(); +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/hawtdispatch/impl/WatchBase.java Tue Nov 13 17:41:01 2012 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.apollo.amqp.hawtdispatch.impl; + +import org.apache.activemq.apollo.amqp.hawtdispatch.impl.Watch; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.fusesource.hawtdispatch.Task; + +import java.util.LinkedList; + +/** + * Base class that keeps a list of pending {@code Watch} objects and re-evaluates them in a task submitted to the current HawtDispatch queue. + * + * NOTE(review): the DispatchQueue import is not referenced, and Watch lives in this same package so its import is unnecessary. + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class WatchBase { + + /* Watches that have not reported true from execute() yet; replaced by a fresh list on every evaluation pass. */ private LinkedList<Watch> watches = new LinkedList<Watch>(); + /** Appends the watch to the pending list and immediately schedules an evaluation pass via fireWatches(). */ protected void addWatch(final Watch task) { + watches.add(task); + fireWatches(); + } + + /** When at least one watch is pending, submits a task to Dispatch.getCurrentQueue() that executes every pending watch and keeps only those whose execute() returned false. NOTE(review): presumably callers always run on a dispatch queue; if getCurrentQueue() can return null elsewhere this would fail - TODO confirm. */ protected void fireWatches() { + if( !this.watches.isEmpty() ) { + Dispatch.getCurrentQueue().execute(new Task(){ + @Override + public void run() { + // Let's see if any of the watches are triggered. 
+ /* Swap in a fresh list first so that watches can be re-added to it while the old list is iterated. */ LinkedList<Watch> tmp = watches; + watches = new LinkedList<Watch>(); + for (Watch task : tmp) { + if( !task.execute() ) { + /* execute() returned false: keep the watch for the next pass. */ watches.add(task); + } + } + } + }); + } + } + +} Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/jul.properties Tue Nov 13 17:41:01 2012 @@ -0,0 +1,45 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# This file configures how Java Util Logging is handled. +# +# +handlers=java.util.logging.ConsoleHandler + +# Default global logging level. +# Loggers and Handlers may override this level +#.level=ALL +#RAW.level=ALL +FRM.level=ALL + +## Loggers +## ------------------------------------------ +## Loggers are usually attached to packages. 
+## Here, the level for each package is specified. +## The global level is used by default, so levels +## specified here simply act as an override. +#myapp.ui.level=ALL +#myapp.business.level=CONFIG +#myapp.data.level=SEVERE + +# Handlers +# ----------------------------------------- + +# --- ConsoleHandler --- +# Override of global logging level +java.util.logging.ConsoleHandler.level=ALL +java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala?rev=1408852&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/AmqpConnectionTest.scala Tue Nov 13 17:41:01 2012 @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.apollo.amqp.test + +import org.apache.activemq.apollo.amqp.hawtdispatch.api._ +import org.apache.qpid.proton.`type`.messaging.{AmqpValue, Source, Target} +import java.util.concurrent.CountDownLatch +import org.fusesource.hawtdispatch._ + +/** + * Test that opens an AmqpConnection to localhost, sends one text message to /queue/FOO and waits until a receiver on the same address gets a delivery. + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ + +class AmqpConnectionTest extends AmqpTestSupport { + + /** Builds a Callback that, on success, prints the action followed by " completed" and then evaluates the by-name block; on failure it prints the action followed by " failed: " and the error, then the stack trace. */ def print_result[T](action: String)(then: => Unit): Callback[T] = new Callback[T] { + def onSuccess(value: T) { + println(action + " completed"); + then + } + + def onFailure(value: Throwable) { + println(action + " failed: " + value); + value.printStackTrace() + } + } + + /** Builds a Callback that passes the success value to func and only prints the stack trace on failure. */ def then[T](func: (T) => Unit): Callback[T] = new Callback[T] { + def onSuccess(value: T) { + func(value) + } + + def onFailure(value: Throwable) { + value.printStackTrace() + } + } + + /* Sends "Hello World" to /queue/FOO; once the send is settled a receiver is created on the same address, and the latch is counted down when a message is delivered. The test then waits for the latch and for the connection to report disconnection. */ test("Sender Open") { + val amqp = new AmqpConnectOptions(); + amqp.setHost("localhost", port) + amqp.setUser("admin"); + amqp.setPassword("password"); + + val done = new CountDownLatch(1) + val connection = AmqpConnection.connect(amqp) + connection.queue() { + var session = connection.createSession() + val target = new Target + target.setAddress("/queue/FOO") + val sender = session.createSender(target); + val md = sender.send(session.createTextMessage("Hello World")) + md.onSettle(print_result("message sent") { + println("========================================================") + println("========================================================") + val source = new Source + source.setAddress("/queue/FOO") + val receiver = session.createReceiver(source); + receiver.resume() + receiver.setDeliveryListener(new AmqpDeliveryListener { + def onMessageDelivery(delivery: MessageDelivery) = { + println("Received: " + delivery.getMessage().getBody().asInstanceOf[AmqpValue].getValue); + delivery.settle() + done.countDown() + } + }) + }) + } + + done.await + connection.waitForDisconnected() + } +} \ No newline at end of file 
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala?rev=1408852&r1=1408851&r2=1408852&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/test/QpidJmsTest.scala Tue Nov 13 17:41:01 2012 @@ -20,10 +20,37 @@ package org.apache.activemq.apollo.amqp. import org.apache.qpid.amqp_1_0.jms.impl.{ConnectionFactoryImpl, QueueImpl} import javax.jms._ + + + + +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +/** Companion object holding a logging helper for the test class of the same name. */ object QpidJmsTest { + /** Registers a java.util.logging Handler (level ALL) on the "FRM" logger, sets that logger to level ALL, and prints every record to System.out as loggerName:message. NOTE(review): "FRM" is presumably the JMS client's frame-trace logger - confirm. */ def enableJMSFrameTracing { + val out = System.out // new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt"))) + val handler = new Handler { + setLevel(Level.ALL) + def publish(r: LogRecord) = out.println(String.format("%s:%s", r.getLoggerName, r.getMessage)) + def flush = out.flush + def close {} + } + var log = Logger.getLogger("FRM") + log.setLevel(Level.ALL) + log.addHandler(handler) + +// log = Logger.getLogger("RAW") +// log.setLevel(Level.ALL) +// log.addHandler(handler) + } +} + /** * @author <a href="http://hiramchirino.com">Hiram Chirino</a> */ - class QpidJmsTest extends AmqpTestSupport { def createConnection: Connection = { @@ -56,6 +83,7 @@ class QpidJmsTest extends AmqpTestSuppor // } test("Send Nack Receive") { + // enableJMSFrameTracing val queue = new QueueImpl("/queue/testqueue") val nMsgs = 1 val dataFormat: String = "%01024d" Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1408852&r1=1408851&r2=1408852&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue Nov 13 17:41:01 2012 @@ -263,4 +263,6 @@ class Delivery { record } + /** Adds one to redeliveries, clamping the result at Short.MaxValue before converting it back to a Short, so the counter saturates instead of overflowing. */ def redelivered = redeliveries = ((redeliveries+1).min(Short.MaxValue)).toShort + }
