Author: tabish
Date: Tue May 28 15:04:25 2013
New Revision: 1486951
URL: http://svn.apache.org/r1486951
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4560
Ensure dispatched messages are set to read-only mode before they are passed on
to the transformer.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1486951&r1=1486950&r2=1486951&view=diff
==============================================================================
---
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
(original)
+++
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Tue May 28 15:04:25 2013
@@ -16,41 +16,91 @@
*/
package org.apache.activemq.transport.amqp;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.InvalidSelectorException;
+
import org.apache.activemq.broker.BrokerContext;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.qpid.proton.amqp.*;
-import org.apache.qpid.proton.amqp.messaging.*;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transaction.*;
-import org.apache.qpid.proton.amqp.transport.*;
-import org.apache.qpid.proton.engine.*;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transaction.TransactionalState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointError;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.apache.qpid.proton.engine.impl.LinkImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
import org.apache.qpid.proton.framing.TransportFrame;
-import org.apache.qpid.proton.jms.*;
+import org.apache.qpid.proton.jms.AMQPNativeInboundTransformer;
+import org.apache.qpid.proton.jms.AMQPRawInboundTransformer;
+import org.apache.qpid.proton.jms.AutoOutboundTransformer;
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.jms.InboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
+import org.apache.qpid.proton.jms.OutboundTransformer;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.InvalidSelectorException;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
class AmqpProtocolConverter {
public static final EnumSet<EndpointState> UNINITIALIZED_SET =
EnumSet.of(EndpointState.UNINITIALIZED);
@@ -312,7 +362,7 @@ class AmqpProtocolConverter {
private static final IdGenerator CONNECTION_ID_GENERATOR = new
IdGenerator();
private final ConnectionId connectionId = new
ConnectionId(CONNECTION_ID_GENERATOR.generateId());
- private ConnectionInfo connectionInfo = new ConnectionInfo();
+ private final ConnectionInfo connectionInfo = new ConnectionInfo();
private long nextSessionId = 0;
private long nextTempDestinationId = 0;
@@ -336,6 +386,7 @@ class AmqpProtocolConverter {
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
+ @Override
public void onResponse(AmqpProtocolConverter converter, Response
response) throws IOException {
protonConnection.open();
pumpProtonToSocket();
@@ -609,6 +660,7 @@ class AmqpProtocolConverter {
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
+ @Override
public void onResponse(AmqpProtocolConverter converter,
Response response) throws IOException {
if (response.isException()) {
receiver.setTarget(null);
@@ -658,7 +710,7 @@ class AmqpProtocolConverter {
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
private final Sender sender;
- private boolean presettle;
+ private final boolean presettle;
private boolean closed;
public ConsumerContext(ConsumerId consumerId, Sender sender) {
@@ -748,6 +800,7 @@ class AmqpProtocolConverter {
sender.drained();
} else {
jms.setRedeliveryCounter(md.getRedeliveryCounter());
+ jms.setReadOnlyBody(true);
final EncodedMessage amqp =
outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
@@ -906,6 +959,7 @@ class AmqpProtocolConverter {
consumerContext.closed=true;
sendToActiveMQ(rsi, new ResponseHandler() {
+ @Override
public void onResponse(AmqpProtocolConverter converter,
Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);
@@ -957,6 +1011,7 @@ class AmqpProtocolConverter {
}
sendToActiveMQ(consumerInfo, new ResponseHandler() {
+ @Override
public void onResponse(AmqpProtocolConverter converter,
Response response) throws IOException {
if (response.isException()) {
sender.setSource(null);