http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java new file mode 100644 index 0000000..4253434 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.JMSSecurityException; +import javax.jms.Session; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.meta.JmsSessionInfo; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Sasl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Connection> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); + + private final AmqpJmsMessageFactory amqpMessageFactory; + + private final URI remoteURI; + private final Map<JmsSessionId, AmqpSession> sessions = new HashMap<JmsSessionId, AmqpSession>(); + private final Map<JmsDestination, AmqpTemporaryDestination> tempDests = new HashMap<JmsDestination, AmqpTemporaryDestination>(); + private final AmqpProvider provider; + private boolean connected; + private AmqpSaslAuthenticator authenticator; + private final AmqpSession connectionSession; + + private String queuePrefix; + private String topicPrefix; + private String tempQueuePrefix; + private String tempTopicPrefix; + + public AmqpConnection(AmqpProvider provider, Connection protonConnection, Sasl sasl, JmsConnectionInfo info) { + super(info, protonConnection); + + this.provider = provider; + this.remoteURI = provider.getRemoteURI(); + this.amqpMessageFactory = new AmqpJmsMessageFactory(this); + + if (sasl != null) { + this.authenticator = new AmqpSaslAuthenticator(sasl, info); + } + + this.info.getConnectionId().setProviderHint(this); + + this.queuePrefix = info.getQueuePrefix(); + this.topicPrefix = info.getTopicPrefix(); + this.tempQueuePrefix = info.getTempQueuePrefix(); + this.tempTopicPrefix = info.getTempTopicPrefix(); + + // Create a Session for this connection that is used for Temporary Destinations + // and perhaps later on management and advisory monitoring. + JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1); + sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE); + + this.connectionSession = new AmqpSession(this, sessionInfo); + } + + @Override + protected void doOpen() { + this.endpoint.setContainer(info.getClientId()); + this.endpoint.setHostname(remoteURI.getHost()); + } + + @Override + protected void doClose() { + } + + public AmqpSession createSession(JmsSessionInfo sessionInfo) { + AmqpSession session = new AmqpSession(this, sessionInfo); + return session; + } + + public AmqpTemporaryDestination createTemporaryDestination(JmsDestination destination) { + AmqpTemporaryDestination temporary = new AmqpTemporaryDestination(connectionSession, destination); + return temporary; + } + + /** + * Called on receiving an event from Proton indicating a state change on the remote + * side of the Connection. + */ + @Override + public void processStateChange() { + + if (!connected && isOpen()) { + connected = true; + connectionSession.open(new AsyncResult() { + + @Override + public boolean isComplete() { + return connected; + } + + @Override + public void onSuccess() { + LOG.debug("AMQP Connection Session opened."); + opened(); + } + + @Override + public void onFailure(Throwable result) { + LOG.debug("AMQP Connection Session failed to open."); + failed(IOExceptionSupport.create(result)); + } + }); + } + + EndpointState localState = endpoint.getLocalState(); + EndpointState remoteState = endpoint.getRemoteState(); + + // We are still active (connected or not) and something on the remote end has + // closed us, signal an error if one was sent. + if (localState == EndpointState.ACTIVE && remoteState != EndpointState.ACTIVE) { + if (endpoint.getRemoteCondition().getCondition() != null) { + LOG.info("Error condition detected on Connection open {}.", endpoint.getRemoteCondition().getCondition()); + Exception remoteError = getRemoteError(); + if (isAwaitingOpen()) { + openRequest.onFailure(remoteError); + } else { + provider.fireProviderException(remoteError); + } + } + } + + // Transition cleanly to closed state. + if (localState == EndpointState.CLOSED && remoteState == EndpointState.CLOSED) { + LOG.debug("{} has been closed successfully.", this); + closed(); + } + } + + public void processSaslAuthentication() { + if (connected || authenticator == null) { + return; + } + + try { + if (authenticator.authenticate()) { + authenticator = null; + } + } catch (JMSSecurityException ex) { + failed(ex); + } + } + + void addTemporaryDestination(AmqpTemporaryDestination destination) { + tempDests.put(destination.getJmsDestination(), destination); + } + + void removeTemporaryDestination(AmqpTemporaryDestination destination) { + tempDests.remove(destination.getJmsDestination()); + } + + void addSession(AmqpSession session) { + this.sessions.put(session.getSessionId(), session); + } + + void removeSession(AmqpSession session) { + this.sessions.remove(session.getSessionId()); + } + + public JmsConnectionInfo getConnectionInfo() { + return this.info; + } + + public Connection getProtonConnection() { + return this.endpoint; + } + + public URI getRemoteURI() { + return this.remoteURI; + } + + public String getUsername() { + return this.info.getUsername(); + } + + public String getPassword() { + return this.info.getPassword(); + } + + public AmqpProvider getProvider() { + return this.provider; + } + + public String getQueuePrefix() { + return queuePrefix; + } + + public void setQueuePrefix(String queuePrefix) { + this.queuePrefix = queuePrefix; + } + + public String getTopicPrefix() { + return topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) { + this.topicPrefix = topicPrefix; + } + + public String getTempQueuePrefix() { + return tempQueuePrefix; + } + + public void setTempQueuePrefix(String tempQueuePrefix) { + this.tempQueuePrefix = tempQueuePrefix; + } + + public String getTempTopicPrefix() { + return tempTopicPrefix; + } + + public void setTempTopicPrefix(String tempTopicPrefix) { + this.tempTopicPrefix = tempTopicPrefix; + } + + /** + * Retrieve the indicated Session instance from the list of active sessions. + * + * @param sessionId + * The JmsSessionId that's associated with the target session. + * + * @return the AmqpSession associated with the given id. + */ + public AmqpSession getSession(JmsSessionId sessionId) { + if (sessionId.getProviderHint() instanceof AmqpSession) { + return (AmqpSession) sessionId.getProviderHint(); + } + return this.sessions.get(sessionId); + } + + /** + * @return true if the provider has been configured for presettle operations. + */ + public boolean isPresettleConsumers() { + return provider.isPresettleConsumers(); + } + + /** + * @return true if the provider has been configured for presettle operations. + */ + public boolean isPresettleProducers() { + return provider.isPresettleProducers(); + } + + /** + * @return the AMQP based JmsMessageFactory for this Connection. + */ + public AmqpJmsMessageFactory getAmqpMessageFactory() { + return this.amqpMessageFactory; + } + + @Override + public String toString() { + return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }"; + } +} +
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java new file mode 100644 index 0000000..3ecb58e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsConsumerInfo; +import org.apache.qpid.jms.meta.JmsMessageId; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.ProviderListener; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.jms.EncodedMessage; +import org.apache.qpid.proton.jms.InboundTransformer; +import org.apache.qpid.proton.jms.JMSMappingInboundTransformer; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.ByteArrayOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AMQP Consumer object that is used to manage JMS MessageConsumer semantics. + */ +public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver> { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class); + + protected static final Symbol COPY = Symbol.getSymbol("copy"); + protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local"); + protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector"); + + protected final AmqpSession session; + protected final InboundTransformer inboundTransformer = + new JMSMappingInboundTransformer(AmqpJMSVendor.INSTANCE);; + protected final Map<JmsMessageId, Delivery> delivered = new LinkedHashMap<JmsMessageId, Delivery>(); + protected boolean presettle; + + private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream(); + private final byte incomingBuffer[] = new byte[1024 * 64]; + + public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) { + super(info); + this.session = session; + + // Add a shortcut back to this Consumer for quicker lookups + this.info.getConsumerId().setProviderHint(this); + } + + /** + * Starts the consumer by setting the link credit to the given prefetch value. + */ + public void start(AsyncResult request) { + this.endpoint.flow(info.getPrefetchSize()); + request.onSuccess(); + } + + @Override + protected void doOpen() { + JmsDestination destination = info.getDestination(); + String subscription = session.getQualifiedName(destination); + + Source source = new Source(); + source.setAddress(subscription); + Target target = new Target(); + + configureSource(source); + + String receiverName = getConsumerId() + ":" + subscription; + if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) { + // In the case of Durable Topic Subscriptions the client must use the same + // receiver name which is derived from the subscription name property. + receiverName = info.getSubscriptionName(); + } + + endpoint = session.getProtonSession().receiver(receiverName); + endpoint.setSource(source); + endpoint.setTarget(target); + if (isPresettle()) { + endpoint.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + } + + @Override + public void opened() { + this.session.addResource(this); + super.opened(); + } + + @Override + public void closed() { + this.session.removeResource(this); + super.closed(); + } + + protected void configureSource(Source source) { + Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>(); + + if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) { + source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setDistributionMode(COPY); + } else { + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + } + + if (info.isNoLocal()) { + filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL); + } + + if (info.getSelector() != null && !info.getSelector().trim().equals("")) { + filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(info.getSelector())); + } + + if (!filters.isEmpty()) { + source.setFilter(filters); + } + } + + /** + * Called to acknowledge all messages that have been marked as delivered but + * have not yet been marked consumed. Usually this is called as part of an + * client acknowledge session operation. + * + * Only messages that have already been acknowledged as delivered by the JMS + * framework will be in the delivered Map. This means that the link credit + * would already have been given for these so we just need to settle them. + */ + public void acknowledge() { + LOG.trace("Session Acknowledge for consumer: {}", info.getConsumerId()); + for (Delivery delivery : delivered.values()) { + delivery.disposition(Accepted.getInstance()); + delivery.settle(); + } + delivered.clear(); + } + + /** + * Called to acknowledge a given delivery. Depending on the Ack Mode that + * the consumer was created with this method can acknowledge more than just + * the target delivery. + * + * @param envelope + * the delivery that is to be acknowledged. + * @param ackType + * the type of acknowledgment to perform. + * + * @throws JMSException if an error occurs accessing the Message properties. + */ + public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { + JmsMessageId messageId = envelope.getMessage().getFacade().getMessageId(); + Delivery delivery = null; + + if (messageId.getProviderHint() instanceof Delivery) { + delivery = (Delivery) messageId.getProviderHint(); + } else { + delivery = delivered.get(messageId); + if (delivery == null) { + LOG.warn("Received Ack for unknown message: {}", messageId); + return; + } + } + + if (ackType.equals(ACK_TYPE.DELIVERED)) { + LOG.debug("Delivered Ack of message: {}", messageId); + if (session.isTransacted()) { + Binary txnId = session.getTransactionContext().getAmqpTransactionId(); + if (txnId != null) { + TransactionalState txState = new TransactionalState(); + txState.setOutcome(Accepted.getInstance()); + txState.setTxnId(txnId); + delivery.disposition(txState); + session.getTransactionContext().registerTxConsumer(this); + } + } + if (!isPresettle()) { + delivered.put(messageId, delivery); + } + sendFlowIfNeeded(); + } else if (ackType.equals(ACK_TYPE.CONSUMED)) { + // A Consumer may not always send a delivered ACK so we need to check to + // ensure we don't add to much credit to the link. + if (isPresettle() || delivered.remove(messageId) == null) { + sendFlowIfNeeded(); + } + LOG.debug("Consumed Ack of message: {}", messageId); + if (!delivery.isSettled()) { + delivery.disposition(Accepted.getInstance()); + delivery.settle(); + } + } else if (ackType.equals(ACK_TYPE.REDELIVERED)) { + Modified disposition = new Modified(); + disposition.setUndeliverableHere(false); + disposition.setDeliveryFailed(true); + delivery.disposition(disposition); + delivery.settle(); + } else if (ackType.equals(ACK_TYPE.POISONED)) { + deliveryFailed(delivery, false); + } else { + LOG.warn("Unsupporeted Ack Type for message: {}", messageId); + } + } + + /** + * We only send more credits as the credit window dwindles to a certain point and + * then we open the window back up to full prefetch size. + */ + private void sendFlowIfNeeded() { + if (info.getPrefetchSize() == 0) { + return; + } + + int currentCredit = endpoint.getCredit(); + if (currentCredit <= info.getPrefetchSize() * 0.2) { + endpoint.flow(info.getPrefetchSize() - currentCredit); + } + } + + /** + * Recovers all previously delivered but not acknowledged messages. + * + * @throws Exception if an error occurs while performing the recover. + */ + public void recover() throws Exception { + LOG.debug("Session Recover for consumer: {}", info.getConsumerId()); + for (Delivery delivery : delivered.values()) { + // TODO - increment redelivery counter and apply connection redelivery policy + // to those messages that are past max redlivery. + JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) delivery.getContext(); + envelope.onMessageRedelivered(); + deliver(envelope); + } + delivered.clear(); + } + + /** + * For a consumer whose prefetch value is set to zero this method will attempt to solicite + * a new message dispatch from the broker. + * + * @param timeout + */ + public void pull(long timeout) { + if (info.getPrefetchSize() == 0 && endpoint.getCredit() == 0) { + // expand the credit window by one. + endpoint.flow(1); + } + } + + @Override + public void processDeliveryUpdates() throws IOException { + Delivery incoming = null; + do { + incoming = endpoint.current(); + if (incoming != null && incoming.isReadable() && !incoming.isPartial()) { + LOG.trace("{} has incoming Message(s).", this); + try { + processDelivery(incoming); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } + endpoint.advance(); + } else { + LOG.trace("{} has a partial incoming Message(s), deferring.", this); + incoming = null; + } + } while (incoming != null); + } + + private void processDelivery(Delivery incoming) throws Exception { + EncodedMessage encoded = readIncomingMessage(incoming); + JmsMessage message = null; + try { + message = (JmsMessage) inboundTransformer.transform(encoded); + } catch (Exception e) { + LOG.warn("Error on transform: {}", e.getMessage()); + // TODO - We could signal provider error but not sure we want to fail + // the connection just because we can't convert the message. + // In the future once the JMS mapping is complete we should be + // able to convert everything to some message even if its just + // a bytes messages as a fall back. + deliveryFailed(incoming, true); + return; + } + + try { + message.setJMSDestination(info.getDestination()); + } catch (JMSException e) { + LOG.warn("Error on transform: {}", e.getMessage()); + // TODO - We could signal provider error but not sure we want to fail + // the connection just because we can't convert the destination. + deliveryFailed(incoming, true); + return; + } + + // Store link to delivery in the hint for use in acknowledge requests. + message.getFacade().getMessageId().setProviderHint(incoming); + + // We need to signal to the create message that it's being dispatched and for now + // the transformer creates the message in write mode, onSend will reset it to read + // mode and the consumer will see it as a normal received message. + message.onSend(); + + JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(); + envelope.setMessage(message); + envelope.setConsumerId(info.getConsumerId()); + envelope.setProviderHint(incoming); + + // Store reference to envelope in delivery context for recovery + incoming.setContext(envelope); + + deliver(envelope); + } + + @Override + protected void doClose() { + } + + public AmqpSession getSession() { + return this.session; + } + + public JmsConsumerId getConsumerId() { + return this.info.getConsumerId(); + } + + public Receiver getProtonReceiver() { + return this.endpoint; + } + + public boolean isBrowser() { + return false; + } + + public boolean isPresettle() { + return presettle; + } + + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + + @Override + public String toString() { + return "AmqpConsumer { " + this.info.getConsumerId() + " }"; + } + + protected void deliveryFailed(Delivery incoming, boolean expandCredit) { + Modified disposition = new Modified(); + disposition.setUndeliverableHere(true); + disposition.setDeliveryFailed(true); + incoming.disposition(disposition); + incoming.settle(); + if (expandCredit) { + endpoint.flow(1); + } + } + + protected void deliver(JmsInboundMessageDispatch envelope) throws Exception { + ProviderListener listener = session.getProvider().getProviderListener(); + if (listener != null) { + if (envelope.getMessage() != null) { + LOG.debug("Dispatching received message: {}", envelope.getMessage().getFacade().getMessageId()); + } else { + LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId()); + } + listener.onMessage(envelope); + } else { + LOG.error("Provider listener is not set, message will be dropped."); + } + } + + protected EncodedMessage readIncomingMessage(Delivery incoming) { + Buffer buffer; + int count; + + while ((count = endpoint.recv(incomingBuffer, 0, incomingBuffer.length)) > 0) { + streamBuffer.write(incomingBuffer, 0, count); + } + + buffer = streamBuffer.toBuffer(); + + try { + return new EncodedMessage(incoming.getMessageFormat(), buffer.data, buffer.offset, buffer.length); + } finally { + streamBuffer.reset(); + } + } + + public void preCommit() { + } + + public void preRollback() { + } + + /** + * Ensures that all delivered messages are marked as settled locally before the TX state + * is cleared and the next TX started. + * + * @throws Exception if an error occurs while performing this action. + */ + public void postCommit() throws Exception { + for (Delivery delivery : delivered.values()) { + delivery.settle(); + } + this.delivered.clear(); + } + + /** + * Redeliver Acknowledge all previously delivered messages and clear state to prepare for + * the next TX to start. + * + * @throws Exception if an error occurs while performing this action. + */ + public void postRollback() throws Exception { + for (Delivery delivery : delivered.values()) { + JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) delivery.getContext(); + acknowledge(envelope, ACK_TYPE.REDELIVERED); + } + this.delivered.clear(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java new file mode 100644 index 0000000..a36e419 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.message.facade.JmsMessageFacade; +import org.apache.qpid.jms.meta.JmsProducerInfo; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Outcome; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.jms.AutoOutboundTransformer; +import org.apache.qpid.proton.jms.EncodedMessage; +import org.apache.qpid.proton.jms.OutboundTransformer; +import org.apache.qpid.proton.message.Message; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * AMQP Producer object that is used to manage JMS MessageProducer semantics. + * + * This Producer is fixed to a given JmsDestination and can only produce messages to it. + */ +public class AmqpFixedProducer extends AmqpProducer { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpFixedProducer.class); + private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; + + private final AmqpTransferTagGenerator tagGenerator = new AmqpTransferTagGenerator(true); + private final Set<Delivery> pending = new LinkedHashSet<Delivery>(); + private final LinkedList<PendingSend> pendingSends = new LinkedList<PendingSend>(); + private byte[] encodeBuffer = new byte[1024 * 8]; + + private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(AmqpJMSVendor.INSTANCE); + private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; + private boolean presettle = false; + + public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) { + super(session, info); + } + + @Override + public void close(AsyncResult request) { + // If any sends are held we need to wait for them to complete. + if (!pendingSends.isEmpty()) { + this.closeRequest = request; + return; + } + + super.close(request); + } + + @Override + public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + + // TODO - Handle the case where remote has no credit which means we can't send to it. + // We need to hold the send until remote credit becomes available but we should + // also have a send timeout option and filter timed out sends. + if (endpoint.getCredit() <= 0) { + LOG.trace("Holding Message send until credit is available."); + // Once a message goes into a held mode we no longer can send it async, so + // we clear the async flag if set to avoid the sender never getting notified. + envelope.setSendAsync(false); + this.pendingSends.addLast(new PendingSend(envelope, request)); + return false; + } else { + doSend(envelope, request); + return true; + } + } + + private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + JmsMessageFacade facade = envelope.getMessage().getFacade(); + + LOG.trace("Producer sending message: {}", envelope.getMessage().getFacade().getMessageId()); + + byte[] tag = tagGenerator.getNextTag(); + Delivery delivery = null; + + if (presettle) { + delivery = endpoint.delivery(EMPTY_BYTE_ARRAY, 0, 0); + } else { + delivery = endpoint.delivery(tag, 0, tag.length); + } + + delivery.setContext(request); + + if (session.isTransacted()) { + Binary amqpTxId = session.getTransactionContext().getAmqpTransactionId(); + TransactionalState state = new TransactionalState(); + state.setTxnId(amqpTxId); + delivery.disposition(state); + } + + JmsMessage message = envelope.getMessage(); + message.setReadOnlyBody(true); + + // TODO: why do we need this? + // Possibly because AMQP spec "2.7.5 Transfer" says message format MUST be set on at least + // the first Transfer frame of a message. That is on the encoded Transfer frames though and + // this property isn't, but rather within the application-properties map. We should probably + // ensure this elsewhere (appears Proton does so itself in TransportImpl#processTransportWorkSender) + if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { + message.setProperty(MESSAGE_FORMAT_KEY, 0); + } + + if (facade instanceof AmqpJmsMessageFacade) { + AmqpJmsMessageFacade amqpMessage = (AmqpJmsMessageFacade) facade; + encodeAndSend(amqpMessage.getAmqpMessage(), delivery); + } else { + encodeAndSend(envelope.getMessage(), delivery); + } + + if (presettle) { + delivery.settle(); + } else { + pending.add(delivery); + endpoint.advance(); + } + + if (envelope.isSendAsync() || presettle) { + request.onSuccess(); + } + } + + private void encodeAndSend(Message message, Delivery delivery) throws IOException { + + int encodedSize; + while (true) { + try { + encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length); + break; + } catch (java.nio.BufferOverflowException e) { + encodeBuffer = new byte[encodeBuffer.length * 2]; + } + } + + Buffer sendBuffer = new Buffer(encodeBuffer, 0, encodedSize); + + while (true) { + int sent = endpoint.send(sendBuffer.data, sendBuffer.offset, sendBuffer.length); + if (sent > 0) { + sendBuffer.moveHead(sent); + if (sendBuffer.length == 0) { + break; + } + } else { + LOG.warn("{} failed to send any data from current Message.", this); + } + } + } + + private void encodeAndSend(JmsMessage message, Delivery delivery) throws IOException { + + Buffer sendBuffer = null; + EncodedMessage amqp = null; + + try { + amqp = outboundTransformer.transform(message); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } + + if (amqp != null && amqp.getLength() > 0) { + sendBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()); + } + + while (true) { + int sent = endpoint.send(sendBuffer.data, sendBuffer.offset, sendBuffer.length); + if (sent > 0) { + sendBuffer.moveHead(sent); + if (sendBuffer.length == 0) { + break; + } + } else { + LOG.warn("{} failed to send any data from current Message.", this); + } + } + } + + @Override + public void processFlowUpdates() throws IOException { + if (!pendingSends.isEmpty() && endpoint.getCredit() > 0) { + while (endpoint.getCredit() > 0 && !pendingSends.isEmpty()) { + LOG.trace("Dispatching previously held send"); + PendingSend held = pendingSends.pop(); + try { + doSend(held.envelope, held.request); + } catch (JMSException e) { + throw IOExceptionSupport.create(e); + } + } + } + + // Once the pending sends queue is drained we can propagate the close request. + if (pendingSends.isEmpty() && isAwaitingClose()) { + super.close(closeRequest); + } + } + + @Override + public void processDeliveryUpdates() { + List<Delivery> toRemove = new ArrayList<Delivery>(); + + for (Delivery delivery : pending) { + DeliveryState state = delivery.getRemoteState(); + if (state == null) { + continue; + } + + Outcome outcome = null; + if (state instanceof TransactionalState) { + LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state); + outcome = ((TransactionalState) state).getOutcome(); + } else if (state instanceof Outcome) { + outcome = (Outcome) state; + } else { + LOG.warn("Message send updated with unsupported state: {}", state); + continue; + } + + AsyncResult request = (AsyncResult) delivery.getContext(); + + if (outcome instanceof Accepted) { + toRemove.add(delivery); + LOG.trace("Outcome of delivery was accepted: {}", delivery); + tagGenerator.returnTag(delivery.getTag()); + if (request != null && !request.isComplete()) { + request.onSuccess(); + } + } else if (outcome instanceof Rejected) { + Exception remoteError = getRemoteError(); + toRemove.add(delivery); + LOG.trace("Outcome of delivery was rejected: {}", delivery); + tagGenerator.returnTag(delivery.getTag()); + if (request != null && !request.isComplete()) { + request.onFailure(remoteError); + } else { + connection.getProvider().fireProviderException(remoteError); + } + } else { + LOG.warn("Message send updated with unsupported outcome: {}", outcome); + } + } + + pending.removeAll(toRemove); + } + + @Override + protected void doOpen() { + JmsDestination destination = info.getDestination(); + + String destnationName = session.getQualifiedName(destination); + String sourceAddress = getProducerId().toString(); + Source source = new Source(); + source.setAddress(sourceAddress); + Target target = new Target(); + target.setAddress(destnationName); + + String senderName = sourceAddress + ":" + destnationName != null ? destnationName : "Anonymous"; + endpoint = session.getProtonSession().sender(senderName); + endpoint.setSource(source); + endpoint.setTarget(target); + if (presettle) { + endpoint.setSenderSettleMode(SenderSettleMode.SETTLED); + } else { + endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED); + } + endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST); + } + + @Override + protected void doClose() { + } + + public AmqpSession getSession() { + return this.session; + } + + public Sender getProtonSender() { + return this.endpoint; + } + + @Override + public boolean isAnonymous() { + return false; + } + + @Override + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + + @Override + public boolean isPresettle() { + return this.presettle; + } + + @Override + public String toString() { + return "AmqpFixedProducer { " + getProducerId() + " }"; + } + + private class PendingSend { + + public JmsOutboundMessageDispatch envelope; + public AsyncResult request; + + public PendingSend(JmsOutboundMessageDispatch envelope, AsyncResult request) { + this.envelope = envelope; + this.request = request; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java new file mode 100644 index 0000000..eba019e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJMSVendor.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsQueue; +import org.apache.qpid.jms.JmsTemporaryQueue; +import org.apache.qpid.jms.JmsTemporaryTopic; +import org.apache.qpid.jms.JmsTopic; +import org.apache.qpid.jms.message.JmsDefaultMessageFactory; +import org.apache.qpid.jms.message.JmsMessage; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.proton.jms.JMSVendor; + +/** + * Vendor instance used with Proton-J JMS Transformer bits. + * + * TODO - This can go once we have our own message wrappers all working. + */ +public class AmqpJMSVendor extends JMSVendor { + + public static final AmqpJMSVendor INSTANCE = new AmqpJMSVendor(); + + private final JmsMessageFactory factory = new JmsDefaultMessageFactory(); + + private AmqpJMSVendor() { + } + + @Override + public BytesMessage createBytesMessage() { + try { + return factory.createBytesMessage(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public StreamMessage createStreamMessage() { + try { + return factory.createStreamMessage(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public Message createMessage() { + try { + return factory.createMessage(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public TextMessage createTextMessage() { + try { + return factory.createTextMessage(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public ObjectMessage createObjectMessage() { + try { + return factory.createObjectMessage(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public MapMessage createMapMessage() { + try { + return factory.createMapMessage(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + + @Override + public Destination createDestination(String name) { + return super.createDestination(name, Destination.class); + } + + @Override + public <T extends Destination> T createDestination(String name, Class<T> kind) { + if (kind == Queue.class) { + return kind.cast(new JmsQueue(name)); + } + if (kind == Topic.class) { + return kind.cast(new JmsTopic(name)); + } + if (kind == TemporaryQueue.class) { + return kind.cast(new JmsTemporaryQueue(name)); + } + if (kind == TemporaryTopic.class) { + return kind.cast(new JmsTemporaryTopic(name)); + } + + return kind.cast(new JmsQueue(name)); + } + + @Override + public void setJMSXUserID(Message msg, String value) { + ((JmsMessage) msg).getFacade().setUserId(value); + } + + @Override + public void setJMSXGroupID(Message msg, String value) { + ((JmsMessage) msg).getFacade().setGroupId(value); + } + + @Override + public void setJMSXGroupSequence(Message msg, int value) { + ((JmsMessage) msg).getFacade().setGroupSequence(value); + } + + @Override + public void setJMSXDeliveryCount(Message msg, long value) { + // Delivery count tracks total deliveries which is always one higher than + // re-delivery count since first delivery counts to. + ((JmsMessage) msg).getFacade().setRedeliveryCounter((int) (value == 0 ? value : value - 1)); + } + + @Override + public String toAddress(Destination dest) { + return ((JmsDestination) dest).getName(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java new file mode 100644 index 0000000..24c6286 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsNoLocalType.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * A Described Type wrapper for JMS no local option for MessageConsumer. + */ +public class AmqpJmsNoLocalType implements DescribedType { + + public static final AmqpJmsNoLocalType NO_LOCAL = new AmqpJmsNoLocalType(); + + private final String noLocal; + + public AmqpJmsNoLocalType() { + this.noLocal = "NoLocalFilter{}"; + } + + @Override + public Object getDescriptor() { + return UnsignedLong.valueOf(0x0000468C00000003L); + } + + @Override + public Object getDescribed() { + return this.noLocal; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java new file mode 100644 index 0000000..021f943 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpJmsSelectorType.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.UnsignedLong; + +/** + * A Described Type wrapper for JMS selector values. + */ +public class AmqpJmsSelectorType implements DescribedType { + + private final String selector; + + public AmqpJmsSelectorType(String selector) { + this.selector = selector; + } + + @Override + public Object getDescriptor() { + return UnsignedLong.valueOf(0x0000468C00000004L); + } + + @Override + public Object getDescribed() { + return this.selector; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java new file mode 100644 index 0000000..81ebf9e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsProducerId; +import org.apache.qpid.jms.meta.JmsProducerInfo; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.proton.engine.Sender; + +/** + * Base class for Producer instances. + */ +public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, Sender> { + + protected final AmqpSession session; + protected final AmqpConnection connection; + protected boolean presettle; + + public AmqpProducer(AmqpSession session, JmsProducerInfo info) { + super(info); + this.session = session; + this.connection = session.getConnection(); + + // Add a shortcut back to this Producer for quicker lookup. + this.info.getProducerId().setProviderHint(this); + } + + /** + * Sends the given message + * + * @param envelope + * The envelope that contains the message and it's targeted destination. + * @param request + * The AsyncRequest that will be notified on send success or failure. + * + * @returns true if the producer had credit to send or false if there was no available + * credit and the send needed to be deferred. + * + * @throws IOException if an error occurs sending the message + * @throws JMSException if an error occurs while preparing the message for send. + */ + public abstract boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException; + + /** + * @return true if this is an anonymous producer or false if fixed to a given destination. + */ + public abstract boolean isAnonymous(); + + /** + * @return the JmsProducerId that was assigned to this AmqpProducer. + */ + public JmsProducerId getProducerId() { + return this.info.getProducerId(); + } + + /** + * @return true if the producer should presettle all sent messages. + */ + public boolean isPresettle() { + return presettle; + } + + /** + * Sets whether the producer will presettle all messages that it sends. Sending + * presettled reduces the time it takes to send a message but increases the change + * of message loss should the connection drop during send. + * + * @param presettle + * true if all messages are sent settled. + */ + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java new file mode 100644 index 0000000..aad47f1 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -0,0 +1,821 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsConsumerInfo; +import org.apache.qpid.jms.meta.JmsDefaultResourceVisitor; +import org.apache.qpid.jms.meta.JmsProducerId; +import org.apache.qpid.jms.meta.JmsProducerInfo; +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.meta.JmsResourceVistor; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.meta.JmsSessionInfo; +import org.apache.qpid.jms.meta.JmsTransactionInfo; +import org.apache.qpid.jms.provider.AbstractProvider; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.ProviderFuture; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; +import org.apache.qpid.jms.transports.TcpTransport; +import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.jms.util.PropertyUtil; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Event.Type; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.apache.qpid.proton.engine.impl.TransportImpl; +import org.apache.qpid.proton.framing.TransportFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.buffer.Buffer; + +/** + * An AMQP v1.0 Provider. + * + * The AMQP Provider is bonded to a single remote broker instance. The provider will attempt + * to connect to only that instance and once failed can not be recovered. For clients that + * wish to implement failover type connections a new AMQP Provider instance must be created + * and state replayed from the JMS layer using the standard recovery process defined in the + * JMS Provider API. + * + * All work within this Provider is serialized to a single Thread. Any asynchronous exceptions + * will be dispatched from that Thread and all in-bound requests are handled there as well. + */ +public class AmqpProvider extends AbstractProvider implements TransportListener { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpProvider.class); + + private static final Logger TRACE_BYTES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".BYTES"); + private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES"); + private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 1; + + private AmqpConnection connection; + private org.apache.qpid.jms.transports.Transport transport; + private boolean traceFrames; + private boolean traceBytes; + private boolean presettleConsumers; + private boolean presettleProducers; + private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT; + private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT; + private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; + private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT; + + private final Transport protonTransport = Transport.Factory.create(); + private final Collector protonCollector = new CollectorImpl(); + + /** + * Create a new instance of an AmqpProvider bonded to the given remote URI. + * + * @param remoteURI + * The URI of the AMQP broker this Provider instance will connect to. + */ + public AmqpProvider(URI remoteURI) { + this(remoteURI, null); + } + + /** + * Create a new instance of an AmqpProvider bonded to the given remote URI. + * + * @param remoteURI + * The URI of the AMQP broker this Provider instance will connect to. + */ + public AmqpProvider(URI remoteURI, Map<String, String> extraOptions) { + super(remoteURI); + updateTracer(); + } + + @Override + public void connect() throws IOException { + checkClosed(); + + transport = createTransport(getRemoteURI()); + + Map<String, String> map = Collections.emptyMap(); + try { + map = PropertyUtil.parseQuery(remoteURI.getQuery()); + } catch (Exception e) { + IOExceptionSupport.create(e); + } + Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "transport."); + + if (!PropertyUtil.setProperties(transport, providerOptions)) { + String msg = "" + + " Not all transport options could be set on the AMQP Provider transport." + + " Check the options are spelled correctly." + + " Given parameters=[" + providerOptions + "]." + + " This provider instance cannot be started."; + throw new IOException(msg); + } + + transport.connect(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + final ProviderFuture request = new ProviderFuture(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + + // If we are not connected then there is nothing we can do now + // just signal success. + if (!transport.isConnected()) { + request.onSuccess(); + } + + if (connection != null) { + connection.close(request); + } else { + request.onSuccess(); + } + + pumpToProtonTransport(); + } catch (Exception e) { + LOG.debug("Caught exception while closing proton connection"); + } + } + }); + + try { + if (closeTimeout < 0) { + request.sync(); + } else { + request.sync(closeTimeout, TimeUnit.MILLISECONDS); + } + } catch (IOException e) { + LOG.warn("Error caught while closing Provider: ", e.getMessage()); + } finally { + if (transport != null) { + try { + transport.close(); + } catch (Exception e) { + LOG.debug("Cuaght exception while closing down Transport: {}", e.getMessage()); + } + } + + if (serializer != null) { + serializer.shutdown(); + } + } + } + } + + @Override + public void create(final JmsResource resource, final AsyncResult request) throws IOException, JMSException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + resource.visit(new JmsResourceVistor() { + + @Override + public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception { + AmqpSession session = connection.createSession(sessionInfo); + session.open(request); + } + + @Override + public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception { + AmqpSession session = connection.getSession(producerInfo.getParentId()); + AmqpProducer producer = session.createProducer(producerInfo); + producer.open(request); + } + + @Override + public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception { + AmqpSession session = connection.getSession(consumerInfo.getParentId()); + AmqpConsumer consumer = session.createConsumer(consumerInfo); + consumer.open(request); + } + + @Override + public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception { + closeTimeout = connectionInfo.getCloseTimeout(); + connectTimeout = connectionInfo.getConnectTimeout(); + sendTimeout = connectionInfo.getSendTimeout(); + requestTimeout = connectionInfo.getRequestTimeout(); + + Connection protonConnection = Connection.Factory.create(); + protonTransport.setMaxFrameSize(getMaxFrameSize()); + protonTransport.bind(protonConnection); + protonConnection.collect(protonCollector); + Sasl sasl = protonTransport.sasl(); + if (sasl != null) { + sasl.client(); + } + connection = new AmqpConnection(AmqpProvider.this, protonConnection, sasl, connectionInfo); + connection.open(request); + } + + @Override + public void processDestination(JmsDestination destination) throws Exception { + if (destination.isTemporary()) { + AmqpTemporaryDestination temporary = connection.createTemporaryDestination(destination); + temporary.open(request); + } else { + request.onSuccess(); + } + } + + @Override + public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception { + AmqpSession session = connection.getSession(transactionInfo.getParentId()); + session.begin(transactionInfo.getTransactionId(), request); + } + }); + + pumpToProtonTransport(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void start(final JmsResource resource, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + resource.visit(new JmsDefaultResourceVisitor() { + + @Override + public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception { + AmqpSession session = connection.getSession(consumerInfo.getParentId()); + AmqpConsumer consumer = session.getConsumer(consumerInfo); + consumer.start(request); + } + }); + + pumpToProtonTransport(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void destroy(final JmsResource resource, final AsyncResult request) throws IOException { + //TODO: improve or delete this logging + LOG.debug("Destroy called"); + + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + //TODO: improve or delete this logging + LOG.debug("Processing resource destroy request"); + checkClosed(); + resource.visit(new JmsDefaultResourceVisitor() { + + @Override + public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception { + AmqpSession session = connection.getSession(sessionInfo.getSessionId()); + session.close(request); + } + + @Override + public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception { + AmqpSession session = connection.getSession(producerInfo.getParentId()); + AmqpProducer producer = session.getProducer(producerInfo); + producer.close(request); + } + + @Override + public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception { + AmqpSession session = connection.getSession(consumerInfo.getParentId()); + AmqpConsumer consumer = session.getConsumer(consumerInfo); + consumer.close(request); + } + + @Override + public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception { + connection.close(request); + } + + @Override + public void processDestination(JmsDestination destination) throws Exception { + // TODO - Delete remote temporary Topic or Queue + request.onSuccess(); + } + }); + + pumpToProtonTransport(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void send(final JmsOutboundMessageDispatch envelope, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + + JmsProducerId producerId = envelope.getProducerId(); + AmqpProducer producer = null; + + if (producerId.getProviderHint() instanceof AmqpFixedProducer) { + producer = (AmqpFixedProducer) producerId.getProviderHint(); + } else { + AmqpSession session = connection.getSession(producerId.getParentId()); + producer = session.getProducer(producerId); + } + + boolean couldSend = producer.send(envelope, request); + pumpToProtonTransport(); + if (couldSend && envelope.isSendAsync()) { + request.onSuccess(); + } + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void acknowledge(final JmsSessionId sessionId, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + AmqpSession amqpSession = connection.getSession(sessionId); + amqpSession.acknowledge(); + pumpToProtonTransport(); + request.onSuccess(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + + JmsConsumerId consumerId = envelope.getConsumerId(); + AmqpConsumer consumer = null; + + if (consumerId.getProviderHint() instanceof AmqpConsumer) { + consumer = (AmqpConsumer) consumerId.getProviderHint(); + } else { + AmqpSession session = connection.getSession(consumerId.getParentId()); + consumer = session.getConsumer(consumerId); + } + + consumer.acknowledge(envelope, ackType); + + if (consumer.getSession().isAsyncAck()) { + request.onSuccess(); + pumpToProtonTransport(); + } else { + pumpToProtonTransport(); + request.onSuccess(); + } + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void commit(final JmsSessionId sessionId, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + AmqpSession session = connection.getSession(sessionId); + session.commit(request); + pumpToProtonTransport(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void rollback(final JmsSessionId sessionId, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + AmqpSession session = connection.getSession(sessionId); + session.rollback(request); + pumpToProtonTransport(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + AmqpSession session = connection.getSession(sessionId); + session.recover(); + pumpToProtonTransport(); + request.onSuccess(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void unsubscribe(final String subscription, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + pumpToProtonTransport(); + request.onSuccess(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + @Override + public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException { + checkClosed(); + serializer.execute(new Runnable() { + + @Override + public void run() { + try { + checkClosed(); + AmqpConsumer consumer = null; + + if (consumerId.getProviderHint() instanceof AmqpConsumer) { + consumer = (AmqpConsumer) consumerId.getProviderHint(); + } else { + AmqpSession session = connection.getSession(consumerId.getParentId()); + consumer = session.getConsumer(consumerId); + } + + consumer.pull(timeout); + pumpToProtonTransport(); + request.onSuccess(); + } catch (Exception error) { + request.onFailure(error); + } + } + }); + } + + /** + * Provides an extension point for subclasses to insert other types of transports such + * as SSL etc. + * + * @param remoteLocation + * The remote location where the transport should attempt to connect. + * + * @return the newly created transport instance. + */ + protected org.apache.qpid.jms.transports.Transport createTransport(URI remoteLocation) { + return new TcpTransport(this, remoteLocation); + } + + private void updateTracer() { + if (isTraceFrames()) { + ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() { + @Override + public void receivedFrame(TransportFrame transportFrame) { + TRACE_FRAMES.trace("RECV: {}", transportFrame.getBody()); + } + + @Override + public void sentFrame(TransportFrame transportFrame) { + TRACE_FRAMES.trace("SENT: {}", transportFrame.getBody()); + } + }); + } + } + + @Override + public void onData(Buffer input) { + + // Create our own copy since we will process later. + final ByteBuffer source = ByteBuffer.wrap(input.getBytes()); + + serializer.execute(new Runnable() { + + @Override + public void run() { + LOG.trace("Received from Broker {} bytes:", source.remaining()); + + do { + ByteBuffer buffer = protonTransport.getInputBuffer(); + int limit = Math.min(buffer.remaining(), source.remaining()); + ByteBuffer duplicate = source.duplicate(); + duplicate.limit(source.position() + limit); + buffer.put(duplicate); + protonTransport.processInput(); + source.position(source.position() + limit); + } while (source.hasRemaining()); + + // Process the state changes from the latest data and then answer back + // any pending updates to the Broker. + processUpdates(); + pumpToProtonTransport(); + } + }); + } + + /** + * Callback method for the Transport to report connection errors. When called + * the method will queue a new task to fire the failure error back to the listener. + * + * @param error + * the error that causes the transport to fail. + */ + @Override + public void onTransportError(final Throwable error) { + if (!serializer.isShutdown()) { + serializer.execute(new Runnable() { + @Override + public void run() { + LOG.info("Transport failed: {}", error.getMessage()); + if (!closed.get()) { + fireProviderException(error); + if (connection != null) { + connection.closed(); + } + } + } + }); + } + } + + /** + * Callback method for the Transport to report that the underlying connection + * has closed. When called this method will queue a new task that will check for + * the closed state on this transport and if not closed then an exception is raied + * to the registered ProviderListener to indicate connection loss. + */ + @Override + public void onTransportClosed() { + // TODO: improve or delete this logging + LOG.debug("onTransportClosed listener called"); + if (!serializer.isShutdown()) { + serializer.execute(new Runnable() { + @Override + public void run() { + LOG.debug("Transport connection remotely closed"); + if (!closed.get()) { + fireProviderException(new IOException("Connection remotely closed.")); + if (connection != null) { + connection.closed(); + } + } + } + }); + } + } + + private void processUpdates() { + try { + Event protonEvent = null; + while ((protonEvent = protonCollector.peek()) != null) { + if (!protonEvent.getType().equals(Type.TRANSPORT)) { + LOG.trace("New Proton Event: {}", protonEvent.getType()); + } + + AmqpResource amqpResource = null; + switch (protonEvent.getType()) { + case CONNECTION_REMOTE_CLOSE: + case CONNECTION_REMOTE_OPEN: + AmqpConnection connection = (AmqpConnection) protonEvent.getConnection().getContext(); + connection.processStateChange(); + break; + case SESSION_REMOTE_CLOSE: + case SESSION_REMOTE_OPEN: + AmqpSession session = (AmqpSession) protonEvent.getSession().getContext(); + session.processStateChange(); + break; + case LINK_REMOTE_CLOSE: + case LINK_REMOTE_OPEN: + AmqpResource resource = (AmqpResource) protonEvent.getLink().getContext(); + resource.processStateChange(); + break; + case LINK_FLOW: + amqpResource = (AmqpResource) protonEvent.getLink().getContext(); + amqpResource.processFlowUpdates(); + break; + case DELIVERY: + amqpResource = (AmqpResource) protonEvent.getLink().getContext(); + amqpResource.processDeliveryUpdates(); + break; + default: + break; + } + + protonCollector.pop(); + } + + // We have to do this to pump SASL bytes in as SASL is not event driven yet. + if (connection != null) { + connection.processSaslAuthentication(); + } + } catch (Exception ex) { + LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex); + fireProviderException(ex); + } + } + + private void pumpToProtonTransport() { + try { + boolean done = false; + while (!done) { + ByteBuffer toWrite = protonTransport.getOutputBuffer(); + if (toWrite != null && toWrite.hasRemaining()) { + // TODO - Get Bytes in a readable form + if (isTraceBytes()) { + TRACE_BYTES.info("Sending: {}", toWrite.toString()); + } + transport.send(toWrite); + protonTransport.outputConsumed(); + } else { + done = true; + } + } + } catch (IOException e) { + fireProviderException(e); + } + } + + //---------- Property Setters and Getters --------------------------------// + + @Override + public JmsMessageFactory getMessageFactory() { + if (connection == null) { + throw new RuntimeException("Message Factory is not accessible when not connected."); + } + return connection.getAmqpMessageFactory(); + } + + public void setTraceFrames(boolean trace) { + this.traceFrames = trace; + updateTracer(); + } + + public boolean isTraceFrames() { + return this.traceFrames; + } + + public void setTraceBytes(boolean trace) { + this.traceBytes = trace; + } + + public boolean isTraceBytes() { + return this.traceBytes; + } + + public long getCloseTimeout() { + return this.closeTimeout; + } + + public void setCloseTimeout(long closeTimeout) { + this.closeTimeout = closeTimeout; + } + + public long getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(long connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public long getRequestTimeout() { + return requestTimeout; + } + + public void setRequestTimeout(long requestTimeout) { + this.requestTimeout = requestTimeout; + } + + public long getSendTimeout() { + return sendTimeout; + } + + public void setSendTimeout(long sendTimeout) { + this.sendTimeout = sendTimeout; + } + + public void setPresettle(boolean presettle) { + setPresettleConsumers(presettle); + setPresettleProducers(presettle); + } + + public boolean isPresettleConsumers() { + return this.presettleConsumers; + } + + public void setPresettleConsumers(boolean presettle) { + this.presettleConsumers = presettle; + } + + public boolean isPresettleProducers() { + return this.presettleProducers; + } + + public void setPresettleProducers(boolean presettle) { + this.presettleProducers = presettle; + } + + /** + * @return the currently set Max Frame Size value. + */ + public int getMaxFrameSize() { + return DEFAULT_MAX_FRAME_SIZE; + } + + @Override + public String toString() { + return "AmqpProvider: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
