update anonymous producers to attempt using the anonymous relay and then fall
back to opening and closing links if unsuccessful


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ab870c8e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ab870c8e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ab870c8e

Branch: refs/heads/master
Commit: ab870c8edb9f5fc2f77b9cba004ab1441441db0f
Parents: b8bb34b
Author: Robert Gemmell <[email protected]>
Authored: Tue Nov 11 14:40:29 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Tue Nov 11 16:29:44 2014 +0000

----------------------------------------------------------------------
 .../amqp/AmqpAnonymousFallbackProducer.java     | 252 +++++++++++++++++++
 .../provider/amqp/AmqpAnonymousProducer.java    | 252 -------------------
 .../amqp/AmqpAnonymousProducerWrapper.java      | 106 ++++++++
 .../provider/amqp/AmqpConnectionProperties.java |  16 +-
 .../qpid/jms/provider/amqp/AmqpSession.java     |  10 +-
 5 files changed, 364 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
new file mode 100644
index 0000000..f34f5f9
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.WrappedAsyncResult;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.jms.util.LRUCache;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the case of anonymous JMS MessageProducers.
+ *
+ * In order to simulate the anonymous producer we must create a sender for 
each message
+ * send attempt and close it following a successful send.
+ */
+public class AmqpAnonymousFallbackProducer extends AmqpProducer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class);
+    private static final IdGenerator producerIdGenerator = new IdGenerator();
+
+    private final AnonymousProducerCache producerCache = new 
AnonymousProducerCache(10);
+    private final String producerIdKey = producerIdGenerator.generateId();
+    private long producerIdCount;
+
+    /**
+     * Creates the Anonymous Producer object.
+     *
+     * @param session
+     *        the session that owns this producer
+     * @param info
+     *        the JmsProducerInfo for this producer.
+     */
+    public AmqpAnonymousFallbackProducer(AmqpSession session, JmsProducerInfo 
info) {
+        super(session, info);
+
+        if (connection.isAnonymousProducerCache()) {
+            
producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize());
+        }
+    }
+
+    @Override
+    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult 
request) throws IOException, JMSException {
+        LOG.trace("Started send chain for anonymous producer: {}", 
getProducerId());
+
+        AmqpProducer producer = null;
+        if (connection.isAnonymousProducerCache()) {
+            producer = producerCache.get(envelope.getDestination());
+        }
+
+        if (producer == null) {
+            // Create a new ProducerInfo for the short lived producer that's 
created to perform the
+            // send to the given AMQP target.
+            JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
+            info.setDestination(envelope.getDestination());
+
+            // We open a Fixed Producer instance with the target destination.  
Once it opens
+            // it will trigger the open event which will in turn trigger the 
send event.
+            producer = new AmqpFixedProducer(session, info);
+            producer.setPresettle(isPresettle());
+            AnonymousOpenRequest open = new AnonymousOpenRequest(request, 
producer, envelope);
+            producer.open(open);
+
+            if (connection.isAnonymousProducerCache()) {
+                // Cache it in hopes of not needing to create large numbers of 
producers.
+                producerCache.put(envelope.getDestination(), producer);
+            }
+
+            return true;
+        } else {
+            return producer.send(envelope, request);
+        }
+    }
+
+    @Override
+    public void open(AsyncResult request) {
+        // Trigger an immediate open, we don't talk to the Broker until
+        // a send occurs so we must not let the client block.
+        request.onSuccess();
+    }
+
+    @Override
+    public void close(AsyncResult request) {
+        // Trigger an immediate close, the internal producers that are 
currently in the cache
+        for (AmqpProducer producer : producerCache.values()) {
+            producer.close(new CloseRequest(producer));
+        }
+
+        request.onSuccess();
+    }
+
+    @Override
+    public boolean isAnonymous() {
+        return true;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return EndpointState.ACTIVE;
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return EndpointState.ACTIVE;
+    }
+
+    private JmsProducerId getNextProducerId() {
+        return new JmsProducerId(producerIdKey, -1, producerIdCount++);
+    }
+
+    private abstract class AnonymousRequest extends WrappedAsyncResult {
+
+        protected final AmqpProducer producer;
+        protected final JmsOutboundMessageDispatch envelope;
+
+        public AnonymousRequest(AsyncResult sendResult, AmqpProducer producer, 
JmsOutboundMessageDispatch envelope) {
+            super(sendResult);
+            this.producer = producer;
+            this.envelope = envelope;
+        }
+
+        /**
+         * In all cases of the chain of events that make up the send for an 
anonymous
+         * producer a failure will trigger the original send request to fail.
+         */
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.debug("Send failed during {} step in chain: {}", 
this.getClass().getName(), getProducerId());
+            super.onFailure(result);
+        }
+    }
+
+    private final class AnonymousOpenRequest extends AnonymousRequest {
+
+        public AnonymousOpenRequest(AsyncResult sendResult, AmqpProducer 
producer, JmsOutboundMessageDispatch envelope) {
+            super(sendResult, producer, envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Open phase of anonymous send complete: {} ", 
getProducerId());
+            AnonymousSendRequest send = new AnonymousSendRequest(this);
+            try {
+                producer.send(envelope, send);
+            } catch (Exception e) {
+                super.onFailure(e);
+            }
+        }
+    }
+
+    private final class AnonymousSendRequest extends AnonymousRequest {
+
+        public AnonymousSendRequest(AnonymousOpenRequest open) {
+            super(open.getWrappedRequest(), open.producer, open.envelope);
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            // Ensure that cache get purged of any failed producers.
+            
AmqpAnonymousFallbackProducer.this.producerCache.remove(producer.getJmsResource().getDestination());
+            super.onFailure(result);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Send phase of anonymous send complete: {} ", 
getProducerId());
+            if (!connection.isAnonymousProducerCache()) {
+                AnonymousCloseRequest close = new AnonymousCloseRequest(this);
+                producer.close(close);
+            } else {
+                super.onSuccess();
+            }
+        }
+    }
+
+    private final class AnonymousCloseRequest extends AnonymousRequest {
+
+        public AnonymousCloseRequest(AnonymousSendRequest send) {
+            super(send.getWrappedRequest(), send.producer, send.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Close phase of anonymous send complete: {} ", 
getProducerId());
+            super.onSuccess();
+        }
+    }
+
+    private final class CloseRequest implements AsyncResult {
+
+        private final AmqpProducer producer;
+
+        public CloseRequest(AmqpProducer producer) {
+            this.producer = producer;
+        }
+
+        @Override
+        public void onFailure(Throwable result) {
+            
AmqpAnonymousFallbackProducer.this.connection.getProvider().fireProviderException(result);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Close of anonymous producer {} complete", producer);
+        }
+
+        @Override
+        public boolean isComplete() {
+            return producer.isClosed();
+        }
+    }
+
+    private final class AnonymousProducerCache extends 
LRUCache<JmsDestination, AmqpProducer> {
+
+        private static final long serialVersionUID = 1L;
+
+        public AnonymousProducerCache(int cacheSize) {
+            super(cacheSize);
+        }
+
+        @Override
+        protected void onCacheEviction(Map.Entry<JmsDestination, AmqpProducer> 
cached) {
+            LOG.trace("Producer: {} evicted from producer cache", 
cached.getValue());
+            cached.getValue().close(new CloseRequest(cached.getValue()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
deleted file mode 100644
index 69bfdf7..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.io.IOException;
-import java.util.Map;
-
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.JmsDestination;
-import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
-import org.apache.qpid.jms.meta.JmsProducerId;
-import org.apache.qpid.jms.meta.JmsProducerInfo;
-import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.provider.WrappedAsyncResult;
-import org.apache.qpid.jms.util.IdGenerator;
-import org.apache.qpid.jms.util.LRUCache;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles the case of anonymous JMS MessageProducers.
- *
- * In order to simulate the anonymous producer we must create a sender for 
each message
- * send attempt and close it following a successful send.
- */
-public class AmqpAnonymousProducer extends AmqpProducer {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpAnonymousProducer.class);
-    private static final IdGenerator producerIdGenerator = new IdGenerator();
-
-    private final AnonymousProducerCache producerCache = new 
AnonymousProducerCache(10);
-    private final String producerIdKey = producerIdGenerator.generateId();
-    private long producerIdCount;
-
-    /**
-     * Creates the Anonymous Producer object.
-     *
-     * @param session
-     *        the session that owns this producer
-     * @param info
-     *        the JmsProducerInfo for this producer.
-     */
-    public AmqpAnonymousProducer(AmqpSession session, JmsProducerInfo info) {
-        super(session, info);
-
-        if (connection.isAnonymousProducerCache()) {
-            
producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize());
-        }
-    }
-
-    @Override
-    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult 
request) throws IOException, JMSException {
-        LOG.trace("Started send chain for anonymous producer: {}", 
getProducerId());
-
-        AmqpProducer producer = null;
-        if (connection.isAnonymousProducerCache()) {
-            producer = producerCache.get(envelope.getDestination());
-        }
-
-        if (producer == null) {
-            // Create a new ProducerInfo for the short lived producer that's 
created to perform the
-            // send to the given AMQP target.
-            JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
-            info.setDestination(envelope.getDestination());
-
-            // We open a Fixed Producer instance with the target destination.  
Once it opens
-            // it will trigger the open event which will in turn trigger the 
send event.
-            producer = new AmqpFixedProducer(session, info);
-            producer.setPresettle(isPresettle());
-            AnonymousOpenRequest open = new AnonymousOpenRequest(request, 
producer, envelope);
-            producer.open(open);
-
-            if (connection.isAnonymousProducerCache()) {
-                // Cache it in hopes of not needing to create large numbers of 
producers.
-                producerCache.put(envelope.getDestination(), producer);
-            }
-
-            return true;
-        } else {
-            return producer.send(envelope, request);
-        }
-    }
-
-    @Override
-    public void open(AsyncResult request) {
-        // Trigger an immediate open, we don't talk to the Broker until
-        // a send occurs so we must not let the client block.
-        request.onSuccess();
-    }
-
-    @Override
-    public void close(AsyncResult request) {
-        // Trigger an immediate close, the internal producers that are 
currently in the cache
-        for (AmqpProducer producer : producerCache.values()) {
-            producer.close(new CloseRequest(producer));
-        }
-
-        request.onSuccess();
-    }
-
-    @Override
-    public boolean isAnonymous() {
-        return true;
-    }
-
-    @Override
-    public EndpointState getLocalState() {
-        return EndpointState.ACTIVE;
-    }
-
-    @Override
-    public EndpointState getRemoteState() {
-        return EndpointState.ACTIVE;
-    }
-
-    private JmsProducerId getNextProducerId() {
-        return new JmsProducerId(producerIdKey, -1, producerIdCount++);
-    }
-
-    private abstract class AnonymousRequest extends WrappedAsyncResult {
-
-        protected final AmqpProducer producer;
-        protected final JmsOutboundMessageDispatch envelope;
-
-        public AnonymousRequest(AsyncResult sendResult, AmqpProducer producer, 
JmsOutboundMessageDispatch envelope) {
-            super(sendResult);
-            this.producer = producer;
-            this.envelope = envelope;
-        }
-
-        /**
-         * In all cases of the chain of events that make up the send for an 
anonymous
-         * producer a failure will trigger the original send request to fail.
-         */
-        @Override
-        public void onFailure(Throwable result) {
-            LOG.debug("Send failed during {} step in chain: {}", 
this.getClass().getName(), getProducerId());
-            super.onFailure(result);
-        }
-    }
-
-    private final class AnonymousOpenRequest extends AnonymousRequest {
-
-        public AnonymousOpenRequest(AsyncResult sendResult, AmqpProducer 
producer, JmsOutboundMessageDispatch envelope) {
-            super(sendResult, producer, envelope);
-        }
-
-        @Override
-        public void onSuccess() {
-            LOG.trace("Open phase of anonymous send complete: {} ", 
getProducerId());
-            AnonymousSendRequest send = new AnonymousSendRequest(this);
-            try {
-                producer.send(envelope, send);
-            } catch (Exception e) {
-                super.onFailure(e);
-            }
-        }
-    }
-
-    private final class AnonymousSendRequest extends AnonymousRequest {
-
-        public AnonymousSendRequest(AnonymousOpenRequest open) {
-            super(open.getWrappedRequest(), open.producer, open.envelope);
-        }
-
-        @Override
-        public void onFailure(Throwable result) {
-            // Ensure that cache get purged of any failed producers.
-            
AmqpAnonymousProducer.this.producerCache.remove(producer.getJmsResource().getDestination());
-            super.onFailure(result);
-        }
-
-        @Override
-        public void onSuccess() {
-            LOG.trace("Send phase of anonymous send complete: {} ", 
getProducerId());
-            if (!connection.isAnonymousProducerCache()) {
-                AnonymousCloseRequest close = new AnonymousCloseRequest(this);
-                producer.close(close);
-            } else {
-                super.onSuccess();
-            }
-        }
-    }
-
-    private final class AnonymousCloseRequest extends AnonymousRequest {
-
-        public AnonymousCloseRequest(AnonymousSendRequest send) {
-            super(send.getWrappedRequest(), send.producer, send.envelope);
-        }
-
-        @Override
-        public void onSuccess() {
-            LOG.trace("Close phase of anonymous send complete: {} ", 
getProducerId());
-            super.onSuccess();
-        }
-    }
-
-    private final class CloseRequest implements AsyncResult {
-
-        private final AmqpProducer producer;
-
-        public CloseRequest(AmqpProducer producer) {
-            this.producer = producer;
-        }
-
-        @Override
-        public void onFailure(Throwable result) {
-            
AmqpAnonymousProducer.this.connection.getProvider().fireProviderException(result);
-        }
-
-        @Override
-        public void onSuccess() {
-            LOG.trace("Close of anonymous producer {} complete", producer);
-        }
-
-        @Override
-        public boolean isComplete() {
-            return producer.isClosed();
-        }
-    }
-
-    private final class AnonymousProducerCache extends 
LRUCache<JmsDestination, AmqpProducer> {
-
-        private static final long serialVersionUID = 1L;
-
-        public AnonymousProducerCache(int cacheSize) {
-            super(cacheSize);
-        }
-
-        @Override
-        protected void onCacheEviction(Map.Entry<JmsDestination, AmqpProducer> 
cached) {
-            LOG.trace("Producer: {} evicted from producer cache", 
cached.getValue());
-            cached.getValue().close(new CloseRequest(cached.getValue()));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
new file mode 100644
index 0000000..3c61f2f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.WrappedAsyncResult;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the case of anonymous JMS MessageProducers.
+ *
+ * In order to simulate the anonymous producer we must create a sender for 
each message
+ * send attempt and close it following a successful send.
+ */
+public class AmqpAnonymousProducerWrapper extends AmqpProducer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AmqpAnonymousProducerWrapper.class);
+    AmqpProducer delegate;
+
+    /**
+     * Creates the Anonymous Producer object.
+     *
+     * @param session
+     *        the session that owns this producer
+     * @param info
+     *        the JmsProducerInfo for this producer.
+     */
+    public AmqpAnonymousProducerWrapper(AmqpSession session, JmsProducerInfo 
info) {
+        super(session, info);
+
+        delegate = new AmqpFixedProducer(session, info);
+    }
+
+    @Override
+    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult 
request) throws IOException, JMSException {
+        LOG.trace("Delegating anonymous send to underlying producer: {}", 
getProducerId());
+
+       return delegate.send(envelope, request);
+    }
+
+    @Override
+    public void open(AsyncResult request) {
+        AnonymousRelayRequest anonRelayRequest = new 
AnonymousRelayRequest(request);
+        delegate.open(anonRelayRequest);
+    }
+
+    @Override
+    public void close(AsyncResult request) {
+        delegate.close(request);
+    }
+
+    @Override
+    public boolean isAnonymous() {
+        return true;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return delegate.getLocalState();
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return delegate.getRemoteState();
+    }
+
+    private class AnonymousRelayRequest extends WrappedAsyncResult {
+
+        public AnonymousRelayRequest(AsyncResult openResult) {
+            super(openResult);
+        }
+
+        /**
+         * If creation of the producer to the anonymous-relay failed, we try to
+         * enter fallback mode rather than immediately failing.
+         */
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.debug("Attempt to open producer to anonymous relay failed, 
entering fallback mode");
+            delegate = new AmqpAnonymousFallbackProducer(session, 
getJmsResource());
+            delegate.open(getWrappedRequest());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index 9708bda..7cbe58b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -30,10 +30,6 @@ public class AmqpConnectionProperties {
     public static final Symbol JMS_MAPPING_VERSION_KEY = 
Symbol.valueOf("x-opt-jms-mapping-version");
     public static final short JMS_MAPPING_VERSION_VALUE = 0;
 
-    private static final Symbol ANONYMOUS_RELAY = 
Symbol.valueOf("x-opt-anonymous-relay");
-
-    private String anonymousRelayName;
-
     /**
      * Creates a new instance of this class from the given remote capabilities 
and properties.
      *
@@ -52,21 +48,11 @@ public class AmqpConnectionProperties {
         }
     }
 
-    public boolean isAnonymousRelaySupported() {
-        return anonymousRelayName != null;
-    }
-
-    public String getAnonymousRelayName() {
-        return anonymousRelayName;
-    }
-
     protected void processCapabilities(Symbol[] capabilities) {
         // TODO - Inspect capabilities for configuration options
     }
 
     protected void processProperties(Map<Symbol, Object> properties) {
-        if (properties.containsKey(ANONYMOUS_RELAY)) {
-            anonymousRelayName = (String) properties.get(ANONYMOUS_RELAY);
-        }
+        // TODO - Inspect properties for configuration options
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 2882db8..d707f35 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -102,13 +102,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
     public AmqpProducer createProducer(JmsProducerInfo producerInfo) {
         AmqpProducer producer = null;
 
-      //  if (producerInfo.getDestination() != null || 
connection.getProperties().isAnonymousRelaySupported()) {
+        if (producerInfo.getDestination() != null) {
             LOG.debug("Creating AmqpFixedProducer for: {}", 
producerInfo.getDestination());
             producer = new AmqpFixedProducer(this, producerInfo);
-//        } else {
-//            LOG.debug("Creating an AmqpAnonymousProducer Producer: ");
-//            producer = new AmqpAnonymousProducer(this, producerInfo);
-//        }
+        } else {
+            LOG.debug("Creating an AmqpAnonymousProducerWrapper");
+            producer = new AmqpAnonymousProducerWrapper(this, producerInfo);
+        }
 
         producer.setPresettle(connection.isPresettleProducers());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to