http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java new file mode 100644 index 0000000..4ebf21a --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/api/TransportState.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.api; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public enum TransportState { + CREATED, + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java new file mode 100644 index 0000000..de8a2cd --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpHeader.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.fusesource.hawtbuf.Buffer; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpHeader { + + static final Buffer PREFIX = new Buffer(new byte[]{ + 'A', 'M', 'Q', 'P' + }); + + private Buffer buffer; + + public AmqpHeader(){ + this(new Buffer(new byte[]{ + 'A', 'M', 'Q', 'P', 0, 1, 0, 0 + })); + } + + public AmqpHeader(Buffer buffer){ + setBuffer(buffer); + } + + public int getProtocolId() { + return buffer.get(4) & 0xFF; + } + public void setProtocolId(int value) { + buffer.data[buffer.offset+4] = (byte) value; + } + + public int getMajor() { + return buffer.get(5) & 0xFF; + } + public void setMajor(int value) { + buffer.data[buffer.offset+5] = (byte) value; + } + + public int getMinor() { + return buffer.get(6) & 0xFF; + } + public void setMinor(int value) { + buffer.data[buffer.offset+6] = (byte) value; + } + + public int getRevision() { + return buffer.get(7) & 0xFF; + } + public void setRevision(int value) { + buffer.data[buffer.offset+7] = (byte) value; + } + + public Buffer getBuffer() { + return buffer; + } + public void setBuffer(Buffer value) { + if( !value.startsWith(PREFIX) || value.length()!=8 ) { + throw new IllegalArgumentException("Not an AMQP header buffer"); + } + buffer = value.buffer(); + } + + + @Override + public String toString() { + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java new file mode 100644 index 0000000..f372d99 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpListener.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.*; +import org.fusesource.hawtdispatch.Task; + +import java.io.IOException; + + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public class AmqpListener { + + public Sasl processSaslConnect(ProtonJTransport protonTransport) { + return null; + } + + public Sasl processSaslEvent(Sasl sasl) { + return sasl; + } + + public void processRemoteOpen(Endpoint endpoint, Task onComplete) { + ErrorCondition condition = endpoint.getCondition(); + condition.setCondition(Symbol.valueOf("error")); + condition.setDescription("Not supported"); + endpoint.close(); + onComplete.run(); + } + + public void processRemoteClose(Endpoint endpoint, Task onComplete) { + endpoint.close(); + onComplete.run(); + } + + public void processDelivery(Delivery delivery){ + } + + public void processTransportConnected() { + } + + public void processTransportFailure(IOException e) { + this.processFailure(e); + } + + public void processFailure(Throwable e) { + e.printStackTrace(); + } + + public void processRefill() { + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java new file mode 100644 index 0000000..13ed1e3 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpProtocolCodec.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec; + +import java.io.IOException; + +/** + * A HawtDispatch protocol codec that encodes/decodes AMQP 1.0 frames. + * + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpProtocolCodec extends AbstractProtocolCodec { + + int maxFrameSize = 4*1024*1024; + + @Override + protected void encode(Object object) throws IOException { + nextWriteBuffer.write((Buffer) object); + } + + @Override + protected Action initialDecodeAction() { + return new Action() { + public Object apply() throws IOException { + Buffer magic = readBytes(8); + if (magic != null) { + nextDecodeAction = readFrameSize; + return new AmqpHeader(magic); + } else { + return null; + } + } + }; + } + + private final Action readFrameSize = new Action() { + public Object apply() throws IOException { + Buffer sizeBytes = peekBytes(4); + if (sizeBytes != null) { + int size = sizeBytes.bigEndianEditor().readInt(); + if (size < 8) { + throw new IOException(String.format("specified frame size %d is smaller than minimum frame size", size)); + } + if( size > maxFrameSize ) { + throw new IOException(String.format("specified frame size %d is larger than maximum frame size", size)); + } + + // TODO: check frame min and max size.. + nextDecodeAction = readFrame(size); + return nextDecodeAction.apply(); + } else { + return null; + } + } + }; + + + private final Action readFrame(final int size) { + return new Action() { + public Object apply() throws IOException { + Buffer frameData = readBytes(size); + if (frameData != null) { + nextDecodeAction = readFrameSize; + return frameData; + } else { + return null; + } + } + }; + } + + public int getReadBytesPendingDecode() { + return readBuffer.position() - readStart; + } + + public void skipProtocolHeader() { + nextDecodeAction = readFrameSize; + } + + public void readProtocolHeader() { + nextDecodeAction = initialDecodeAction(); + } + + public int getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java new file mode 100644 index 0000000..9ea048b --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/AmqpTransport.java @@ -0,0 +1,587 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.apache.qpid.proton.hawtdispatch.api.AmqpConnectOptions; +import org.apache.qpid.proton.hawtdispatch.api.Callback; +import org.apache.qpid.proton.hawtdispatch.api.ChainedCallback; +import org.apache.qpid.proton.hawtdispatch.api.TransportState; +import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.engine.impl.ByteBufferUtils; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.hawtdispatch.*; +import org.fusesource.hawtdispatch.transport.DefaultTransportListener; +import org.fusesource.hawtdispatch.transport.SslTransport; +import org.fusesource.hawtdispatch.transport.TcpTransport; +import org.fusesource.hawtdispatch.transport.Transport; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedList; + +import static org.apache.qpid.proton.hawtdispatch.api.TransportState.*; +import static org.fusesource.hawtdispatch.Dispatch.NOOP; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class AmqpTransport extends WatchBase { + + private TransportState state = CREATED; + + final DispatchQueue queue; + final ProtonJConnection connection; + Transport hawtdispatchTransport; + ProtonJTransport protonTransport; + Throwable failure; + CustomDispatchSource<Defer,LinkedList<Defer>> defers; + + public static final EnumSet<EndpointState> ALL_SET = EnumSet.allOf(EndpointState.class); + + private AmqpTransport(DispatchQueue queue) { + this.queue = queue; + this.connection = (ProtonJConnection) Connection.Factory.create(); + + defers = Dispatch.createSource(EventAggregators.<Defer>linkedList(), this.queue); + defers.setEventHandler(new Task(){ + public void run() { + for( Defer defer: defers.getData() ) { + assert defer.defered = true; + defer.defered = false; + defer.run(); + } + } + }); + defers.resume(); + } + + static public AmqpTransport connect(AmqpConnectOptions options) { + AmqpConnectOptions opts = options.clone(); + if( opts.getDispatchQueue() == null ) { + opts.setDispatchQueue(Dispatch.createQueue()); + } + if( opts.getBlockingExecutor() == null ) { + opts.setBlockingExecutor(AmqpConnectOptions.getBlockingThreadPool()); + } + return new AmqpTransport(opts.getDispatchQueue()).connecting(opts); + } + + private AmqpTransport connecting(final AmqpConnectOptions options) { + assert state == CREATED; + try { + state = CONNECTING; + if( options.getLocalContainerId()!=null ) { + connection.setLocalContainerId(options.getLocalContainerId()); + } + if( options.getRemoteContainerId()!=null ) { + connection.setContainer(options.getRemoteContainerId()); + } + connection.setHostname(options.getHost().getHost()); + Callback<Void> onConnect = new Callback<Void>() { + @Override + public void onSuccess(Void value) { + if( state == CONNECTED ) { + hawtdispatchTransport.setTransportListener(new AmqpTransportListener()); + fireWatches(); + } + } + + @Override + public void onFailure(Throwable value) { + if( state == CONNECTED || state == CONNECTING ) { + failure = value; + disconnect(); + fireWatches(); + } + } + }; + if( options.getUser()!=null ) { + onConnect = new SaslClientHandler(options, onConnect); + } + createTransport(options, onConnect); + } catch (Throwable e) { + failure = e; + } + fireWatches(); + return this; + } + + public TransportState getState() { + return state; + } + + /** + * Creates and start a transport to the AMQP server. Passes it to the onConnect + * once the transport is connected. + * + * @param onConnect + * @throws Exception + */ + void createTransport(AmqpConnectOptions options, final Callback<Void> onConnect) throws Exception { + final TcpTransport transport; + if( options.getSslContext() !=null ) { + SslTransport ssl = new SslTransport(); + ssl.setSSLContext(options.getSslContext()); + transport = ssl; + } else { + transport = new TcpTransport(); + } + + URI host = options.getHost(); + if( host.getPort() == -1 ) { + if( options.getSslContext()!=null ) { + host = new URI(host.getScheme()+"://"+host.getHost()+":5672"); + } else { + host = new URI(host.getScheme()+"://"+host.getHost()+":5671"); + } + } + + + transport.setBlockingExecutor(options.getBlockingExecutor()); + transport.setDispatchQueue(options.getDispatchQueue()); + + transport.setMaxReadRate(options.getMaxReadRate()); + transport.setMaxWriteRate(options.getMaxWriteRate()); + transport.setReceiveBufferSize(options.getReceiveBufferSize()); + transport.setSendBufferSize(options.getSendBufferSize()); + transport.setTrafficClass(options.getTrafficClass()); + transport.setUseLocalHost(options.isUseLocalHost()); + transport.connecting(host, options.getLocalAddress()); + + transport.setTransportListener(new DefaultTransportListener(){ + public void onTransportConnected() { + if(state==CONNECTING) { + state = CONNECTED; + onConnect.onSuccess(null); + transport.resumeRead(); + } + } + + public void onTransportFailure(final IOException error) { + if(state==CONNECTING) { + onConnect.onFailure(error); + } + } + + }); + transport.connecting(host, options.getLocalAddress()); + bind(transport); + transport.start(NOOP); + } + + class SaslClientHandler extends ChainedCallback<Void, Void> { + + private final AmqpConnectOptions options; + + public SaslClientHandler(AmqpConnectOptions options, Callback<Void> next) { + super(next); + this.options = options; + } + + public void onSuccess(final Void value) { + final Sasl s = protonTransport.sasl(); + s.client(); + pumpOut(); + hawtdispatchTransport.setTransportListener(new AmqpTransportListener() { + + Sasl sasl = s; + + @Override + void process() { + if (sasl != null) { + sasl = processSaslEvent(sasl); + if (sasl == null) { + // once sasl handshake is done.. we need to read the protocol header again. + ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader(); + } + } + } + + @Override + public void onTransportFailure(IOException error) { + next.onFailure(error); + } + + @Override + void onFailure(Throwable error) { + next.onFailure(error); + } + + boolean authSent = false; + + private Sasl processSaslEvent(Sasl sasl) { + if (sasl.getOutcome() == Sasl.SaslOutcome.PN_SASL_OK) { + next.onSuccess(null); + return null; + } + HashSet<String> mechanisims = new HashSet<String>(Arrays.asList(sasl.getRemoteMechanisms())); + if (!authSent && !mechanisims.isEmpty()) { + if (mechanisims.contains("PLAIN")) { + authSent = true; + DataByteArrayOutputStream os = new DataByteArrayOutputStream(); + try { + os.writeByte(0); + os.write(new UTF8Buffer(options.getUser())); + os.writeByte(0); + if (options.getPassword() != null) { + os.write(new UTF8Buffer(options.getPassword())); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + Buffer buffer = os.toBuffer(); + sasl.setMechanisms(new String[]{"PLAIN"}); + sasl.send(buffer.data, buffer.offset, buffer.length); + } else if (mechanisims.contains("ANONYMOUS")) { + authSent = true; + sasl.setMechanisms(new String[]{"ANONYMOUS"}); + sasl.send(new byte[0], 0, 0); + } else { + next.onFailure(Support.illegalState("Remote does not support plain password authentication.")); + return null; + } + } + return sasl; + } + }); + } + } + + class SaslServerListener extends AmqpTransportListener { + Sasl sasl; + + @Override + public void onTransportCommand(Object command) { + try { + if (command.getClass() == AmqpHeader.class) { + AmqpHeader header = (AmqpHeader)command; + switch( header.getProtocolId() ) { + case 3: // Client will be using SASL for auth.. + if( listener!=null ) { + sasl = listener.processSaslConnect(protonTransport); + break; + } + default: + AmqpTransportListener listener = new AmqpTransportListener(); + hawtdispatchTransport.setTransportListener(listener); + listener.onTransportCommand(command); + return; + } + command = header.getBuffer(); + } + } catch (Exception e) { + onFailure(e); + } + super.onTransportCommand(command); + } + + @Override + void process() { + if (sasl != null) { + sasl = listener.processSaslEvent(sasl); + } + if (sasl == null) { + // once sasl handshake is done.. we need to read the protocol header again. + ((AmqpProtocolCodec) hawtdispatchTransport.getProtocolCodec()).readProtocolHeader(); + hawtdispatchTransport.setTransportListener(new AmqpTransportListener()); + } + } + } + + static public AmqpTransport accept(Transport transport) { + return new AmqpTransport(transport.getDispatchQueue()).accepted(transport); + } + + private AmqpTransport accepted(final Transport transport) { + state = CONNECTED; + bind(transport); + hawtdispatchTransport.setTransportListener(new SaslServerListener()); + return this; + } + + private void bind(final Transport transport) { + this.hawtdispatchTransport = transport; + this.protonTransport = (ProtonJTransport) org.apache.qpid.proton.engine.Transport.Factory.create(); + this.protonTransport.bind(connection); + if( transport.getProtocolCodec()==null ) { + try { + transport.setProtocolCodec(new AmqpProtocolCodec()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public void defer(Defer defer) { + if( !defer.defered ) { + defer.defered = true; + defers.merge(defer); + } + } + + public void pumpOut() { + assertExecuting(); + defer(deferedPumpOut); + } + + private Defer deferedPumpOut = new Defer() { + public void run() { + doPumpOut(); + } + }; + + private void doPumpOut() { + switch(state) { + case CONNECTING: + case CONNECTED: + break; + default: + return; + } + + int size = hawtdispatchTransport.getProtocolCodec().getWriteBufferSize(); + byte data[] = new byte[size]; + boolean done = false; + int pumped = 0; + while( !done && !hawtdispatchTransport.full() ) { + int count = protonTransport.output(data, 0, size); + if( count > 0 ) { + pumped += count; + boolean accepted = hawtdispatchTransport.offer(new Buffer(data, 0, count)); + assert accepted: "Should be accepted since the transport was not full"; + } else { + done = true; + } + } + if( pumped > 0 && !hawtdispatchTransport.full() ) { + listener.processRefill(); + } + } + + public Sasl sasl; + public void fireListenerEvents() { + fireWatches(); + + if( sasl!=null ) { + sasl = listener.processSaslEvent(sasl); + if( sasl==null ) { + // once sasl handshake is done.. we need to read the protocol header again. + ((AmqpProtocolCodec)this.hawtdispatchTransport.getProtocolCodec()).readProtocolHeader(); + } + } + + context(connection).fireListenerEvents(listener); + + Session session = connection.sessionHead(ALL_SET, ALL_SET); + while(session != null) + { + context(session).fireListenerEvents(listener); + session = session.next(ALL_SET, ALL_SET); + } + + Link link = connection.linkHead(ALL_SET, ALL_SET); + while(link != null) + { + context(link).fireListenerEvents(listener); + link = link.next(ALL_SET, ALL_SET); + } + + Delivery delivery = connection.getWorkHead(); + while(delivery != null) + { + listener.processDelivery(delivery); + delivery = delivery.getWorkNext(); + } + + listener.processRefill(); + } + + + public ProtonJConnection connection() { + return connection; + } + + AmqpListener listener = new AmqpListener(); + public AmqpListener getListener() { + return listener; + } + + public void setListener(AmqpListener listener) { + this.listener = listener; + } + + public EndpointContext context(Endpoint endpoint) { + EndpointContext context = (EndpointContext) endpoint.getContext(); + if( context == null ) { + context = new EndpointContext(this, endpoint); + endpoint.setContext(context); + } + return context; + } + + class AmqpTransportListener extends DefaultTransportListener { + + @Override + public void onTransportConnected() { + if( listener!=null ) { + listener.processTransportConnected(); + } + } + + @Override + public void onRefill() { + if( listener!=null ) { + listener.processRefill(); + } + } + + @Override + public void onTransportCommand(Object command) { + if( state != CONNECTED ) { + return; + } + try { + Buffer buffer; + if (command.getClass() == AmqpHeader.class) { + buffer = ((AmqpHeader) command).getBuffer(); + } else { + buffer = (Buffer) command; + } + ByteBuffer bbuffer = buffer.toByteBuffer(); + do { + ByteBuffer input = protonTransport.getInputBuffer(); + ByteBufferUtils.pour(bbuffer, input); + protonTransport.processInput(); + } while (bbuffer.remaining() > 0); + process(); + pumpOut(); + } catch (Exception e) { + onFailure(e); + } + } + + void process() { + fireListenerEvents(); + } + + @Override + public void onTransportFailure(IOException error) { + if( state==CONNECTED ) { + failure = error; + if( listener!=null ) { + listener.processTransportFailure(error); + fireWatches(); + } + } + } + + void onFailure(Throwable error) { + failure = error; + if( listener!=null ) { + listener.processFailure(error); + fireWatches(); + } + } + } + + public void disconnect() { + assertExecuting(); + if( state == CONNECTING || state==CONNECTED) { + state = DISCONNECTING; + if( hawtdispatchTransport!=null ) { + hawtdispatchTransport.stop(new Task(){ + public void run() { + state = DISCONNECTED; + hawtdispatchTransport = null; + protonTransport = null; + fireWatches(); + } + }); + } + } + } + + public DispatchQueue queue() { + return queue; + } + + public void assertExecuting() { + queue().assertExecuting(); + } + + public void onTransportConnected(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( failure !=null ) { + cb.onFailure(failure); + return true; + } + if( state!=CONNECTING ) { + cb.onSuccess(null); + return true; + } + return false; + } + }); + } + + public void onTransportDisconnected(final Callback<Void> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( state==DISCONNECTED ) { + cb.onSuccess(null); + return true; + } + return false; + } + }); + } + + public void onTransportFailure(final Callback<Throwable> cb) { + addWatch(new Watch() { + @Override + public boolean execute() { + if( failure!=null ) { + cb.onSuccess(failure); + return true; + } + return false; + } + }); + } + + public Throwable getFailure() { + return failure; + } + + public void setProtocolTracer(ProtocolTracer protocolTracer) { + protonTransport.setProtocolTracer(protocolTracer); + } + + public ProtocolTracer getProtocolTracer() { + return protonTransport.getProtocolTracer(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java new file mode 100644 index 0000000..eee8241 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Defer.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.fusesource.hawtdispatch.Task; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class Defer extends Task { + boolean defered; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java new file mode 100644 index 0000000..c12a849 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/EndpointContext.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; +import org.fusesource.hawtdispatch.Task; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public class EndpointContext { + + private final AmqpTransport transport; + private final Endpoint endpoint; + private Object attachment; + boolean listenerProcessing; + + public EndpointContext(AmqpTransport transport, Endpoint endpoint) { + this.transport = transport; + this.endpoint = endpoint; + } + + class ProcessedTask extends Task { + @Override + public void run() { + transport.assertExecuting(); + listenerProcessing = false; + transport.pumpOut(); + } + } + + public void fireListenerEvents(AmqpListener listener) { + if( listener!=null && !listenerProcessing ) { + if( endpoint.getLocalState() == EndpointState.UNINITIALIZED && + endpoint.getRemoteState() != EndpointState.UNINITIALIZED ) { + listenerProcessing = true; + listener.processRemoteOpen(endpoint, new ProcessedTask()); + } else if( endpoint.getLocalState() == EndpointState.ACTIVE && + endpoint.getRemoteState() == EndpointState.CLOSED ) { + listenerProcessing = true; + listener.processRemoteClose(endpoint, new ProcessedTask()); + } + } + if( attachment !=null && attachment instanceof Task ) { + ((Task) attachment).run(); + } + } + + public Object getAttachment() { + return attachment; + } + + public <T> T getAttachment(Class<T> clazz) { + return clazz.cast(getAttachment()); + } + + public void setAttachment(Object attachment) { + this.attachment = attachment; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java new file mode 100644 index 0000000..8d6f83b --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Support.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class Support { + + public static IllegalStateException illegalState(String msg) { + return (IllegalStateException) new IllegalStateException(msg).fillInStackTrace(); + } + + public static IllegalStateException createUnhandledEventError() { + return illegalState("Unhandled event."); + } + + public static IllegalStateException createListenerNotSetError() { + return illegalState("No connection listener set to handle message received from the server."); + } + + public static IllegalStateException createDisconnectedError() { + return illegalState("Disconnected"); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java new file mode 100644 index 0000000..6bb7603 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/Watch.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public abstract class Watch { + /* returns true if the watch has been triggered */ + public abstract boolean execute(); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java new file mode 100644 index 0000000..a4b1591 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/main/java/org/apache/qpid/proton/hawtdispatch/impl/WatchBase.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.proton.hawtdispatch.impl; + +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.Task; + +import java.util.LinkedList; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +abstract public class WatchBase { + + private LinkedList<Watch> watches = new LinkedList<Watch>(); + protected void addWatch(final Watch task) { + watches.add(task); + fireWatches(); + } + + protected void fireWatches() { + if( !this.watches.isEmpty() ) { + Dispatch.getCurrentQueue().execute(new Task(){ + @Override + public void run() { + // Lets see if any of the watches are triggered. + LinkedList<Watch> tmp = watches; + watches = new LinkedList<Watch>(); + for (Watch task : tmp) { + if( !task.execute() ) { + watches.add(task); + } + } + } + }); + } + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java new file mode 100644 index 0000000..7cbed14 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/api/SampleTest.java @@ -0,0 +1,292 @@ +package org.apache.qpid.proton.hawtdispatch.api; + +import static junit.framework.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.net.URISyntaxException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.hawtdispatch.test.MessengerServer; +import org.apache.qpid.proton.message.Message; +import org.fusesource.hawtdispatch.Task; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +/** + * Hello world! + * + */ + +public class SampleTest { + + private static final Logger _logger = Logger.getLogger(SampleTest.class.getName()); + + private MessengerServer server; + + @Before + public void startServer() { + server = new MessengerServer(); + server.start(); + } + + @After + public void stopServer() { + server.stop(); + } + + @Test + public void test() throws Exception { + int expected = 10; + final AtomicInteger countdown = new AtomicInteger(expected); + AmqpConnectOptions options = new AmqpConnectOptions(); + final String container = UUID.randomUUID().toString(); + try { + options.setHost(server.getHost(), server.getPort()); + options.setLocalContainerId(container); + options.setUser("anonymous"); + options.setPassword("changeit"); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + final AmqpConnection conn = AmqpConnection.connect(options ); + _logger.fine("connection queue"); + conn.queue().execute(new Task() { + + @Override + public void run() { + _logger.fine("connection running, setup callbacks"); + conn.onTransportFailure(new Callback<Throwable>() { + + @Override + public void onSuccess(Throwable value) { + _logger.fine("transportFailure Success? " + str(value)); + conn.close(); + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("transportFailure Trouble! " + str(value)); + conn.close(); + } + }); + + conn.onConnected(new Callback<Void>() { + + @Override + public void onSuccess(Void value) { + _logger.fine("on connect Success! in container " + container); + final AmqpSession session = conn.createSession(); + Target rqtarget = new Target(); + rqtarget.setAddress("rq-tgt"); + final AmqpSender sender = session.createSender(rqtarget, QoS.AT_LEAST_ONCE, "request-yyy"); + Source rqsource = new Source(); + rqsource.setAddress("rs-src"); + sender.getEndpoint().setSource(rqsource); + Source rssource = new Source(); + rssource.setAddress("rs-src"); + final AmqpReceiver receiver = session.createReceiver(rssource , QoS.AT_LEAST_ONCE, 10, "response-yyy"); + Target rstarget = new Target(); + final String address = "rs-tgt"; + rstarget.setAddress(address); + receiver.getEndpoint().setTarget(rstarget); + sender.onRemoteClose(new Callback<ErrorCondition>() { + + @Override + public void onSuccess(ErrorCondition value) { + _logger.fine("sender remote close!" + str(value)); + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("sender remote close Trouble!" + str(value)); + conn.close(); + + } + + }); + receiver.onRemoteClose(new Callback<ErrorCondition>() { + + @Override + public void onSuccess(ErrorCondition value) { + _logger.fine("receiver remote close!" + str(value)); + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("receiver remote close Trouble!" + str(value)); + conn.close(); + + } + + }); + + final Task work = new Task() { + + private AtomicInteger count = new AtomicInteger(); + + @Override + public void run() { + Message message = session.createTextMessage("hello world! " + String.valueOf(count.incrementAndGet())); + message.setAddress("amqp://joze/rq-src"); + String reply_to = "amqp://" + container + "/" + address; + message.setReplyTo(reply_to); + message.setCorrelationId("correlator"); + final MessageDelivery md = sender.send(message); + md.onRemoteStateChange(new Callback<DeliveryState>() { + + @Override + public void onSuccess(DeliveryState value) { + _logger.fine("delivery remote state change! " + str(value) + + " local: "+ str(md.getLocalState()) + + " remote: " + str(md.getRemoteState())); + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("remote state change Trouble!" + str(value)); + conn.close(); + } + + }); + md.onSettle(new Callback<DeliveryState>() { + + @Override + public void onSuccess(DeliveryState value) { + _logger.fine("delivery settled! " + str(value) + + " local: "+ str(md.getLocalState()) + + " remote: " + str(md.getRemoteState())); + _logger.fine("sender settle mode state " + + " local receiver " + str(sender.getEndpoint().getReceiverSettleMode()) + + " local sender " + str(sender.getEndpoint().getSenderSettleMode()) + + " remote receiver " + str(sender.getEndpoint().getRemoteReceiverSettleMode()) + + " remote sender " + str(sender.getEndpoint().getRemoteSenderSettleMode()) + + "" + ); + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("delivery sending Trouble!" + str(value)); + conn.close(); + } + }); + } + + }; + receiver.setDeliveryListener(new AmqpDeliveryListener() { + + @Override + public void onMessageDelivery( + MessageDelivery delivery) { + Message message = delivery.getMessage(); + _logger.fine("incoming message delivery! " + + " local " + str(delivery.getLocalState()) + + " remote " + str(delivery.getRemoteState()) + + " message " + str(message.getBody()) + + ""); + delivery.onSettle(new Callback<DeliveryState>() { + + @Override + public void onSuccess(DeliveryState value) { + _logger.fine("incoming message settled! "); + int i = countdown.decrementAndGet(); + if ( i > 0 ) { + _logger.fine("More work " + str(i)); + work.run(); + } else { + conn.queue().executeAfter(100, TimeUnit.MILLISECONDS, new Task() { + + @Override + public void run() { + _logger.fine("stopping sender"); + sender.close(); + } + }); + conn.queue().executeAfter(200, TimeUnit.MILLISECONDS, new Task() { + + @Override + public void run() { + _logger.fine("stopping receiver"); + receiver.close(); + + } + }); + conn.queue().executeAfter(300, TimeUnit.MILLISECONDS, new Task() { + + @Override + public void run() { + _logger.fine("stopping session"); + session.close(); + + } + }); + conn.queue().executeAfter(400, TimeUnit.MILLISECONDS, new Task() { + + @Override + public void run() { + _logger.fine("stopping connection"); + conn.close(); + + } + }); + } + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("trouble settling incoming message " + str(value)); + conn.close(); + } + }); + delivery.settle(); + } + + }); + + // start the receiver + receiver.resume(); + + // send first message + conn.queue().execute(work); + } + + @Override + public void onFailure(Throwable value) { + _logger.fine("on connect Failure?" + str(value)); + conn.close(); + } + }); + _logger.fine("connection setup done"); + + + } + + }); + try { + _logger.fine("Waiting..."); + Future<Void> disconnectedFuture = conn.getDisconnectedFuture(); + disconnectedFuture.await(10, TimeUnit.SECONDS); + _logger.fine("done"); + assertEquals(expected, server.getMessagesReceived()); + } catch (Exception e) { + _logger.log(Level.SEVERE, "Test failed, possibly due to timeout", e); + throw e; + } + } + + private String str(Object value) { + if (value == null) + return "null"; + return value.toString(); + } + + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java new file mode 100644 index 0000000..dae68b6 --- /dev/null +++ b/contrib/proton-hawtdispatch/src/test/java/org/apache/qpid/proton/hawtdispatch/test/MessengerServer.java @@ -0,0 +1,135 @@ +package org.apache.qpid.proton.hawtdispatch.test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.qpid.proton.InterruptException; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.messenger.Messenger; +import org.apache.qpid.proton.messenger.Tracker; + +public class MessengerServer { + public static final String REJECT_ME = "*REJECT-ME*"; + private int timeout = 1000; + private String host = "127.0.0.1"; + private int port = 55555; + private Messenger msgr; + private AtomicInteger messagesReceived = new AtomicInteger(0); + private AtomicInteger messagesSent = new AtomicInteger(0); + private AtomicBoolean serverShouldRun = new AtomicBoolean(); + private AtomicReference<Throwable> issues = new AtomicReference<Throwable>(); + private Thread thread; + private CountDownLatch serverStart; + + public MessengerServer() { + } + public void start() { + if (!serverShouldRun.compareAndSet(false, true)) { + throw new IllegalStateException("started twice"); + } + msgr = Proton.messenger(); + serverStart = new CountDownLatch(1); + thread = new Thread(new Runnable() { + + @Override + public void run() { + try { + msgr.start(); + msgr.subscribe("amqp://~"+host+":"+String.valueOf(port)); + serverStart.countDown(); + try { + while(serverShouldRun.get()) { + msgr.recv(100); + while (msgr.incoming() > 0) { + Message msg = msgr.get(); + messagesReceived.incrementAndGet(); + Tracker tracker = msgr.incomingTracker(); + if (REJECT_ME.equals(msg.getBody())) { + msgr.reject(tracker , 0); + } else { + msgr.accept(tracker, 0); + } + String reply_to = msg.getReplyTo(); + if (reply_to != null) { + msg.setAddress(reply_to); + msgr.put(msg); + msgr.settle(msgr.outgoingTracker(), 0); + } + } + } + } finally { + msgr.stop(); + } + } catch (InterruptException ex) { + // we're done + } catch (Exception ex) { + issues.set(ex); + } + } + + }); + thread.setName("MessengerServer"); + thread.setDaemon(true); + thread.start(); + try { + serverStart.await(); + } catch (InterruptedException e) { + msgr.interrupt(); + } + } + + public void stop() { + if (!serverShouldRun.compareAndSet(true, false)) { + return; + } + if (serverStart.getCount() == 0) + msgr.interrupt(); + try { + thread.join(timeout); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + thread = null; + if (!msgr.stopped()) + msgr.stop(); + Throwable throwable = issues.get(); + if (throwable != null) + throw new RuntimeException("Messenger server had problems", throwable); + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } + public int getTimeout() { + return timeout; + } + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public int getMessagesReceived() { + return messagesReceived.get(); + } + + public int getMessagesSent() { + return messagesSent.get(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/pom.xml b/contrib/proton-jms/pom.xml new file mode 100644 index 0000000..4515cbb --- /dev/null +++ b/contrib/proton-jms/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-project</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>../..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>proton-jms</artifactId> + <name>proton-jms</name> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <version>1.1.1</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + </build> + <scm> + <url>http://svn.apache.org/viewvc/qpid/proton/</url> + </scm> + +</project> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java new file mode 100644 index 0000000..4beb401 --- /dev/null +++ b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeInboundTransformer.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.jms; + +import javax.jms.Message; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { + + + public AMQPNativeInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); + + Message rc = super.transform(amqpMessage); + + populateMessage(rc, amqp); + return rc; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java new file mode 100644 index 0000000..66ff0b1 --- /dev/null +++ b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPNativeOutboundTransformer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.jms; + +import org.apache.qpid.proton.codec.CompositeWritableBuffer; +import org.apache.qpid.proton.codec.DroppingWritableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.amqp.UnsignedInteger; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageFormatException; +import java.nio.ByteBuffer; + +import org.apache.qpid.proton.message.ProtonJMessage; +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public class AMQPNativeOutboundTransformer extends OutboundTransformer { + + public AMQPNativeOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) + return null; + if( !(msg instanceof BytesMessage) ) + return null; + try { + if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) { + return null; + } + } catch (MessageFormatException e) { + return null; + } + return transform(this, (BytesMessage) msg); + } + + static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + long messageFormat; + try { + messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT"); + } catch (MessageFormatException e) { + return null; + } + byte data[] = new byte[(int) msg.getBodyLength()]; + int dataSize = data.length; + msg.readBytes(data); + msg.reset(); + + try { + int count = msg.getIntProperty("JMSXDeliveryCount"); + if( count > 1 ) { + + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = data.length; + while( len > 0 ) { + final int decoded = amqp.decode(data, offset, len); + assert decoded > 0: "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + // Update the DeliveryCount header... + amqp.getHeader().setDeliveryCount(new UnsignedInteger(count)); + + // Re-encode... + ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]); + final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); + int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); + if( overflow.position() > 0 ) { + buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]); + c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); + } + data = buffer.array(); + dataSize = c; + } + } catch (JMSException e) { + } + + return new EncodedMessage(messageFormat, data, 0, dataSize); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java new file mode 100644 index 0000000..9baabdf --- /dev/null +++ b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AMQPRawInboundTransformer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.jms; + +import javax.jms.BytesMessage; +import javax.jms.Message; + +public class AMQPRawInboundTransformer extends InboundTransformer { + + public AMQPRawInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + BytesMessage rc = vendor.createBytesMessage(); + rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + + final long now = System.currentTimeMillis(); + rc.setJMSTimestamp(now); + if( defaultTtl > 0 ) { + rc.setJMSExpiration(now + defaultTtl); + } + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); + rc.setBooleanProperty(prefixVendor + "NATIVE", true); + + return rc; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java new file mode 100644 index 0000000..a72198b --- /dev/null +++ b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/AutoOutboundTransformer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.jms; + +import javax.jms.BytesMessage; +import javax.jms.Message; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public class AutoOutboundTransformer extends JMSMappingOutboundTransformer { + + public AutoOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) + return null; + if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) { + if( msg instanceof BytesMessage ) { + return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg); + } else { + return null; + } + } else { + return JMSMappingOutboundTransformer.transform(this, msg); + } + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java new file mode 100644 index 0000000..19602c9 --- /dev/null +++ b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/EncodedMessage.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.jms; + +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.amqp.Binary; + +/** + * @author <a href="http://hiramchirino.com">Hiram Chirino</a> + */ +public class EncodedMessage +{ + + private final Binary data; + final long messageFormat; + + public EncodedMessage(long messageFormat, byte[] data, int offset, int length) { + this.data = new Binary(data, offset, length); + this.messageFormat = messageFormat; + } + + public long getMessageFormat() { + return messageFormat; + } + + public Message decode() throws Exception { + Message amqp = Message.Factory.create(); + + int offset = getArrayOffset(); + int len = getLength(); + while( len > 0 ) { + final int decoded = amqp.decode(getArray(), offset, len); + assert decoded > 0: "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + return amqp; + } + + public int getLength() + { + return data.getLength(); + } + + public int getArrayOffset() + { + return data.getArrayOffset(); + } + + public byte[] getArray() + { + return data.getArray(); + } + + @Override + public String toString() + { + return data.toString(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java new file mode 100644 index 0000000..0374e6a --- /dev/null +++ b/contrib/proton-jms/src/main/java/org/apache/qpid/proton/jms/InboundTransformer.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.proton.jms; + +import org.apache.qpid.proton.amqp.*; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; + +import javax.jms.*; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +public abstract class InboundTransformer { + + JMSVendor vendor; + + public static final String TRANSFORMER_NATIVE = "native"; + public static final String TRANSFORMER_RAW = "raw"; + public static final String TRANSFORMER_JMS = "jms"; + + String prefixVendor = "JMS_AMQP_"; + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations= "MA_"; + String prefixFooter = "FT_"; + + int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; + long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; + + public InboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + } + + abstract public Message transform(EncodedMessage amqpMessage) throws Exception; + + public int getDefaultDeliveryMode() { + return defaultDeliveryMode; + } + + public void setDefaultDeliveryMode(int defaultDeliveryMode) { + this.defaultDeliveryMode = defaultDeliveryMode; + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public long getDefaultTtl() { + return defaultTtl; + } + + public void setDefaultTtl(long defaultTtl) { + this.defaultTtl = defaultTtl; + } + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } + + protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + Header header = amqp.getHeader(); + if( header==null ) { + header = new Header(); + } + + if( header.getDurable()!=null ) { + jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } else { + jms.setJMSDeliveryMode(defaultDeliveryMode); + } + if( header.getPriority()!=null ) { + jms.setJMSPriority(header.getPriority().intValue()); + } else { + jms.setJMSPriority(defaultPriority); + } + if( header.getFirstAcquirer() !=null ) { + jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); + } + if( header.getDeliveryCount()!=null ) { + vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); + } + + final DeliveryAnnotations da = amqp.getDeliveryAnnotations(); + if( da!=null ) { + for (Map.Entry<?,?> entry : da.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue()); + } + } + + Class<? extends Destination> toAttributes = Destination.class; + Class<? extends Destination> replyToAttributes = Destination.class; + + final MessageAnnotations ma = amqp.getMessageAnnotations(); + if( ma!=null ) { + for (Map.Entry<?,?> entry : ma.getValue().entrySet()) { + String key = entry.getKey().toString(); + if( "x-opt-jms-type".equals(key.toString()) && entry.getValue() != null ) { + jms.setJMSType(entry.getValue().toString()); + } else if( "x-opt-to-type".equals(key.toString()) ) { + toAttributes = toClassFromAttributes(entry.getValue().toString()); + } else if( "x-opt-reply-type".equals(key.toString()) ) { + replyToAttributes = toClassFromAttributes(entry.getValue().toString()); + } else { + setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + } + } + } + + final ApplicationProperties ap = amqp.getApplicationProperties(); + if( ap !=null ) { + for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) { + String key = entry.getKey().toString(); + if( "JMSXGroupID".equals(key) ) { + vendor.setJMSXGroupID(jms, entry.getValue().toString()); + } else if( "JMSXGroupSequence".equals(key) ) { + vendor.setJMSXGroupSequence(jms, ((Number)entry.getValue()).intValue()); + } else if( "JMSXUserID".equals(key) ) { + vendor.setJMSXUserID(jms, entry.getValue().toString()); + } else { + setProperty(jms, key, entry.getValue()); + } + } + } + + final Properties properties = amqp.getProperties(); + if( properties!=null ) { + if( properties.getMessageId()!=null ) { + jms.setJMSMessageID(properties.getMessageId().toString()); + } + Binary userId = properties.getUserId(); + if( userId!=null ) { + vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); + } + if( properties.getTo()!=null ) { + jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes)); + } + if( properties.getSubject()!=null ) { + jms.setStringProperty(prefixVendor + "Subject", properties.getSubject()); + } + if( properties.getReplyTo() !=null ) { + jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes)); + } + if( properties.getCorrelationId() !=null ) { + jms.setJMSCorrelationID(properties.getCorrelationId().toString()); + } + if( properties.getContentType() !=null ) { + jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + } + if( properties.getContentEncoding() !=null ) { + jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + } + if( properties.getCreationTime()!=null ) { + jms.setJMSTimestamp(properties.getCreationTime().getTime()); + } + if( properties.getGroupId()!=null ) { + vendor.setJMSXGroupID(jms, properties.getGroupId()); + } + if( properties.getGroupSequence()!=null ) { + vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); + } + if( properties.getReplyToGroupId()!=null ) { + jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + } + if( properties.getAbsoluteExpiryTime()!=null ) { + jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); + } + } + + // If the jms expiration has not yet been set... + if( jms.getJMSExpiration()==0 ) { + // Then lets try to set it based on the message ttl. + long ttl = defaultTtl; + if( header.getTtl()!=null ) { + ttl = header.getTtl().longValue(); + } + if( ttl == 0 ) { + jms.setJMSExpiration(0); + } else { + jms.setJMSExpiration(System.currentTimeMillis()+ttl); + } + } + + final Footer fp = amqp.getFooter(); + if( fp !=null ) { + for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); + } + } + } + + private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue"); + private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic"); + private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary"); + private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary"); + + private static Set<String> createSet(String ... args) { + HashSet<String> s = new HashSet<String>(); + for (String arg : args) + { + s.add(arg); + } + return Collections.unmodifiableSet(s); + } + + Class<? extends Destination> toClassFromAttributes(String value) + { + if( value ==null ) { + return null; + } + HashSet<String> attributes = new HashSet<String>(); + for( String x: value.split("\\s*,\\s*") ) { + attributes.add(x); + } + + if( QUEUE_ATTRIBUTES.equals(attributes) ) { + return Queue.class; + } + if( TOPIC_ATTRIBUTES.equals(attributes) ) { + return Topic.class; + } + if( TEMP_QUEUE_ATTRIBUTES.equals(attributes) ) { + return TemporaryQueue.class; + } + if( TEMP_TOPIC_ATTRIBUTES.equals(attributes) ) { + return TemporaryTopic.class; + } + return Destination.class; + } + + + private void setProperty(Message msg, String key, Object value) throws JMSException { + if( value instanceof UnsignedLong) { + long v = ((UnsignedLong) value).longValue(); + msg.setLongProperty(key, v); + } else if( value instanceof UnsignedInteger) { + long v = ((UnsignedInteger) value).longValue(); + if( Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE ) { + msg.setIntProperty(key, (int) v); + } else { + msg.setLongProperty(key, v); + } + } else if( value instanceof UnsignedShort) { + int v = ((UnsignedShort) value).intValue(); + if( Short.MIN_VALUE <= v && v <= Short.MAX_VALUE ) { + msg.setShortProperty(key, (short) v); + } else { + msg.setIntProperty(key, v); + } + } else if( value instanceof UnsignedByte) { + short v = ((UnsignedByte) value).shortValue(); + if( Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE ) { + msg.setByteProperty(key, (byte) v); + } else { + msg.setShortProperty(key, v); + } + } else if( value instanceof Symbol) { + msg.setStringProperty(key, value.toString()); + } else if( value instanceof Decimal128 ) { + msg.setDoubleProperty(key, ((Decimal128)value).doubleValue()); + } else if( value instanceof Decimal64 ) { + msg.setDoubleProperty(key, ((Decimal64)value).doubleValue()); + } else if( value instanceof Decimal32 ) { + msg.setFloatProperty(key, ((Decimal32)value).floatValue()); + } else if( value instanceof Binary ) { + msg.setStringProperty(key, value.toString()); + } else { + msg.setObjectProperty(key, value); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
