http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java new file mode 100644 index 0000000..ee01add --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.meta; + + +/** + * Base class for the JMS object representing JMS resources such as Connection, Session, etc. + */ +public interface JmsResource { + + /** + * Allows a visitor object to walk the resources and process them. + * + * @param visitor + * The visitor instance that is processing this resource. + * + * @throws Exception if an error occurs while visiting this resource. + */ + void visit(JmsResourceVistor visitor) throws Exception; + +}
/**
 * Common contract for every Id type used by the JMS framework.
 *
 * Besides identifying a resource, an Id can carry two opaque, Provider-owned
 * values: a "hint" and a Provider specific Id.
 */
public interface JmsResourceId {

    /**
     * Stores a Provider supplied hint inside this Id for later use, for example
     * to let the Provider locate the state data of a resource more easily.
     *
     * @param hint
     *        the value to embed in this Id.
     */
    void setProviderHint(Object hint);

    /**
     * Retrieves the hint object previously stored by the Provider.
     *
     * @return the previously stored Provider hint object.
     */
    Object getProviderHint();

    /**
     * Stores the Provider's own internal Id object for this resource, for the
     * case where the JMS framework Id cannot be used directly by the Provider
     * implementation.
     *
     * @param id
     *        the Provider specific Id value to associate with this Id.
     */
    void setProviderId(Object id);

    /**
     * Retrieves the Provider specific Id previously stored in this Id.
     *
     * @return the previously stored Provider Id value.
     */
    Object getProviderId();
}
+ */ +public interface JmsResourceVistor { + + void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception; + + void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception; + + void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception; + + void processProducerInfo(JmsProducerInfo producerInfo) throws Exception; + + void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception; + + void processDestination(JmsDestination destination) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java new file mode 100644 index 0000000..de0d452 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.meta; + +public final class JmsSessionId extends JmsAbstractResourceId implements Comparable<JmsSessionId> { + + private final String connectionId; + private final long value; + + protected transient String key; + protected transient JmsConnectionId parentId; + + public JmsSessionId(String connectionId, long value) { + this.connectionId = connectionId; + this.value = value; + } + + public JmsSessionId(JmsConnectionId connectionId, long sessionId) { + this.connectionId = connectionId.getValue(); + this.value = sessionId; + this.parentId = connectionId; + } + + public JmsSessionId(JmsSessionId id) { + this.connectionId = id.getConnectionId(); + this.value = id.getValue(); + } + + public JmsSessionId(JmsProducerId id) { + this.connectionId = id.getConnectionId(); + this.value = id.getSessionId(); + } + + public JmsSessionId(JmsConsumerId id) { + this.connectionId = id.getConnectionId(); + this.value = id.getSessionId(); + } + + public JmsConnectionId getParentId() { + if (parentId == null) { + parentId = new JmsConnectionId(this); + } + return parentId; + } + + @Override + public int hashCode() { + if (hashCode == 0) { + hashCode = connectionId.hashCode() ^ (int)value; + } + return hashCode; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || o.getClass() != JmsSessionId.class) { + return false; + } + JmsSessionId id = (JmsSessionId)o; + return value == id.value && connectionId.equals(id.connectionId); + } + + public String getConnectionId() { + return connectionId; + } + + public long getValue() { + return value; + } + + @Override + public String toString() { + if (key == null) { + key = connectionId + ":" + value; + } + return key; + } + + @Override + public int compareTo(JmsSessionId other) { + return toString().compareTo(other.toString()); + } +} 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java new file mode 100644 index 0000000..04f57d8 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.meta; + +import javax.jms.Session; + +public final class JmsSessionInfo implements JmsResource { + + private final JmsSessionId sessionId; + private int acknowledgementMode; + private boolean sendAcksAsync; + + public JmsSessionInfo(JmsConnectionInfo connectionMeta, long sessionId) { + this.sessionId = new JmsSessionId(connectionMeta.getConnectionId(), sessionId); + } + + public JmsSessionInfo(JmsSessionId sessionId) { + this.sessionId = sessionId; + } + + public JmsSessionId getSessionId() { + return sessionId; + } + + @Override + public void visit(JmsResourceVistor vistor) throws Exception { + vistor.processSessionInfo(this); + } + + public int getAcknowledgementMode() { + return acknowledgementMode; + } + + public void setAcknowledgementMode(int acknowledgementMode) { + this.acknowledgementMode = acknowledgementMode; + } + + public boolean isTransacted() { + return this.acknowledgementMode == Session.SESSION_TRANSACTED; + } + + public boolean isSendAcksAsync() { + return sendAcksAsync; + } + + public void setSendAcksAsync(boolean sendAcksAsync) { + this.sendAcksAsync = sendAcksAsync; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java new file mode 100644 index 0000000..226aedf --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.meta; + +public final class JmsTransactionId extends JmsAbstractResourceId implements Comparable<JmsTransactionId> { + + private final JmsConnectionId connectionId; + private final long value; + + private transient String transactionKey; + + public JmsTransactionId(JmsConnectionId connectionId, long transactionId) { + this.connectionId = connectionId; + this.value = transactionId; + } + + public String getTransactionKey() { + if (transactionKey == null) { + transactionKey = "TX:" + connectionId + ":" + value; + } + return transactionKey; + } + + @Override + public String toString() { + return getTransactionKey(); + } + + @Override + public int hashCode() { + if (hashCode == 0) { + hashCode = connectionId.hashCode() ^ (int)value; + } + return hashCode; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || other.getClass() != JmsTransactionId.class) { + return false; + } + + JmsTransactionId tx = (JmsTransactionId) other; + + return value == tx.value && connectionId.equals(tx.connectionId); + } + + @Override + public int compareTo(JmsTransactionId o) { + int result = connectionId.compareTo(o.connectionId); + if (result == 0) { + result = (int)(value - o.value); + } + return result; + } + + public long getValue() { + return value; + } + + public JmsConnectionId getConnectionId() { + return 
connectionId; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java new file mode 100644 index 0000000..5fca50f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.meta; + +public final class JmsTransactionInfo implements JmsResource, Comparable<JmsTransactionInfo> { + + protected final JmsSessionId sessionId; + protected JmsTransactionId transactionId; + + public JmsTransactionInfo(JmsSessionId sessionId, JmsTransactionId transactionId) { + this.sessionId = sessionId; + this.transactionId = transactionId; + } + + public JmsTransactionInfo copy() { + return new JmsTransactionInfo(sessionId, transactionId); + } + + public JmsSessionId getSessionId() { + return sessionId; + } + + public JmsTransactionId getTransactionId() { + return transactionId; + } + + public void setTransactionId(JmsTransactionId transactionId) { + this.transactionId = transactionId; + } + + public JmsSessionId getParentId() { + return this.sessionId; + } + + @Override + public int hashCode() { + return (transactionId == null) ? 0 : transactionId.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + JmsTransactionInfo other = (JmsTransactionInfo) obj; + + if (transactionId == null && other.transactionId != null) { + return false; + } else if (!transactionId.equals(other.transactionId)) { + return false; + } + return true; + } + + @Override + public int compareTo(JmsTransactionInfo other) { + return this.transactionId.compareTo(other.transactionId); + } + + @Override + public String toString() { + return "JmsTransactionInfo { " + this.transactionId + " }"; + } + + @Override + public void visit(JmsResourceVistor visitor) throws Exception { + visitor.processTransactionInfo(this); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html new file mode 100644 index 0000000..3d678ce --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html @@ -0,0 +1,25 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<html> +<head> +</head> +<body> + +The core AMQP JMS client implementation classes. + +</body> +</html> http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java new file mode 100644 index 0000000..9d743d2 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.jms.util.IOExceptionSupport; + +/** + * Base class used to implement the most common features of a Provider instance.. + * + * Methods that are fully optional such as transaction commit and rollback are implemented + * here to throw an UnsupportedOperationException. 
+ */ +public abstract class AbstractProvider implements Provider { + + protected final URI remoteURI; + protected final AtomicBoolean closed = new AtomicBoolean(); + protected final ScheduledExecutorService serializer; + + protected ProviderListener listener; + + public AbstractProvider(URI remoteURI) { + this.remoteURI = remoteURI; + + this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable runner) { + Thread serial = new Thread(runner); + serial.setDaemon(true); + serial.setName(toString()); + return serial; + } + }); + } + + @Override + public void start() throws IOException, IllegalStateException { + checkClosed(); + + if (listener == null) { + throw new IllegalStateException("No ProviderListener registered."); + } + } + + @Override + public void setProviderListener(ProviderListener listener) { + this.listener = listener; + } + + @Override + public ProviderListener getProviderListener() { + return listener; + } + + @Override + public URI getRemoteURI() { + return remoteURI; + } + + public void fireProviderException(Throwable ex) { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onConnectionFailure(IOExceptionSupport.create(ex)); + } + } + + protected void checkClosed() throws ProviderClosedException { + if (closed.get()) { + throw new ProviderClosedException("The Provider is already closed"); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java new file mode 100644 index 0000000..b79a3cc --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java @@ -0,0 +1,47 @@ +/** + * Licensed to the 
/**
 * Callback contract used to report the outcome of an asynchronous operation.
 */
public interface AsyncResult {

    /**
     * Invoked when the operation fails, passing along the error that caused
     * the failure.
     *
     * @param result
     *        the error that resulted in this asynchronous operation failing.
     */
    void onFailure(Throwable result);

    /**
     * Invoked when the operation succeeds. No result value is passed; the call
     * itself is the signal to the waiting parties.
     */
    void onSuccess();

    /**
     * Reports whether the operation has finished. An operation counts as
     * complete regardless of whether it succeeded or failed.
     *
     * @return true if the asynchronous operation has completed.
     */
    boolean isComplete();
}
+ */ +public class DefaultProviderListener implements ProviderListener { + + @Override + public void onMessage(JmsInboundMessageDispatch envelope) { + } + + @Override + public void onConnectionInterrupted(URI remoteURI) { + } + + @Override + public void onConnectionFailure(IOException ex) { + } + + @Override + public void onConnectionRecovery(Provider provider) { + } + + @Override + public void onConnectionRecovered(Provider provider) { + } + + @Override + public void onConnectionRestored(URI remoteURI) { + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java new file mode 100644 index 0000000..b8634ea --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.net.URI; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; + +/** + * Defines the interface that an Implementation of a Specific wire level protocol + * provider must implement. This Provider interface requires that the implementation + * methods all operate in an asynchronous manner. + */ +public interface Provider { + + /** + * Performs the initial low level connection for this provider such as TCP or + * SSL connection to a remote Broker. If this operation fails then the Provider + * is considered to be unusable and no further operations should be attempted + * using this Provider. + * + * @throws IOException if the remote resource can not be contacted. + */ + void connect() throws IOException; + + /** + * Starts the Provider. The start method provides a place for the Provider to perform + * and pre-start configuration checks to ensure that the current state is valid and that + * all contracts have been met prior to starting. + * + * @throws IOException if an error occurs during start processing. + * @throws IllegalStateException if the Provider is improperly configured. + */ + void start() throws IOException, IllegalStateException; + + /** + * Closes this Provider terminating all connections and canceling any pending + * operations. The Provider is considered unusable after this call. This call + * is a blocking call and will not return until the Provider has closed or an + * error occurs. 
+ */ + void close(); + + /** + * Returns the URI used to configure this Provider and specify the remote address of the + * Broker it connects to. + * + * @return the URI used to configure this Provider. + */ + URI getRemoteURI(); + + /** + * Create the Provider version of the given JmsResource. + * + * For each JMS Resource type the Provider implementation must create it's own internal + * representation and upon successful creation provide the caller with a response. The + * Provider should examine the given JmsResource to determine if the given configuration + * is supported or can be simulated, or is not supported in which case an exception should be + * thrown. + * + * It is possible for a Provider to indicate that it cannot complete a requested create + * either due to some mis-configuration such as bad login credentials on connection create + * by throwing a JMSException. If the Provider does not support creating of the indicated + * resource such as a Temporary Queue etc the provider may throw an UnsupportedOperationException + * to indicate this. + * + * @param resource + * The JmsResouce instance that indicates what is being created. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such as bad credentials. + */ + void create(JmsResource resource, AsyncResult request) throws IOException, JMSException; + + /** + * Starts the Provider version of the given JmsResource. + * + * For some JMS Resources it is necessary or advantageous to have a started state that + * must be triggered prior to it's normal use. + * + * An example of this would be a MessageConsumer which should not receive any incoming + * messages until the JMS layer is in a state to handle them. One such time would be + * after connection recovery. 
A JMS consumer should normally recover with it's prefetch + * value set to zero, or an AMQP link credit of zero and only open up the credit window + * once all Connection resources are restored. + * + * The provider is required to implement this method and not throw any error other than + * an IOException if a communication error occurs. The start operation is not required to + * have any effect on the provider resource but must not throw UnsupportedOperation etc. + * + * @param resource + * The JmsResouce instance that indicates what is being started. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such as already closed resource. + */ + void start(JmsResource resource, AsyncResult request) throws IOException, JMSException; + + /** + * Instruct the Provider to dispose of a given JmsResource. + * + * The provider is given a JmsResource which it should use to remove any associated + * resources and inform the remote Broker instance of the removal of this resource. + * + * If the Provider cannot destroy the resource due to a non-communication error such as + * the logged in user not have role access to destroy the given resource it may throw an + * instance of JMSException to indicate such an error. + * + * @param resource + * The JmsResouce that identifies a previously created JmsResource. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such as not authorized. + */ + void destroy(JmsResource resourceId, AsyncResult request) throws IOException, JMSException; + + /** + * Sends the JmsMessage contained in the outbound dispatch envelope. 
+ * + * @param envelope + * the message envelope containing the JmsMessage to send. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error that maps to JMS occurs such as not authorized. + */ + void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException; + + /** + * Called to acknowledge all messages that have been delivered in a given session. + * + * This method is typically used by a Session that is configured for client acknowledge + * mode. The acknowledgment should only be applied to Messages that have been marked + * as delivered and not those still awaiting dispatch. + * + * @param sessionId + * the ID of the Session whose delivered messages should be acknowledged. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such as unmatched ack. + */ + void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException; + + /** + * Called to acknowledge a JmsMessage has been delivered, consumed, re-delivered...etc. + * + * The provider should perform an acknowledgment for the message based on the configured + * mode of the consumer that it was dispatched to and the capabilities of the protocol. + * + * @param envelope + * The message dispatch envelope containing the Message delivery information. + * @param ackType + * The type of acknowledgment being done. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such as unmatched ack. 
+ */ + void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, AsyncResult request) + throws IOException, JMSException; + + /** + * Called to commit an open transaction. + * + * If the provider is unable to support transactions then it should throw an + * UnsupportedOperationException to indicate this. The Provider may also throw a + * JMSException to indicate a transaction was already rolled back etc. + * + * @param sessionId + * the Id of the JmsSession that is committing the current transaction. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such not authorized. + */ + void commit(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException; + + /** + * Called to roll back an open transaction. + * + * @param sessionId + * the Id of the JmsSession that is rolling back the current transaction. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such not authorized. + */ + void rollback(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException; + + /** + * Called to recover all unacknowledged messages for a Session in client Ack mode. + * + * @param sessionId + * the Id of the JmsSession that is recovering unacknowledged messages.. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + */ + void recover(JmsSessionId sessionId, AsyncResult request) throws IOException; + + /** + * Remove a durable topic subscription by name. 
+ * + * A provider can throw an instance of JMSException to indicate that it cannot perform the + * un-subscribe operation due to bad security credentials etc. + * + * @param subscription + * the name of the durable subscription that is to be removed. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + * @throws JMSException if an error occurs due to JMS violation such not authorized. + */ + void unsubscribe(String subscription, AsyncResult request) throws IOException, JMSException; + + /** + * Request a remote peer send a Message to this client. A message pull request is + * usually only needed in the case where the client sets a zero prefetch limit on the + * consumer. If the consumer has a set prefetch that's greater than zero this method + * should just return without performing and action. + * + * @param timeout + * the amount of time to tell the remote peer to keep this pull request valid. + * @param request + * The request object that should be signaled when this operation completes. + * + * @throws IOException if an error occurs or the Provider is already closed. + */ + void pull(JmsConsumerId consumerId, long timeout, AsyncResult request) throws IOException; + + /** + * Gets the Provider specific Message factory for use in the JMS layer when a Session + * is asked to create a Message type. The Provider should implement it's own internal + * JmsMessage core to optimize read / write and marshal operations for the connection. + * + * @returns a JmsMessageFactory instance for use by the JMS layer. + */ + JmsMessageFactory getMessageFactory(); + + /** + * Sets the listener of events from this Provider instance. + * + * @param listener + * The listener instance that will receive all event callbacks. + */ + void setProviderListener(ProviderListener listener); + + /** + * Gets the currently set ProdiverListener instance. 
+ * + * @return the currently set ProviderListener instance. + */ + ProviderListener getProviderListener(); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java new file mode 100644 index 0000000..1e00b58 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; + +public class ProviderClosedException extends IOException { + + private static final long serialVersionUID = 1L; + + public ProviderClosedException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java new file mode 100644 index 0000000..4e3f90c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +/** + * Set of Provider specific constants used when interacting with the Provider API. 
+ */ +public final class ProviderConstants { + + private ProviderConstants() {} + + public enum ACK_TYPE { + DELIVERED(0), + CONSUMED(1), + REDELIVERED(2), + POISONED(3), + EXPIRED(4); + + private final int value; + + private ACK_TYPE(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java new file mode 100644 index 0000000..c07518e --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.net.URI; + +import org.apache.qpid.jms.util.FactoryFinder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Interface that all JMS Providers must implement. 
+ */ +public abstract class ProviderFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ProviderFactory.class); + + private static final FactoryFinder<ProviderFactory> PROVIDER_FACTORY_FINDER = + new FactoryFinder<ProviderFactory>(ProviderFactory.class, + "META-INF/services/" + ProviderFactory.class.getPackage().getName().replace(".", "/") + "/"); + + /** + * Creates an instance of the given AsyncProvider and configures it using the + * properties set on the given remote broker URI. + * + * @param remoteURI + * The URI used to connect to a remote Broker. + * + * @return a new AsyncProvider instance. + * + * @throws Exception if an error occurs while creating the Provider instance. + */ + public abstract Provider createAsyncProvider(URI remoteURI) throws Exception; + + /** + * @return the name of this JMS Provider, e.g. STOMP, AMQP, MQTT...etc + */ + public abstract String getName(); + + /** + * Static create method that performs the ProviderFactory search and handles the + * configuration and setup. + * + * @param remoteURI + * the URI of the remote peer. + * + * @return a new AsyncProvider instance that is ready for use. + * + * @throws Exception if an error occurs while creating the AsyncProvider instance. + */ + public static Provider createAsync(URI remoteURI) throws Exception { + Provider result = null; + + try { + ProviderFactory factory = findProviderFactory(remoteURI); + result = factory.createAsyncProvider(remoteURI); + result.connect(); + } catch (Exception ex) { + LOG.error("Failed to create BlockingProvider instance for: {}", remoteURI.getScheme()); + LOG.trace("Error: ", ex); + throw ex; + } + + return result; + } + + /** + * Searches for a ProviderFactory by using the scheme from the given URI. + * + * The search first checks the local cache of provider factories before moving on + * to search in the class path. + * + * @param location + * The URI whose scheme will be used to locate a ProviderFactory. 
+ * + * @return a provider factory instance matching the URI's scheme. + * + * @throws IOException if an error occurs while locating the factory. + */ + public static ProviderFactory findProviderFactory(URI location) throws IOException { + String scheme = location.getScheme(); + if (scheme == null) { + throw new IOException("No Provider scheme specified: [" + location + "]"); + } + + ProviderFactory factory = null; + try { + factory = PROVIDER_FACTORY_FINDER.newInstance(scheme); + } catch (Throwable e) { + throw new IOException("Provider scheme NOT recognized: [" + scheme + "]", e); + } + + return factory; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java new file mode 100644 index 0000000..7a52ad3 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.jms.util.IOExceptionSupport; + +/** + * Asynchronous Provider Future class. + */ +public class ProviderFuture implements AsyncResult { + + protected final CountDownLatch latch = new CountDownLatch(1); + protected Throwable error; + protected final AsyncResult watcher; + + public ProviderFuture() { + this.watcher = null; + } + + public ProviderFuture(AsyncResult watcher) { + this.watcher = watcher; + } + + @Override + public boolean isComplete() { + return latch.getCount() == 0; + } + + @Override + public void onFailure(Throwable result) { + error = result; + latch.countDown(); + if (watcher != null) { + watcher.onFailure(error); + } + } + + @Override + public void onSuccess() { + latch.countDown(); + if (watcher != null) { + watcher.onSuccess(); + } + } + + /** + * Timed wait for a response to a Provider operation. + * + * @param amount + * The amount of time to wait before abandoning the wait. + * @param unit + * The unit to use for this wait period. + * + * @return the result of this operation or null if the wait timed out. + * + * @throws IOException if an error occurs while waiting for the response. + */ + public void sync(long amount, TimeUnit unit) throws IOException { + try { + latch.await(amount, unit); + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + failOnError(); + } + + /** + * Waits for a response to some Provider requested operation. + * + * @return the response from the Provider for this operation. + * + * @throws IOException if an error occurs while waiting for the response. 
+ */ + public void sync() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.interrupted(); + throw IOExceptionSupport.create(e); + } + failOnError(); + } + + private void failOnError() throws IOException { + Throwable cause = error; + if (cause != null) { + throw IOExceptionSupport.create(cause); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java new file mode 100644 index 0000000..18938d5 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.net.URI; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; + +/** + * Events interface used to update the listener with changes in provider state. 
+ */ +public interface ProviderListener { + + /** + * Called when a new Message has arrived for a registered consumer. + * + * @param envelope + * The dispatch object containing the message and delivery information. + */ + void onMessage(JmsInboundMessageDispatch envelope); + + /** + * Called from a fault tolerant Provider instance to signal that the underlying + * connection to the Broker has been lost. The Provider will attempt to reconnect + * following this event unless closed. + * + * It is considered a programming error to allow any exceptions to be thrown from + * this notification method. + * + * @param remoteURI + * The URI of the Broker whose connection was lost. + */ + void onConnectionInterrupted(URI remoteURI); + + /** + * Called to indicate that a connection to the Broker has been reestablished and + * that notified listener should start to recover it's state. The provider will + * not transition to the recovered state until the listener notifies the provider + * that recovery is complete. + * + * @param provider + * The new Provider instance that will become active after the state + * has been recovered. + * + * @throws Exception if an error occurs during recovery attempt, this will fail + * the Provider that's being used for recovery. + */ + void onConnectionRecovery(Provider provider) throws Exception; + + /** + * Called to indicate that a connection to the Broker has been reestablished and + * that all recovery operations have succeeded and the connection will now be + * transitioned to a recovered state. This method gives the listener a chance + * so send any necessary post recovery commands such as consumer start or message + * pull for a zero prefetch consumer etc. + * + * @param provider + * The new Provider instance that will become active after the state + * has been recovered. + * + * @throws Exception if an error occurs during recovery attempt, this will fail + * the Provider that's being used for recovery. 
+ */ + void onConnectionRecovered(Provider provider) throws Exception; + + /** + * Called to signal that all recovery operations are now complete and the Provider + * is again in a normal connected state. + * + * It is considered a programming error to allow any exceptions to be thrown from + * this notification method. + * + * @param remoteURI + * The URI of the Broker that the client has now connected to. + */ + void onConnectionRestored(URI remoteURI); + + /** + * Called to indicate that the underlying connection to the Broker has been lost and + * the Provider will not perform any reconnect. Following this call the provider is + * in a failed state and further calls to it will throw an Exception. + * + * @param ex + * The exception that indicates the cause of this Provider failure. + */ + void onConnectionFailure(IOException ex); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java new file mode 100644 index 0000000..1381dba --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider; + +import java.io.IOException; +import java.net.URI; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.message.JmsMessageFactory; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; + +/** + * Allows one AsyncProvider instance to wrap around another and provide some additional + * features beyond the normal AsyncProvider interface. + * + * This wrapper is meant primarily for Providers that are adding some additional feature + * on-top of an existing provider such as a discovery based provider that only needs to + * pass along discovered remote peer information. 
+ */ +public class ProviderWrapper<E extends Provider> implements Provider, ProviderListener { + + protected final E next; + protected ProviderListener listener; + + public ProviderWrapper(E next) { + this.next = next; + this.next.setProviderListener(this); + } + + @Override + public void connect() throws IOException { + next.connect(); + } + + @Override + public void start() throws IOException, IllegalStateException { + if (this.listener == null) { + throw new IllegalStateException("Cannot start with null ProviderListener"); + } + next.start(); + } + + @Override + public void close() { + next.close(); + } + + @Override + public URI getRemoteURI() { + return next.getRemoteURI(); + } + + @Override + public void create(JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + next.create(resource, request); + } + + @Override + public void start(JmsResource resource, AsyncResult request) throws IOException, JMSException { + next.start(resource, request); + } + + @Override + public void destroy(JmsResource resourceId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + next.destroy(resourceId, request); + } + + @Override + public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + next.send(envelope, request); + } + + @Override + public void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException { + next.acknowledge(sessionId, request); + } + + @Override + public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException { + next.acknowledge(envelope, ackType, request); + } + + @Override + public void commit(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + next.commit(sessionId, request); + } + + @Override + public void rollback(JmsSessionId sessionId, AsyncResult 
request) throws IOException, JMSException, UnsupportedOperationException { + next.rollback(sessionId, request); + } + + @Override + public void recover(JmsSessionId sessionId, AsyncResult request) throws IOException, UnsupportedOperationException { + next.recover(sessionId, request); + } + + @Override + public void unsubscribe(String subscription, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException { + next.unsubscribe(subscription, request); + } + + @Override + public void pull(JmsConsumerId consumerId, long timeout, AsyncResult request) throws IOException, UnsupportedOperationException { + next.pull(consumerId, timeout, request); + } + + @Override + public JmsMessageFactory getMessageFactory() { + return next.getMessageFactory(); + } + + @Override + public void setProviderListener(ProviderListener listener) { + this.listener = listener; + } + + @Override + public ProviderListener getProviderListener() { + return this.listener; + } + + @Override + public void onMessage(JmsInboundMessageDispatch envelope) { + this.listener.onMessage(envelope); + } + + @Override + public void onConnectionInterrupted(URI remoteURI) { + this.listener.onConnectionInterrupted(remoteURI); + } + + @Override + public void onConnectionRecovery(Provider provider) throws Exception { + this.listener.onConnectionRecovery(provider); + } + + @Override + public void onConnectionRecovered(Provider provider) throws Exception { + this.listener.onConnectionRecovered(provider); + } + + @Override + public void onConnectionRestored(URI remoteURI) { + this.listener.onConnectionRestored(remoteURI); + } + + @Override + public void onConnectionFailure(IOException ex) { + this.listener.onConnectionInterrupted(this.next.getRemoteURI()); + } + + /** + * @return the wrapped AsyncProvider. 
+ */ + public Provider getNext() { + return this.next; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java new file mode 100644 index 0000000..1175b34 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * objects don't have to reproduce them. Provides hooks for the subclasses to initialize
+ * and shutdown.
+ *
+ * @param <R>
+ *        The JmsResource type that this resource manages.
+ * @param <E>
+ *        The Proton Endpoint type that this resource maps to.
+ */
+public abstract class AbstractAmqpResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpResource.class);
+
+    // Pending requests; each one is reset to null once it has been signalled.
+    protected AsyncResult openRequest;
+    protected AsyncResult closeRequest;
+
+    protected E endpoint;
+    protected R info;
+
+    /**
+     * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
+     * sets the Endpoint to null.
+     *
+     * @param info
+     *        The JmsResource instance that this AmqpResource is managing.
+     */
+    public AbstractAmqpResource(R info) {
+        this(info, null);
+    }
+
+    /**
+     * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
+     * sets the Endpoint to the given value.
+     *
+     * @param info
+     *        The JmsResource instance that this AmqpResource is managing.
+     * @param endpoint
+     *        The Proton Endpoint instance that this object maps to.
+     */
+    public AbstractAmqpResource(R info, E endpoint) {
+        this.info = info;
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Records the open request, runs the subclass {@link #doOpen()} hook and then sets
+     * this resource as the endpoint's context before opening the endpoint.
+     *
+     * @param request
+     *        The request that is signalled from {@link #opened()} or
+     *        {@link #failed(Exception)}.
+     */
+    @Override
+    public void open(AsyncResult request) {
+        this.openRequest = request;
+        doOpen();
+        // NOTE(review): if the single-argument constructor was used the endpoint is still
+        // null here unless doOpen() assigned it - confirm every subclass does so.
+        this.endpoint.setContext(this);
+        this.endpoint.open();
+    }
+
+    /**
+     * @return true if an endpoint has been set and its remote state is ACTIVE, false
+     *         otherwise (including when no endpoint has been assigned yet).
+     */
+    @Override
+    public boolean isOpen() {
+        // Guard against a missing endpoint the same way getRemoteState() does, so that
+        // a state query made before the endpoint exists cannot throw.
+        return this.endpoint != null && this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+    }
+
+    /**
+     * @return true while an open request is stored and has not yet been signalled.
+     */
+    @Override
+    public boolean isAwaitingOpen() {
+        return this.openRequest != null;
+    }
+
+    /**
+     * Signals success on the pending open request, if there is one, and clears it.
+     */
+    @Override
+    public void opened() {
+        if (this.openRequest != null) {
+            this.openRequest.onSuccess();
+            this.openRequest = null;
+        }
+    }
+
+    /**
+     * Starts the close of this resource. When the endpoint is already locally closed
+     * the request is signalled immediately; otherwise the request is stored, the
+     * subclass {@link #doClose()} hook runs and the endpoint is closed.
+     *
+     * @param request
+     *        The request to signal once the close has completed or failed.
+     */
+    @Override
+    public void close(AsyncResult request) {
+        // If already closed signal success or else the caller might never get notified.
+        if (endpoint.getLocalState() == EndpointState.CLOSED) {
+            request.onSuccess();
+            return;
+        }
+
+        this.closeRequest = request;
+        doClose();
+        this.endpoint.close();
+    }
+
+    /**
+     * @return true if an endpoint has been set and its local state is CLOSED, false
+     *         otherwise (including when no endpoint has been assigned yet).
+     */
+    @Override
+    public boolean isClosed() {
+        // Same null guard as getLocalState(): no endpoint means not closed yet.
+        return this.endpoint != null && this.endpoint.getLocalState() == EndpointState.CLOSED;
+    }
+
+    /**
+     * @return true while a close request is stored and has not yet been signalled.
+     */
+    @Override
+    public boolean isAwaitingClose() {
+        return this.closeRequest != null;
+    }
+
+    /**
+     * Signals success on the pending close request, if there is one, and then closes
+     * and frees the endpoint.
+     */
+    @Override
+    public void closed() {
+        if (this.closeRequest != null) {
+            this.closeRequest.onSuccess();
+            this.closeRequest = null;
+        }
+
+        this.endpoint.close();
+        this.endpoint.free();
+    }
+
+    /**
+     * Fails any pending open or close request with a generic JMSException.
+     */
+    @Override
+    public void failed() {
+        failed(new JMSException("Remote request failed."));
+    }
+
+    /**
+     * Fails the pending open request and the pending close request, whichever of them
+     * are set, with the given cause and clears them.
+     *
+     * @param cause
+     *        The error to hand to the pending request(s).
+     */
+    @Override
+    public void failed(Exception cause) {
+        if (openRequest != null) {
+            openRequest.onFailure(cause);
+            openRequest = null;
+        }
+
+        if (closeRequest != null) {
+            closeRequest.onFailure(cause);
+            closeRequest = null;
+        }
+    }
+
+    /**
+     * @return the Proton endpoint this resource maps to, may be null if none is set.
+     */
+    public E getEndpoint() {
+        return this.endpoint;
+    }
+
+    /**
+     * @return the JmsResource instance that this AmqpResource is managing.
+     */
+    public R getJmsResource() {
+        return this.info;
+    }
+
+    /**
+     * @return the local state of the endpoint, or UNINITIALIZED if no endpoint is set.
+     */
+    public EndpointState getLocalState() {
+        if (endpoint == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return this.endpoint.getLocalState();
+    }
+
+    /**
+     * @return the remote state of the endpoint, or UNINITIALIZED if no endpoint is set.
+     */
+    public EndpointState getRemoteState() {
+        if (endpoint == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return this.endpoint.getRemoteState();
+    }
+
+    /**
+     * Builds an exception describing the error reported by the remote peer.
+     *
+     * @return a JMSSecurityException when the remote condition is
+     *         {@link AmqpError#UNAUTHORIZED_ACCESS}, otherwise a plain JMSException;
+     *         the message comes from {@link #getRemoteErrorMessage()}.
+     */
+    @Override
+    public Exception getRemoteError() {
+        String message = getRemoteErrorMessage();
+
+        // The peer may close without supplying an error condition, in which case either
+        // the condition object or its symbol is null. Comparing from the constant side
+        // keeps that case on the generic JMSException path instead of throwing a
+        // NullPointerException.
+        ErrorCondition remoteCondition = endpoint.getRemoteCondition();
+        Symbol error = remoteCondition != null ? remoteCondition.getCondition() : null;
+
+        Exception remoteError;
+        if (AmqpError.UNAUTHORIZED_ACCESS.equals(error)) {
+            remoteError = new JMSSecurityException(message);
+        } else {
+            remoteError = new JMSException(message);
+        }
+
+        return remoteError;
+    }
+
+    /**
+     * @return the description from the remote error condition when it is present and
+     *         not empty, otherwise a generic fallback message.
+     */
+    @Override
+    public String getRemoteErrorMessage() {
+        String message = "Received unkown error from remote peer";
+        if (endpoint.getRemoteCondition() != null) {
+            ErrorCondition error = endpoint.getRemoteCondition();
+            if (error.getDescription() != null && !error.getDescription().isEmpty()) {
+                message = error.getDescription();
+            }
+        }
+
+        return message;
+    }
+
+    /**
+     * Reacts to a change of the endpoint's remote state by completing or failing the
+     * pending open / close request.
+     *
+     * @throws IOException declared by the interface; not thrown by this implementation.
+     */
+    @Override
+    public void processStateChange() throws IOException {
+        EndpointState remoteState = endpoint.getRemoteState();
+
+        if (remoteState == EndpointState.ACTIVE) {
+            if (isAwaitingOpen()) {
+                LOG.debug("{} is now open: ", this);
+                opened();
+            }
+
+            // Should not receive an ACTIVE event if not awaiting the open state.
+        } else if (remoteState == EndpointState.CLOSED) {
+            if (isAwaitingClose()) {
+                LOG.debug("{} is now closed: ", this);
+                closed();
+            } else if (isAwaitingOpen()) {
+                // Error on Open, create exception and signal failure.
+                LOG.warn("Open of {} failed: ", this);
+                Exception remoteError = this.getRemoteError();
+                failed(remoteError);
+            } else {
+                // TODO - Handle remote asynchronous close.
+                LOG.warn("{} was closed remotely.", this);
+            }
+        }
+    }
+
+    /**
+     * No-op by default; subclasses override when they handle delivery updates.
+     */
+    @Override
+    public void processDeliveryUpdates() throws IOException {
+    }
+
+    /**
+     * No-op by default; subclasses override when they handle flow updates.
+     */
+    @Override
+    public void processFlowUpdates() throws IOException {
+    }
+
+    /**
+     * Subclass hook invoked from {@link #open(AsyncResult)} before the endpoint is opened.
+     */
+    protected abstract void doOpen();
+
+    /**
+     * Subclass hook invoked from {@link #close(AsyncResult)} before the endpoint is closed.
+     */
+    protected abstract void doClose();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
new file mode 100644
index 0000000..7da3143
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer used for anonymous JMS MessageProducers.
+ *
+ * An anonymous producer is simulated by creating a new sender for every send
+ * attempt and closing that sender again once the send has succeeded.
+ */
+public class AmqpAnonymousProducer extends AmqpProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducer.class);
+    private static final IdGenerator producerIdGenerator = new IdGenerator();
+
+    // Key and running counter used to build the ids of the per-send producers.
+    private final String producerIdKey = producerIdGenerator.generateId();
+    private long producerIdCount;
+
+    /**
+     * Creates the Anonymous Producer object.
+     *
+     * @param session
+     *        the session that owns this producer
+     * @param info
+     *        the JmsProducerInfo for this producer.
+     */
+    public AmqpAnonymousProducer(AmqpSession session, JmsProducerInfo info) {
+        super(session, info);
+    }
+
+    @Override
+    public boolean isAnonymous() {
+        return true;
+    }
+
+    /**
+     * @return always ACTIVE, this producer has no endpoint state of its own to report.
+     */
+    @Override
+    public EndpointState getLocalState() {
+        return EndpointState.ACTIVE;
+    }
+
+    /**
+     * @return always ACTIVE, this producer has no endpoint state of its own to report.
+     */
+    @Override
+    public EndpointState getRemoteState() {
+        return EndpointState.ACTIVE;
+    }
+
+    /**
+     * Signals success right away. Nothing is sent to the Broker until a send occurs,
+     * so the client must not be left blocked waiting on this open.
+     */
+    @Override
+    public void open(AsyncResult request) {
+        request.onSuccess();
+    }
+
+    /**
+     * Signals success right away. Internal producers that are in the middle of a send
+     * track their own state and close when that send completes or fails.
+     */
+    @Override
+    public void close(AsyncResult request) {
+        request.onSuccess();
+    }
+
+    @Override
+    protected void doOpen() {
+    }
+
+    @Override
+    protected void doClose() {
+    }
+
+    /**
+     * Starts the open / send / close chain for one message.
+     *
+     * A short lived fixed producer is created for the envelope's destination and
+     * opened; the open completion triggers the send, and the send completion triggers
+     * the close which finally completes the caller's request.
+     *
+     * @return always true.
+     */
+    @Override
+    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+
+        LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
+
+        // Each send gets its own producer info carrying a fresh id and the target destination.
+        final JmsProducerId senderId = new JmsProducerId(producerIdKey, -1, producerIdCount++);
+        final JmsProducerInfo senderInfo = new JmsProducerInfo(senderId);
+        senderInfo.setDestination(envelope.getDestination());
+
+        final AmqpFixedProducer sender = new AmqpFixedProducer(session, senderInfo);
+        sender.setPresettle(isPresettle());
+
+        // Opening the sender is the first link in the chain, the rest follows from callbacks.
+        final AnonymousOpenRequest openStep = new AnonymousOpenRequest(request, sender, envelope);
+        sender.open(openStep);
+
+        return true;
+    }
+
+    /**
+     * Common state for every step of the anonymous send chain: the caller's original
+     * request, the short lived producer and the message being sent.
+     */
+    private abstract class AnonymousRequest implements AsyncResult {
+
+        protected final AsyncResult sendResult;
+        protected final AmqpProducer producer;
+        protected final JmsOutboundMessageDispatch envelope;
+
+        public AnonymousRequest(AsyncResult callerRequest, AmqpProducer sender, JmsOutboundMessageDispatch message) {
+            this.sendResult = callerRequest;
+            this.producer = sender;
+            this.envelope = message;
+        }
+
+        /**
+         * A failure at any step of the chain fails the original send request.
+         */
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.debug("Send failed during {} step in chain: {}", this.getClass().getName(), getProducerId());
+            sendResult.onFailure(result);
+        }
+
+        @Override
+        public boolean isComplete() {
+            return sendResult.isComplete();
+        }
+    }
+
+    /**
+     * First step: once the short lived producer is open, perform the send through it.
+     */
+    private final class AnonymousOpenRequest extends AnonymousRequest {
+
+        public AnonymousOpenRequest(AsyncResult callerRequest, AmqpProducer sender, JmsOutboundMessageDispatch message) {
+            super(callerRequest, sender, message);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Open phase of anonymous send complete: {} ", getProducerId());
+            final AnonymousSendRequest sendStep = new AnonymousSendRequest(this);
+            try {
+                producer.send(envelope, sendStep);
+            } catch (Exception error) {
+                sendResult.onFailure(error);
+            }
+        }
+    }
+
+    /**
+     * Second step: once the send has succeeded, close the short lived producer.
+     */
+    private final class AnonymousSendRequest extends AnonymousRequest {
+
+        public AnonymousSendRequest(AnonymousOpenRequest openStep) {
+            super(openStep.sendResult, openStep.producer, openStep.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Send phase of anonymous send complete: {} ", getProducerId());
+            final AnonymousCloseRequest closeStep = new AnonymousCloseRequest(this);
+            producer.close(closeStep);
+        }
+    }
+
+    /**
+     * Final step: once the short lived producer is closed, complete the original send.
+     */
+    private final class AnonymousCloseRequest extends AnonymousRequest {
+
+        public AnonymousCloseRequest(AnonymousSendRequest sendStep) {
+            super(sendStep.sendResult, sendStep.producer, sendStep.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Close phase of anonymous send complete: {} ", getProducerId());
+            sendResult.onSuccess();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
