http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java new file mode 100644 index 0000000..0eb84a2 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java @@ -0,0 +1,241 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.util.List; + +import javax.jms.InvalidSelectorException; +import javax.management.ObjectName; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.Response; +import org.apache.activemq.filter.MessageEvaluationContext; + +public interface AMQSubscription extends AMQSubscriptionRecovery +{ + + /** + * Used to add messages that match the subscription. + * @param node + * @throws Exception + * @throws InterruptedException + * @throws IOException + */ + void add(MessageReference node) throws Exception; + + /** + * Used when client acknowledge receipt of dispatched message. + * @throws IOException + * @throws Exception + */ + void acknowledge(AMQConnectionContext context, final MessageAck ack) throws Exception; + + /** + * Allows a consumer to pull a message on demand + */ + Response pullMessage(AMQConnectionContext context, MessagePull pull) throws Exception; + + /** + * Returns true if this subscription is a Wildcard subscription. + * @return true if wildcard subscription. + */ + boolean isWildcard(); + + /** + * Is the subscription interested in the message? + * @param node + * @param context + * @return + * @throws IOException + */ + boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException; + + /** + * Is the subscription interested in messages in the destination? + * @param destination + * @return + */ + boolean matches(ActiveMQDestination destination); + + /** + * The subscription will be receiving messages from the destination. 
+ * @param context + * @param destination + * @throws Exception + */ + void add(AMQConnectionContext context, AMQDestination destination) throws Exception; + + /** + * The subscription will no longer be receiving messages from the destination. + * @param context + * @param destination + * @return a list of un-acked messages that were added to the subscription. + */ + List<MessageReference> remove(AMQConnectionContext context, AMQDestination destination) throws Exception; + + /** + * The ConsumerInfo object that created the subscription. + */ + ConsumerInfo getConsumerInfo(); + + /** + * The subscription should release as many references as it can to help the garbage collector + * reclaim memory. + */ + void gc(); + + /** + * Used by a Slave Broker to update dispatch information + * @param mdn + * @throws Exception + */ + void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception; + + /** + * @return number of messages pending delivery + */ + int getPendingQueueSize(); + + /** + * @return number of messages dispatched to the client + */ + int getDispatchedQueueSize(); + + /** + * @return number of messages dispatched to the client + */ + long getDispatchedCounter(); + + /** + * @return number of messages that matched the subscription + */ + long getEnqueueCounter(); + + /** + * @return number of messages queued by the client + */ + long getDequeueCounter(); + + /** + * @return the JMS selector on the current subscription + */ + String getSelector(); + + /** + * Attempts to change the current active selector on the subscription. + * This operation is not supported for persistent topics.
+ */ + void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException; + + /** + * @return the JMX object name that this subscription was registered as if applicable + */ + ObjectName getObjectName(); + + /** + * Set when the subscription is registered in JMX + */ + void setObjectName(ObjectName objectName); + + /** + * @return true when 60% or more room is left for dispatching messages + */ + boolean isLowWaterMark(); + + /** + * @return true when 10% or less room is left for dispatching messages + */ + boolean isHighWaterMark(); + + /** + * @return true if there is no space to dispatch messages + */ + boolean isFull(); + + /** + * Informs the MessageConsumer on the client to change its prefetch + * @param newPrefetch + */ + void updateConsumerPrefetch(int newPrefetch); + + /** + * Called when the subscription is destroyed. + */ + void destroy(); + + /** + * @return the prefetch size that is configured for the subscription + */ + int getPrefetchSize(); + + /** + * @return the number of messages awaiting acknowledgement + */ + int getInFlightSize(); + + /** + * @return the in flight messages as a percentage of the prefetch size + */ + int getInFlightUsage(); + + /** + * Informs the Broker if the subscription needs intervention to recover its state + * e.g.
DurableTopicSubscriber may do + * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor + * @return true if recovery required + */ + boolean isRecoveryRequired(); + + /** + * @return true if a browser + */ + boolean isBrowser(); + + /** + * @return the number of messages this subscription can accept before it's full + */ + int countBeforeFull(); + + AMQConnectionContext getContext(); + + int getCursorMemoryHighWaterMark(); + + void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark); + + boolean isSlowConsumer(); + + void unmatched(MessageReference node) throws IOException; + + /** + * Returns the time of the last Ack message received by this subscription. + * + * If there has never been an ack this value should be set to the creation time of the + * subscription. + * + * @return time of last received Ack message or Subscription create time if no Acks. + */ + long getTimeOfLastMessageAck(); + + long getConsumedCount(); + + void incrementConsumedCount(); + + void resetConsumedCount(); + +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java new file mode 100644 index 0000000..7cf1e84 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java @@ -0,0 +1,42 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * An interface for recovering transient messages held by the broker for + * retroactive recovery for subscribers + * + * + */ +public interface AMQSubscriptionRecovery +{ + + /** + * Add a message to the SubscriptionRecovery + * + * @param context + * @param message + * @return true if the message is accepted + * @throws Exception + */ + boolean addRecoveredMessage(AMQConnectionContext context, MessageReference message) throws Exception; + + /** + * @return the Destination associated with this Subscription + */ + ActiveMQDestination getActiveMQDestination(); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java new file mode 100644 index 0000000..1a477b9 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java @@ -0,0 +1,249 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied.
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +import javax.transaction.xa.XAException; + +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.transaction.Synchronization; +import org.slf4j.Logger; + +public abstract class AMQTransaction +{ + public static final byte START_STATE = 0; // can go to: 1,2,3 + public static final byte IN_USE_STATE = 1; // can go to: 2,3 + public static final byte PREPARED_STATE = 2; // can go to: 3 + public static final byte FINISHED_STATE = 3; + boolean committed = false; + + private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>(); + private byte state = START_STATE; + protected FutureTask<?> preCommitTask = new FutureTask<Object>( + new Callable<Object>() + { + public Object call() throws Exception + { + doPreCommit(); + return null; + } + }); + protected FutureTask<?> postCommitTask = new FutureTask<Object>( + new Callable<Object>() + { + public Object call() throws Exception + { + doPostCommit(); + return null; + } + }); + + public byte getState() + { + return state; + } + + public void setState(byte state) + { + this.state = state; + } + + public boolean isCommitted() + { + return committed; + } + + public void setCommitted(boolean committed) + { + this.committed = committed; + } + + public void addSynchronization(Synchronization r) + { + synchronizations.add(r); + if (state == START_STATE) + { + state = IN_USE_STATE; + } + } + + public Synchronization findMatching(Synchronization r) + { + int existing = synchronizations.indexOf(r); + if (existing != -1) + { + return 
synchronizations.get(existing); + } + return null; + } + + public void removeSynchronization(Synchronization r) + { + synchronizations.remove(r); + } + + public void prePrepare() throws Exception + { + + // Is it ok to call prepare now given the state of the + // transaction? + switch (state) + { + case START_STATE: + case IN_USE_STATE: + break; + default: + XAException xae = new XAException("Prepare cannot be called now."); + xae.errorCode = XAException.XAER_PROTO; + throw xae; + } + + // // Run the prePrepareTasks + // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) { + // Callback r = (Callback) iter.next(); + // r.execute(); + // } + } + + protected void fireBeforeCommit() throws Exception + { + for (Iterator<Synchronization> iter = synchronizations.iterator(); iter + .hasNext();) + { + Synchronization s = iter.next(); + s.beforeCommit(); + } + } + + protected void fireAfterCommit() throws Exception + { + for (Iterator<Synchronization> iter = synchronizations.iterator(); iter + .hasNext();) + { + Synchronization s = iter.next(); + s.afterCommit(); + } + } + + public void fireAfterRollback() throws Exception + { + Collections.reverse(synchronizations); + for (Iterator<Synchronization> iter = synchronizations.iterator(); iter + .hasNext();) + { + Synchronization s = iter.next(); + s.afterRollback(); + } + } + + @Override + public String toString() + { + return "Local-" + getTransactionId() + "[synchronizations=" + + synchronizations + "]"; + } + + public abstract void commit(boolean onePhase) throws XAException, + IOException; + + public abstract void rollback() throws XAException, IOException; + + public abstract int prepare() throws XAException, IOException; + + public abstract TransactionId getTransactionId(); + + public abstract Logger getLog(); + + public boolean isPrepared() + { + return getState() == PREPARED_STATE; + } + + public int size() + { + return synchronizations.size(); + } + + protected void waitPostCommitDone(FutureTask<?> 
postCommitTask) throws XAException, IOException + { + try + { + postCommitTask.get(); + } + catch (InterruptedException e) + { + throw new InterruptedIOException(e.toString()); + } + catch (ExecutionException e) + { + Throwable t = e.getCause(); + if (t instanceof XAException) + { + throw (XAException) t; + } + else if (t instanceof IOException) + { + throw (IOException) t; + } + else + { + throw new XAException(e.toString()); + } + } + } + + protected void doPreCommit() throws XAException + { + try + { + fireBeforeCommit(); + } + catch (Throwable e) + { + // I guess this could happen. Pre commit task failed + // to execute properly. + getLog().warn("PRE COMMIT FAILED: ", e); + XAException xae = new XAException("PRE COMMIT FAILED"); + xae.errorCode = XAException.XAER_RMERR; + xae.initCause(e); + throw xae; + } + } + + protected void doPostCommit() throws XAException + { + try + { + setCommitted(true); + fireAfterCommit(); + } + catch (Throwable e) + { + // I guess this could happen. Post commit task failed + // to execute properly.
+ getLog().warn("POST COMMIT FAILED: ", e); + XAException xae = new XAException("POST COMMIT FAILED"); + xae.errorCode = XAException.XAER_RMERR; + xae.initCause(e); + throw xae; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java new file mode 100644 index 0000000..5632a63 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.openwire.amq; + +import javax.transaction.xa.Xid; + +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.protocol.openwire.AMQTransactionImpl; +import org.hornetq.core.transaction.Transaction; +import org.hornetq.core.transaction.TransactionFactory; + +public class AMQTransactionFactory implements TransactionFactory +{ + @Override + public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) + { + return new AMQTransactionImpl(xid, storageManager, timeoutSeconds); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java new file mode 100644 index 0000000..be11e5b --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java @@ -0,0 +1,86 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.state.ConnectionState; +import org.hornetq.core.protocol.openwire.OpenWireConnection; + +/** + * @See org.apache.activemq.broker.TransportConnectionState + * @author howard + * + */ +public class AMQTransportConnectionState extends ConnectionState +{ + + private AMQConnectionContext context; + private OpenWireConnection connection; + private AtomicInteger referenceCounter = new AtomicInteger(); + private final Object connectionMutex = new Object(); + + public AMQTransportConnectionState(ConnectionInfo info, + OpenWireConnection transportConnection) + { + super(info); + connection = transportConnection; + } + + public AMQConnectionContext getContext() + { + return context; + } + + public OpenWireConnection getConnection() + { + return connection; + } + + public void setContext(AMQConnectionContext context) + { + this.context = context; + } + + public void setConnection(OpenWireConnection connection) + { + this.connection = connection; + } + + public int incrementReference() + { + return referenceCounter.incrementAndGet(); + } + + public int decrementReference() + { + return referenceCounter.decrementAndGet(); + } + + public AtomicInteger getReferenceCounter() + { + return referenceCounter; + } + + public void setReferenceCounter(AtomicInteger referenceCounter) + { + this.referenceCounter = referenceCounter; + } + + public Object getConnectionMutex() + { + return connectionMutex; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java ---------------------------------------------------------------------- diff --git 
a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java new file mode 100644 index 0000000..d0e68f5 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java @@ -0,0 +1,58 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +import java.util.List; +import java.util.Map; + +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; + +/** + * What's the purpose of this? 
+ * + * @author howard + * + */ +public interface AMQTransportConnectionStateRegister +{ + AMQTransportConnectionState registerConnectionState(ConnectionId connectionId, + AMQTransportConnectionState state); + + AMQTransportConnectionState unregisterConnectionState(ConnectionId connectionId); + + List<AMQTransportConnectionState> listConnectionStates(); + + Map<ConnectionId, AMQTransportConnectionState> mapStates(); + + AMQTransportConnectionState lookupConnectionState(String connectionId); + + AMQTransportConnectionState lookupConnectionState(ConsumerId id); + + AMQTransportConnectionState lookupConnectionState(ProducerId id); + + AMQTransportConnectionState lookupConnectionState(SessionId id); + + AMQTransportConnectionState lookupConnectionState(ConnectionId connectionId); + + boolean isEmpty(); + + boolean doesHandleMultipleConnectionStates(); + + void intialize(AMQTransportConnectionStateRegister other); + + void clear(); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java new file mode 100644 index 0000000..12eaf23 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java @@ -0,0 +1,18 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire.amq; + +interface BrowserListener +{ + void browseFinished(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java new file mode 100644 index 0000000..6315d86 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.openwire.amq; + +import org.apache.activemq.command.MessageId; + +public class MessageInfo +{ + public MessageId amqId; + public long nativeId; + public int size; + //mark message that is acked within a local tx + public boolean localAcked; + + public MessageInfo(MessageId amqId, long nativeId, int size) + { + this.amqId = amqId; + this.nativeId = nativeId; + this.size = size; + } + + @Override + public String toString() + { + return "native mid: " + this.nativeId + " amqId: " + amqId + " local acked: " + localAcked; + } + + public void setLocalAcked(boolean ack) + { + localAcked = ack; + } + + public boolean isLocalAcked() + { + return localAcked; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory b/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory new file mode 100644 index 0000000..fd06ac6 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory @@ -0,0 +1 @@ +org.hornetq.core.protocol.openwire.OpenWireProtocolManagerFactory http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/pom.xml b/hornetq-protocols/hornetq-proton-plug/pom.xml new file mode 100644 index 0000000..98ece12 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/pom.xml @@ -0,0 +1,80 @@ +<project 
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hornetq-protocols</artifactId> + <groupId>org.hornetq</groupId> + <version>2.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>hornetq-proton-plug</artifactId> + + <properties> + <hornetq.basedir>${project.basedir}/../..</hornetq.basedir> + </properties> + + <dependencies> + <!-- JMS Client because of some Convertions that are done --> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-jms-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-core-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + </dependency> + + <!-- + JBoss Logging + --> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + <version>0.24</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client-jms</artifactId> + <version>0.24</version> + <scope>test</scope> + </dependency> + + + + <dependency> + <groupId>org.jboss.spec.javax.jms</groupId> + <artifactId>jboss-jms-api_2.0_spec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + 
</dependency> + + + </dependencies> + + +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java new file mode 100644 index 0000000..5678a38 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java @@ -0,0 +1,34 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug; + +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * This is valid only on a client connection. + * + * @author Clebert Suconic + */ + +public interface AMQPClientConnectionContext extends AMQPConnectionContext +{ + /** + * This will send an open and block for its return on AMQP protocol. 
+ * + * @throws Exception + */ + void clientOpen(ClientSASL sasl) throws Exception; + + AMQPClientSessionContext createClientSession() throws HornetQAMQPException; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java new file mode 100644 index 0000000..4e94a73 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java @@ -0,0 +1,29 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.proton.message.ProtonJMessage; + +/** + * @author Clebert Suconic + */ + +public interface AMQPClientReceiverContext +{ + ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception; + + void flow(int credits); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java new file mode 100644 index 0000000..1fb5375 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java @@ -0,0 +1,25 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +import org.apache.qpid.proton.message.ProtonJMessage; + +/** + * @author Clebert Suconic + */ + +public interface AMQPClientSenderContext +{ + void send(ProtonJMessage message); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java new file mode 100644 index 0000000..45cbaad --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java @@ -0,0 +1,27 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * @author Clebert Suconic + */ + +public interface AMQPClientSessionContext extends AMQPSessionContext +{ + AMQPClientSenderContext createSender(String address, boolean preSettled) throws HornetQAMQPException; + + AMQPClientReceiverContext createReceiver(String address) throws HornetQAMQPException; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java new file mode 100644 index 0000000..d6a1b5a --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug; + +import io.netty.buffer.ByteBuf; + +/** + * @author Clebert Suconic + */ + +public interface AMQPConnectionCallback +{ + void close(); + + /** + * this is called when bytes are available to be sent to the client. 
+ * you have to callback {@link org.proton.plug.AMQPConnectionContext#outputDone(int)} after you're done with this buffer + * + * @param bytes + */ + void onTransport(ByteBuf bytes, AMQPConnectionContext connection); + + AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection); + + void setConnection(AMQPConnectionContext connection); + + AMQPConnectionContext getConnection(); + + ServerSASL[] getSASLMechnisms(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java new file mode 100644 index 0000000..4969b10 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +import io.netty.buffer.ByteBuf; + +/** + * @author Clebert Suconic + */ + +public interface AMQPConnectionContext +{ + + void close(); + + Object getLock(); + + boolean checkDataReceived(); + + long getCreationTime(); + + SASLResult getSASLResult(); + + /** + * Even though we are currently always sending packets asynchronsouly + * we have a possibility to start trusting on the network flow control + * and always sync on the send of the packet + * + * This is for future use and should be kept returning false. + * + * We will have to do some testing before we make this return true + * @return + */ + boolean isSyncOnFlush(); + + int capacity(); + + /** + * This is for the Remoting layer to push bytes on the AMQP Connection + * The buffer readerIndex should be at the latest read byte after this method is called + * + * @param buffer + * @return + */ + void inputBuffer(ByteBuf buffer); + + void flush(); + + /** + * To be called when the bytes were sent down the stream (flushed on the socket for example) + * + * @param numberOfBytes + */ + void outputDone(int numberOfBytes); + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java new file mode 100644 index 0000000..4f0b127 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug; + +/** + * @author Clebert Suconic + */ + +public abstract class AMQPConnectionContextFactory +{ + /** + * @return + */ + public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java new file mode 100644 index 0000000..62846a8 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java @@ -0,0 +1,22 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +/** + * @author Clebert Suconic + */ + +public interface AMQPServerConnectionContext extends AMQPConnectionContext +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java new file mode 100644 index 0000000..10933a5 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java @@ -0,0 +1,85 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.proton.plug.context.ProtonPlugSender; + +/** + * These are methods where the Proton Plug component will call your server + * + * @author Clebert Suconic + */ + +public interface AMQPSessionCallback +{ + + void init(AMQPSessionContext session, SASLResult saslResult) throws Exception; + + void start(); + + void onFlowConsumer(Object consumer, int credits); + + Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception; + + void startSender(Object brokerConsumer) throws Exception; + + void createTemporaryQueue(String queueName) throws Exception; + + boolean queueQuery(String queueName) throws Exception; + + void closeSender(Object brokerConsumer) throws Exception; + + // This one can be a lot improved + ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception; + + Binary getCurrentTXID(); + + String tempQueueName(); + + void commitCurrentTX() throws Exception; + + void rollbackCurrentTX() throws Exception; + + void close() throws Exception; + + + void ack(Object brokerConsumer, Object message) throws Exception; + + /** + * @param brokerConsumer + * @param message + * @param updateCounts this identified if the cancel was because of a failure or just cleaning up the + * client's cache. 
+ * in some implementations you could call this failed + */ + void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception; + + + void resumeDelivery(Object consumer); + + + /** + * @param delivery + * @param address + * @param messageFormat + * @param messageEncoded a Heap Buffer ByteBuffer (safe to convert into byte[]) + */ + void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java new file mode 100644 index 0000000..2cc725d --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java @@ -0,0 +1,35 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug; + +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * @author Clebert Suconic + */ + +public interface AMQPSessionContext +{ + byte[] getTag(); + + void replaceTag(byte[] tag); + + void close(); + + void removeSender(Sender sender) throws HornetQAMQPException; + + void removeReceiver(Receiver receiver); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java new file mode 100644 index 0000000..ef3af34 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java @@ -0,0 +1,24 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
/**
 * SASL data supplied by a client when opening a connection.
 *
 * @author Clebert Suconic
 */
public interface ClientSASL
{
   /**
    * @return the name of this SASL entry (NOTE(review): presumably the mechanism name - confirm)
    */
   String getName();

   /**
    * @return the bytes of this SASL entry
    */
   byte[] getBytes();
}
/**
 * Outcome of processing SASL data.
 *
 * @author Clebert Suconic
 */
public interface SASLResult
{
   /**
    * @return whether the SASL processing succeeded
    */
   boolean isSuccess();

   /**
    * @return the user associated with this result
    */
   String getUser();
}
+ */ + +package org.proton.plug; + +/** + * @author Clebert Suconic + */ + +public interface ServerSASL +{ + String getName(); + + SASLResult processSASL(byte[] bytes); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java new file mode 100644 index 0000000..45a3ef0 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -0,0 +1,282 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.context; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Transport; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.SASLResult; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.handler.ProtonHandler; +import org.proton.plug.handler.impl.DefaultEventHandler; +import org.proton.plug.util.ByteUtil; +import org.proton.plug.util.DebugInfo; + +/** + * Clebert Suconic + */ +public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext +{ + + + protected ProtonHandler handler = ProtonHandler.Factory.create(); + + protected AMQPConnectionCallback connectionCallback; + + private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>(); + + + public AbstractConnectionContext(AMQPConnectionCallback connectionCallback) + { + this.connectionCallback = connectionCallback; + connectionCallback.setConnection(this); + handler.addEventHandler(new LocalListener()); + } + + public SASLResult getSASLResult() + { + return handler.getSASLResult(); + } + + @Override + public void inputBuffer(ByteBuf buffer) + { + if (DebugInfo.debug) + { + ByteUtil.debugFrame("Buffer Received ", buffer); + } + + handler.inputBuffer(buffer); + } + + public void destroy() + { + connectionCallback.close(); + } + + /** + * See comment at {@link org.proton.plug.AMQPConnectionContext#isSyncOnFlush()} + * @return + * @See {@link org.proton.plug.AMQPConnectionContext#isSyncOnFlush()} + */ + public boolean isSyncOnFlush() + { + return false; + } + + + public Object getLock() + { + return handler.getLock(); + } + + @Override + 
public int capacity() + { + return handler.capacity(); + } + + @Override + public void outputDone(int bytes) + { + handler.outputDone(bytes); + } + + public void flush() + { + handler.flush(); + } + + public void close() + { + handler.close(); + } + + protected AbstractProtonSessionContext getSessionExtension(Session realSession) throws HornetQAMQPException + { + AbstractProtonSessionContext sessionExtension = sessions.get(realSession); + if (sessionExtension == null) + { + // how this is possible? Log a warn here + sessionExtension = newSessionExtension(realSession); + realSession.setContext(sessionExtension); + sessions.put(realSession, sessionExtension); + } + return sessionExtension; + } + + protected abstract void remoteLinkOpened(Link link) throws Exception; + + + protected abstract AbstractProtonSessionContext newSessionExtension(Session realSession) throws HornetQAMQPException; + + @Override + public boolean checkDataReceived() + { + return handler.checkDataReceived(); + } + + @Override + public long getCreationTime() + { + return handler.getCreationTime(); + } + + protected void flushBytes() + { + ByteBuf bytes; + // handler.outputBuffer has the lock + while ((bytes = handler.outputBuffer()) != null) + { + connectionCallback.onTransport(bytes, AbstractConnectionContext.this); + } + } + + + // This listener will perform a bunch of things here + class LocalListener extends DefaultEventHandler + { + + @Override + public void onSASLInit(ProtonHandler handler, Connection connection) + { + handler.createServerSASL(connectionCallback.getSASLMechnisms()); + } + + @Override + public void onTransport(Transport transport) + { + flushBytes(); + } + + @Override + public void onRemoteOpen(Connection connection) throws Exception + { + synchronized (getLock()) + { + connection.setContext(AbstractConnectionContext.this); + connection.open(); + } + initialise(); + } + + + @Override + public void onRemoteClose(Connection connection) + { + synchronized (getLock()) + { + 
connection.close(); + for (AbstractProtonSessionContext protonSession : sessions.values()) + { + protonSession.close(); + } + sessions.clear(); + } + // We must force write the channel before we actually destroy the connection + onTransport(handler.getTransport()); + destroy(); + } + + @Override + public void onLocalOpen(Session session) throws Exception + { + getSessionExtension(session); + } + + @Override + public void onRemoteOpen(Session session) throws Exception + { + getSessionExtension(session).initialise(); + synchronized (getLock()) + { + session.open(); + } + } + + + @Override + public void onLocalClose(Session session) throws Exception + { + } + + @Override + public void onRemoteClose(Session session) throws Exception + { + synchronized (getLock()) + { + session.close(); + } + + AbstractProtonSessionContext sessionContext = (AbstractProtonSessionContext) session.getContext(); + if (sessionContext != null) + { + sessionContext.close(); + sessions.remove(session); + session.setContext(null); + } + } + + @Override + public void onRemoteOpen(Link link) throws Exception + { + remoteLinkOpened(link); + } + + @Override + public void onFlow(Link link) throws Exception + { + ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit()); + } + + @Override + public void onRemoteClose(Link link) throws Exception + { + link.close(); + ((ProtonDeliveryHandler) link.getContext()).close(); + } + + + public void onRemoteDetach(Link link) throws Exception + { + link.detach(); + } + + public void onDelivery(Delivery delivery) throws Exception + { + ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); + if (handler != null) + { + handler.onMessage(delivery); + } + else + { + // TODO: logs + + System.err.println("Handler is null, can't delivery " + delivery); + } + } + + + } + +} 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java new file mode 100644 index 0000000..45b5250 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java @@ -0,0 +1,154 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.context; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.util.CreditsSemaphore; +import org.proton.plug.util.NettyWritable; + +/** + * A this is a wrapper around a HornetQ ServerConsumer for handling outgoing messages and incoming acks via a Proton Sender + * + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +public abstract class AbstractProtonContextSender extends ProtonInitializable implements ProtonDeliveryHandler +{ + protected final AbstractProtonSessionContext protonSession; + protected final Sender sender; + protected final AbstractConnectionContext connection; + protected boolean closed = false; + protected final AMQPSessionCallback sessionSPI; + protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); + + + public AbstractProtonContextSender(AbstractConnectionContext connection, Sender sender, AbstractProtonSessionContext protonSession, AMQPSessionCallback server) + { + this.connection = connection; + this.sender = sender; + this.protonSession = protonSession; + this.sessionSPI = server; + } + + public void onFlow(int credits) + { + this.creditsSemaphore.setCredits(credits); + } + + /* + * start the session + * */ + public void start() throws HornetQAMQPException + { + sessionSPI.start(); + // protonSession.getServerSession().start(); + } + + /* + * close the session + * */ + public void close() throws HornetQAMQPException + { + closed = true; + protonSession.removeSender(sender); + synchronized (connection.getLock()) + { + sender.close(); + } + + connection.flush(); + } + + @Override + /* + * handle an incoming Ack from 
Proton, basically pass to HornetQ to handle + * */ + public abstract void onMessage(Delivery delivery) throws HornetQAMQPException; + + /* + * check the state of the consumer, i.e. are there any more messages. only really needed for browsers? + * */ + public void checkState() + { + } + + public Sender getSender() + { + return sender; + } + + protected int performSend(ProtonJMessage serverMessage, Object context) + { + if (!creditsSemaphore.tryAcquire()) + { + try + { + creditsSemaphore.acquire(); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + // nothing to be done here.. we just keep going + throw new IllegalStateException(e.getMessage(), e); + } + } + + //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers + boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; + + //we only need a tag if we are going to ack later + byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); + + ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + try + { + serverMessage.encode(new NettyWritable(nettyBuffer)); + + int size = nettyBuffer.writerIndex(); + + synchronized (connection.getLock()) + { + final Delivery delivery; + delivery = sender.delivery(tag, 0, tag.length); + delivery.setContext(context); + + // this will avoid a copy.. 
patch provided by Norman using buffer.array() + sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); + + if (preSettle) + { + delivery.settle(); + } + else + { + sender.advance(); + } + } + + connection.flush(); + + return size; + } + finally + { + nettyBuffer.release(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java new file mode 100644 index 0000000..fb4bb07 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.context; + +import org.apache.qpid.proton.engine.Receiver; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * <p/> + * handles incoming messages via a Proton Receiver and forwards them to HornetQ + */ +public abstract class AbstractProtonReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler +{ + protected final AbstractConnectionContext connection; + + protected final AbstractProtonSessionContext protonSession; + + protected final Receiver receiver; + + protected final String address; + + protected final AMQPSessionCallback sessionSPI; + + public AbstractProtonReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, AbstractProtonSessionContext protonSession, Receiver receiver) + { + this.connection = connection; + this.protonSession = protonSession; + this.receiver = receiver; + if (receiver.getRemoteTarget() != null) + { + this.address = receiver.getRemoteTarget().getAddress(); + } + else + { + this.address = null; + } + this.sessionSPI = sessionSPI; + } + + @Override + public void close() throws HornetQAMQPException + { + protonSession.removeReceiver(receiver); + } + + public void flow(int credits) + { + synchronized (connection.getLock()) + { + receiver.flow(credits); + } + connection.flush(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java new file mode 100644 index 0000000..8f28039 --- /dev/null +++ 
b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java @@ -0,0 +1,189 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.AMQPSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.exceptions.HornetQAMQPInternalErrorException; + +/** + * ProtonSession is a direct representation of the session on the broker. 
+ * It has a link between a ProtonSession and a Broker or Client Session + * The Broker Session is linked through the ProtonSessionSPI + * + * @author Clebert Suconic + */ +public abstract class AbstractProtonSessionContext extends ProtonInitializable implements AMQPSessionContext +{ + protected final AbstractConnectionContext connection; + + protected final AMQPSessionCallback sessionSPI; + + protected final Session session; + + private long currentTag = 0; + + protected Map<Receiver, AbstractProtonReceiverContext> receivers = new HashMap<Receiver, AbstractProtonReceiverContext>(); + + protected Map<Sender, AbstractProtonContextSender> senders = new HashMap<Sender, AbstractProtonContextSender>(); + + protected boolean closed = false; + + public AbstractProtonSessionContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, Session session) + { + this.connection = connection; + this.sessionSPI = sessionSPI; + this.session = session; + } + + public void initialise() throws Exception + { + if (!isInitialized()) + { + super.initialise(); + + if (sessionSPI != null) + { + try + { + sessionSPI.init(this, connection.getSASLResult()); + } + catch (Exception e) + { + throw new HornetQAMQPInternalErrorException(e.getMessage(), e); + } + } + } + } + + + /** + * TODO: maybe it needs to go? + * + * @param consumer + * @param queueName + */ + public void disconnect(Object consumer, String queueName) + { + AbstractProtonContextSender protonConsumer = senders.remove(consumer); + if (protonConsumer != null) + { + try + { + protonConsumer.close(); + } + catch (HornetQAMQPException e) + { + protonConsumer.getSender().setTarget(null); + protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); + } + } + } + + + @Override + public byte[] getTag() + { + return Long.toHexString(currentTag++).getBytes(); + } + + @Override + public void replaceTag(byte[] tag) + { + // TODO: do we need to reuse this? 
+ } + + @Override + public void close() + { + if (closed) + { + return; + } + + + + // Making a copy to avoid ConcurrentModificationException during the iteration + Set<AbstractProtonReceiverContext> receiversCopy = new HashSet<>(); + receiversCopy.addAll(receivers.values()); + + + for (AbstractProtonReceiverContext protonProducer : receiversCopy) + { + try + { + protonProducer.close(); + } + catch (Exception e) + { + e.printStackTrace(); + // TODO Logging + } + } + receivers.clear(); + + Set<AbstractProtonContextSender> protonSendersClone = new HashSet<>(); + protonSendersClone.addAll(senders.values()); + + for (AbstractProtonContextSender protonConsumer : protonSendersClone) + { + try + { + protonConsumer.close(); + } + catch (Exception e) + { + e.printStackTrace(); + // TODO Logging + } + } + senders.clear(); + try + { + if (sessionSPI != null) + { + sessionSPI.rollbackCurrentTX(); + sessionSPI.close(); + } + } + catch (Exception e) + { + e.printStackTrace(); + // TODO logging + } + closed = true; + } + + @Override + public void removeSender(Sender sender) throws HornetQAMQPException + { + senders.remove(sender); + } + + @Override + public void removeReceiver(Receiver receiver) + { + receivers.remove(receiver); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java new file mode 100644 index 0000000..3436c5c --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context; + +import org.apache.qpid.proton.engine.Delivery; +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * <p/> + * An interface to handle deliveries, either messages, acks or transaction calls + */ +public interface ProtonDeliveryHandler +{ + void onFlow(int currentCredits); + + void onMessage(Delivery delivery) throws HornetQAMQPException; + + void close() throws HornetQAMQPException; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java new file mode 100644 index 0000000..0dad6d7 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java @@ -0,0 +1,83 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context; + +import java.util.concurrent.TimeUnit; + +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.exceptions.HornetQAMQPIllegalStateException; +import org.proton.plug.exceptions.HornetQAMQPTimeoutException; +import org.proton.plug.util.FutureRunnable; + +/** + * @author Clebert Suconic + */ + +public class ProtonInitializable +{ + + private Runnable afterInit; + + private boolean initialized = false; + + public void afterInit(Runnable afterInit) + { + this.afterInit = afterInit; + } + + + public boolean isInitialized() + { + return initialized; + } + + + public void initialise() throws Exception + { + if (!initialized) + { + initialized = false; + try + { + if (afterInit != null) + { + afterInit.run(); + } + } + finally + { + afterInit = null; + } + } + } + + + public void waitWithTimeout(FutureRunnable latch) throws HornetQAMQPException + { + try + { + // TODO Configure this + if (!latch.await(30, TimeUnit.SECONDS)) + { + throw new HornetQAMQPTimeoutException("Timed out waiting for response"); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new HornetQAMQPIllegalStateException(e.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java 
b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java new file mode 100644 index 0000000..eff114a --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java @@ -0,0 +1,27 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context; + +import org.apache.qpid.proton.engine.Sender; + +/** + * @author Clebert Suconic + */ + +public interface ProtonPlugSender +{ + int deliverMessage(Object message, int deliveryCount) throws Exception; + + Sender getSender(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java new file mode 100644 index 0000000..9f53085 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -0,0 +1,134 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.proton.plug.context; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.transaction.Declare; +import org.apache.qpid.proton.amqp.transaction.Declared; +import org.apache.qpid.proton.amqp.transaction.Discharge; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.logger.HornetQAMQPProtocolMessageBundle; + +import static org.proton.plug.util.DeliveryUtil.decodeMessageImpl; +import static org.proton.plug.util.DeliveryUtil.readDelivery; + +/** + * handles an amqp Coordinator to deal with transaction boundaries etc + */ +public class ProtonTransactionHandler implements ProtonDeliveryHandler +{ + + final AMQPSessionCallback sessionSPI; + + public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) + { + this.sessionSPI = sessionSPI; + } + + @Override + public void onMessage(Delivery delivery) throws HornetQAMQPException + { + ByteBuf 
buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + + final Receiver receiver; + try + { + receiver = ((Receiver) delivery.getLink()); + + if (!delivery.isReadable()) + { + return; + } + + readDelivery(receiver, buffer); + + receiver.advance(); + + MessageImpl msg = decodeMessageImpl(buffer); + + Object action = ((AmqpValue) msg.getBody()).getValue(); + + if (action instanceof Declare) + { + Binary txID = sessionSPI.getCurrentTXID(); + Declared declared = new Declared(); + declared.setTxnId(txID); + delivery.disposition(declared); + delivery.settle(); + } + else if (action instanceof Discharge) + { + Discharge discharge = (Discharge) action; + if (discharge.getFail()) + { + try + { + sessionSPI.rollbackCurrentTX(); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage()); + } + } + else + { + try + { + sessionSPI.commitCurrentTX(); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); + } + } + delivery.settle(); + } + + } + catch (Exception e) + { + e.printStackTrace(); + Rejected rejected = new Rejected(); + ErrorCondition condition = new ErrorCondition(); + condition.setCondition(Symbol.valueOf("failed")); + condition.setDescription(e.getMessage()); + rejected.setError(condition); + delivery.disposition(rejected); + } + finally + { + buffer.release(); + } + } + + public void onFlow(int credits) + { + + } + + @Override + public void close() throws HornetQAMQPException + { + //noop + } +}
