update anonymous producers to attempt using the anonymous relay and then fall back to opening and closing links if unsuccessful
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ab870c8e Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ab870c8e Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ab870c8e Branch: refs/heads/master Commit: ab870c8edb9f5fc2f77b9cba004ab1441441db0f Parents: b8bb34b Author: Robert Gemmell <[email protected]> Authored: Tue Nov 11 14:40:29 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Nov 11 16:29:44 2014 +0000 ---------------------------------------------------------------------- .../amqp/AmqpAnonymousFallbackProducer.java | 252 +++++++++++++++++++ .../provider/amqp/AmqpAnonymousProducer.java | 252 ------------------- .../amqp/AmqpAnonymousProducerWrapper.java | 106 ++++++++ .../provider/amqp/AmqpConnectionProperties.java | 16 +- .../qpid/jms/provider/amqp/AmqpSession.java | 10 +- 5 files changed, 364 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java new file mode 100644 index 0000000..f34f5f9 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsProducerId; +import org.apache.qpid.jms.meta.JmsProducerInfo; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.WrappedAsyncResult; +import org.apache.qpid.jms.util.IdGenerator; +import org.apache.qpid.jms.util.LRUCache; +import org.apache.qpid.proton.engine.EndpointState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles the case of anonymous JMS MessageProducers. + * + * In order to simulate the anonymous producer we must create a sender for each message + * send attempt and close it following a successful send. + */ +public class AmqpAnonymousFallbackProducer extends AmqpProducer { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class); + private static final IdGenerator producerIdGenerator = new IdGenerator(); + + private final AnonymousProducerCache producerCache = new AnonymousProducerCache(10); + private final String producerIdKey = producerIdGenerator.generateId(); + private long producerIdCount; + + /** + * Creates the Anonymous Producer object. 
+ * + * @param session + * the session that owns this producer + * @param info + * the JmsProducerInfo for this producer. + */ + public AmqpAnonymousFallbackProducer(AmqpSession session, JmsProducerInfo info) { + super(session, info); + + if (connection.isAnonymousProducerCache()) { + producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize()); + } + } + + @Override + public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + LOG.trace("Started send chain for anonymous producer: {}", getProducerId()); + + AmqpProducer producer = null; + if (connection.isAnonymousProducerCache()) { + producer = producerCache.get(envelope.getDestination()); + } + + if (producer == null) { + // Create a new ProducerInfo for the short lived producer that's created to perform the + // send to the given AMQP target. + JmsProducerInfo info = new JmsProducerInfo(getNextProducerId()); + info.setDestination(envelope.getDestination()); + + // We open a Fixed Producer instance with the target destination. Once it opens + // it will trigger the open event which will in turn trigger the send event. + producer = new AmqpFixedProducer(session, info); + producer.setPresettle(isPresettle()); + AnonymousOpenRequest open = new AnonymousOpenRequest(request, producer, envelope); + producer.open(open); + + if (connection.isAnonymousProducerCache()) { + // Cache it in hopes of not needing to create large numbers of producers. + producerCache.put(envelope.getDestination(), producer); + } + + return true; + } else { + return producer.send(envelope, request); + } + } + + @Override + public void open(AsyncResult request) { + // Trigger an immediate open, we don't talk to the Broker until + // a send occurs so we must not let the client block. 
+ request.onSuccess(); + } + + @Override + public void close(AsyncResult request) { + // Trigger an immediate close, the internal producers that are currently in the cache + for (AmqpProducer producer : producerCache.values()) { + producer.close(new CloseRequest(producer)); + } + + request.onSuccess(); + } + + @Override + public boolean isAnonymous() { + return true; + } + + @Override + public EndpointState getLocalState() { + return EndpointState.ACTIVE; + } + + @Override + public EndpointState getRemoteState() { + return EndpointState.ACTIVE; + } + + private JmsProducerId getNextProducerId() { + return new JmsProducerId(producerIdKey, -1, producerIdCount++); + } + + private abstract class AnonymousRequest extends WrappedAsyncResult { + + protected final AmqpProducer producer; + protected final JmsOutboundMessageDispatch envelope; + + public AnonymousRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) { + super(sendResult); + this.producer = producer; + this.envelope = envelope; + } + + /** + * In all cases of the chain of events that make up the send for an anonymous + * producer a failure will trigger the original send request to fail. 
+ */ + @Override + public void onFailure(Throwable result) { + LOG.debug("Send failed during {} step in chain: {}", this.getClass().getName(), getProducerId()); + super.onFailure(result); + } + } + + private final class AnonymousOpenRequest extends AnonymousRequest { + + public AnonymousOpenRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) { + super(sendResult, producer, envelope); + } + + @Override + public void onSuccess() { + LOG.trace("Open phase of anonymous send complete: {} ", getProducerId()); + AnonymousSendRequest send = new AnonymousSendRequest(this); + try { + producer.send(envelope, send); + } catch (Exception e) { + super.onFailure(e); + } + } + } + + private final class AnonymousSendRequest extends AnonymousRequest { + + public AnonymousSendRequest(AnonymousOpenRequest open) { + super(open.getWrappedRequest(), open.producer, open.envelope); + } + + @Override + public void onFailure(Throwable result) { + // Ensure that cache get purged of any failed producers. 
+ AmqpAnonymousFallbackProducer.this.producerCache.remove(producer.getJmsResource().getDestination()); + super.onFailure(result); + } + + @Override + public void onSuccess() { + LOG.trace("Send phase of anonymous send complete: {} ", getProducerId()); + if (!connection.isAnonymousProducerCache()) { + AnonymousCloseRequest close = new AnonymousCloseRequest(this); + producer.close(close); + } else { + super.onSuccess(); + } + } + } + + private final class AnonymousCloseRequest extends AnonymousRequest { + + public AnonymousCloseRequest(AnonymousSendRequest send) { + super(send.getWrappedRequest(), send.producer, send.envelope); + } + + @Override + public void onSuccess() { + LOG.trace("Close phase of anonymous send complete: {} ", getProducerId()); + super.onSuccess(); + } + } + + private final class CloseRequest implements AsyncResult { + + private final AmqpProducer producer; + + public CloseRequest(AmqpProducer producer) { + this.producer = producer; + } + + @Override + public void onFailure(Throwable result) { + AmqpAnonymousFallbackProducer.this.connection.getProvider().fireProviderException(result); + } + + @Override + public void onSuccess() { + LOG.trace("Close of anonymous producer {} complete", producer); + } + + @Override + public boolean isComplete() { + return producer.isClosed(); + } + } + + private final class AnonymousProducerCache extends LRUCache<JmsDestination, AmqpProducer> { + + private static final long serialVersionUID = 1L; + + public AnonymousProducerCache(int cacheSize) { + super(cacheSize); + } + + @Override + protected void onCacheEviction(Map.Entry<JmsDestination, AmqpProducer> cached) { + LOG.trace("Producer: {} evicted from producer cache", cached.getValue()); + cached.getValue().close(new CloseRequest(cached.getValue())); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java 
---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java deleted file mode 100644 index 69bfdf7..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.provider.amqp; - -import java.io.IOException; -import java.util.Map; - -import javax.jms.JMSException; - -import org.apache.qpid.jms.JmsDestination; -import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; -import org.apache.qpid.jms.meta.JmsProducerId; -import org.apache.qpid.jms.meta.JmsProducerInfo; -import org.apache.qpid.jms.provider.AsyncResult; -import org.apache.qpid.jms.provider.WrappedAsyncResult; -import org.apache.qpid.jms.util.IdGenerator; -import org.apache.qpid.jms.util.LRUCache; -import org.apache.qpid.proton.engine.EndpointState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles the case of anonymous JMS MessageProducers. 
- * - * In order to simulate the anonymous producer we must create a sender for each message - * send attempt and close it following a successful send. - */ -public class AmqpAnonymousProducer extends AmqpProducer { - - private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducer.class); - private static final IdGenerator producerIdGenerator = new IdGenerator(); - - private final AnonymousProducerCache producerCache = new AnonymousProducerCache(10); - private final String producerIdKey = producerIdGenerator.generateId(); - private long producerIdCount; - - /** - * Creates the Anonymous Producer object. - * - * @param session - * the session that owns this producer - * @param info - * the JmsProducerInfo for this producer. - */ - public AmqpAnonymousProducer(AmqpSession session, JmsProducerInfo info) { - super(session, info); - - if (connection.isAnonymousProducerCache()) { - producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize()); - } - } - - @Override - public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { - LOG.trace("Started send chain for anonymous producer: {}", getProducerId()); - - AmqpProducer producer = null; - if (connection.isAnonymousProducerCache()) { - producer = producerCache.get(envelope.getDestination()); - } - - if (producer == null) { - // Create a new ProducerInfo for the short lived producer that's created to perform the - // send to the given AMQP target. - JmsProducerInfo info = new JmsProducerInfo(getNextProducerId()); - info.setDestination(envelope.getDestination()); - - // We open a Fixed Producer instance with the target destination. Once it opens - // it will trigger the open event which will in turn trigger the send event. 
- producer = new AmqpFixedProducer(session, info); - producer.setPresettle(isPresettle()); - AnonymousOpenRequest open = new AnonymousOpenRequest(request, producer, envelope); - producer.open(open); - - if (connection.isAnonymousProducerCache()) { - // Cache it in hopes of not needing to create large numbers of producers. - producerCache.put(envelope.getDestination(), producer); - } - - return true; - } else { - return producer.send(envelope, request); - } - } - - @Override - public void open(AsyncResult request) { - // Trigger an immediate open, we don't talk to the Broker until - // a send occurs so we must not let the client block. - request.onSuccess(); - } - - @Override - public void close(AsyncResult request) { - // Trigger an immediate close, the internal producers that are currently in the cache - for (AmqpProducer producer : producerCache.values()) { - producer.close(new CloseRequest(producer)); - } - - request.onSuccess(); - } - - @Override - public boolean isAnonymous() { - return true; - } - - @Override - public EndpointState getLocalState() { - return EndpointState.ACTIVE; - } - - @Override - public EndpointState getRemoteState() { - return EndpointState.ACTIVE; - } - - private JmsProducerId getNextProducerId() { - return new JmsProducerId(producerIdKey, -1, producerIdCount++); - } - - private abstract class AnonymousRequest extends WrappedAsyncResult { - - protected final AmqpProducer producer; - protected final JmsOutboundMessageDispatch envelope; - - public AnonymousRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) { - super(sendResult); - this.producer = producer; - this.envelope = envelope; - } - - /** - * In all cases of the chain of events that make up the send for an anonymous - * producer a failure will trigger the original send request to fail. 
- */ - @Override - public void onFailure(Throwable result) { - LOG.debug("Send failed during {} step in chain: {}", this.getClass().getName(), getProducerId()); - super.onFailure(result); - } - } - - private final class AnonymousOpenRequest extends AnonymousRequest { - - public AnonymousOpenRequest(AsyncResult sendResult, AmqpProducer producer, JmsOutboundMessageDispatch envelope) { - super(sendResult, producer, envelope); - } - - @Override - public void onSuccess() { - LOG.trace("Open phase of anonymous send complete: {} ", getProducerId()); - AnonymousSendRequest send = new AnonymousSendRequest(this); - try { - producer.send(envelope, send); - } catch (Exception e) { - super.onFailure(e); - } - } - } - - private final class AnonymousSendRequest extends AnonymousRequest { - - public AnonymousSendRequest(AnonymousOpenRequest open) { - super(open.getWrappedRequest(), open.producer, open.envelope); - } - - @Override - public void onFailure(Throwable result) { - // Ensure that cache get purged of any failed producers. 
- AmqpAnonymousProducer.this.producerCache.remove(producer.getJmsResource().getDestination()); - super.onFailure(result); - } - - @Override - public void onSuccess() { - LOG.trace("Send phase of anonymous send complete: {} ", getProducerId()); - if (!connection.isAnonymousProducerCache()) { - AnonymousCloseRequest close = new AnonymousCloseRequest(this); - producer.close(close); - } else { - super.onSuccess(); - } - } - } - - private final class AnonymousCloseRequest extends AnonymousRequest { - - public AnonymousCloseRequest(AnonymousSendRequest send) { - super(send.getWrappedRequest(), send.producer, send.envelope); - } - - @Override - public void onSuccess() { - LOG.trace("Close phase of anonymous send complete: {} ", getProducerId()); - super.onSuccess(); - } - } - - private final class CloseRequest implements AsyncResult { - - private final AmqpProducer producer; - - public CloseRequest(AmqpProducer producer) { - this.producer = producer; - } - - @Override - public void onFailure(Throwable result) { - AmqpAnonymousProducer.this.connection.getProvider().fireProviderException(result); - } - - @Override - public void onSuccess() { - LOG.trace("Close of anonymous producer {} complete", producer); - } - - @Override - public boolean isComplete() { - return producer.isClosed(); - } - } - - private final class AnonymousProducerCache extends LRUCache<JmsDestination, AmqpProducer> { - - private static final long serialVersionUID = 1L; - - public AnonymousProducerCache(int cacheSize) { - super(cacheSize); - } - - @Override - protected void onCacheEviction(Map.Entry<JmsDestination, AmqpProducer> cached) { - LOG.trace("Producer: {} evicted from producer cache", cached.getValue()); - cached.getValue().close(new CloseRequest(cached.getValue())); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java 
---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java new file mode 100644 index 0000000..3c61f2f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.meta.JmsProducerInfo; +import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.WrappedAsyncResult; +import org.apache.qpid.proton.engine.EndpointState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles the case of anonymous JMS MessageProducers. + * + * In order to simulate the anonymous producer we must create a sender for each message + * send attempt and close it following a successful send. 
+ */ +public class AmqpAnonymousProducerWrapper extends AmqpProducer { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducerWrapper.class); + AmqpProducer delegate; + + /** + * Creates the Anonymous Producer object. + * + * @param session + * the session that owns this producer + * @param info + * the JmsProducerInfo for this producer. + */ + public AmqpAnonymousProducerWrapper(AmqpSession session, JmsProducerInfo info) { + super(session, info); + + delegate = new AmqpFixedProducer(session, info); + } + + @Override + public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { + LOG.trace("Delegating anonymous send to underlying producer: {}", getProducerId()); + + return delegate.send(envelope, request); + } + + @Override + public void open(AsyncResult request) { + AnonymousRelayRequest anonRelayRequest = new AnonymousRelayRequest(request); + delegate.open(anonRelayRequest); + } + + @Override + public void close(AsyncResult request) { + delegate.close(request); + } + + @Override + public boolean isAnonymous() { + return true; + } + + @Override + public EndpointState getLocalState() { + return delegate.getLocalState(); + } + + @Override + public EndpointState getRemoteState() { + return delegate.getRemoteState(); + } + + private class AnonymousRelayRequest extends WrappedAsyncResult { + + public AnonymousRelayRequest(AsyncResult openResult) { + super(openResult); + } + + /** + * If creation of the producer to the anonymous-relay failed, we try to + * enter fallback mode rather than immediately failing. 
+ */ + @Override + public void onFailure(Throwable result) { + LOG.debug("Attempt to open producer to anonymous relay failed, entering fallback mode"); + delegate = new AmqpAnonymousFallbackProducer(session, getJmsResource()); + delegate.open(getWrappedRequest()); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java index 9708bda..7cbe58b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java @@ -30,10 +30,6 @@ public class AmqpConnectionProperties { public static final Symbol JMS_MAPPING_VERSION_KEY = Symbol.valueOf("x-opt-jms-mapping-version"); public static final short JMS_MAPPING_VERSION_VALUE = 0; - private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay"); - - private String anonymousRelayName; - /** * Creates a new instance of this class from the given remote capabilities and properties. 
* @@ -52,21 +48,11 @@ public class AmqpConnectionProperties { } } - public boolean isAnonymousRelaySupported() { - return anonymousRelayName != null; - } - - public String getAnonymousRelayName() { - return anonymousRelayName; - } - protected void processCapabilities(Symbol[] capabilities) { // TODO - Inspect capabilities for configuration options } protected void processProperties(Map<Symbol, Object> properties) { - if (properties.containsKey(ANONYMOUS_RELAY)) { - anonymousRelayName = (String) properties.get(ANONYMOUS_RELAY); - } + // TODO - Inspect properties for configuration options } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ab870c8e/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 2882db8..d707f35 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -102,13 +102,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { public AmqpProducer createProducer(JmsProducerInfo producerInfo) { AmqpProducer producer = null; - // if (producerInfo.getDestination() != null || connection.getProperties().isAnonymousRelaySupported()) { + if (producerInfo.getDestination() != null) { LOG.debug("Creating AmqpFixedProducer for: {}", producerInfo.getDestination()); producer = new AmqpFixedProducer(this, producerInfo); -// } else { -// LOG.debug("Creating an AmqpAnonymousProducer Producer: "); -// producer = new AmqpAnonymousProducer(this, producerInfo); -// } + } else { + LOG.debug("Creating an AmqpAnonymousProducerWrapper"); + producer = new AmqpAnonymousProducerWrapper(this, producerInfo); + } 
producer.setPresettle(connection.isPresettleProducers()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
