Author: rgodfrey
Date: Sun Jun  7 21:02:02 2015
New Revision: 1684078

URL: http://svn.apache.org/r1684078
Log:
QPID-6576 : End-to-end message encryption

Added:
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
      - copied, changed from r1684077, 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
   (with props)
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
   (with props)
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/MessageEncryptionHelper.java
   (with props)
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted010MessageFactoryTest.java
   (with props)
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
   (with props)
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/messageencryption/
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/messageencryption/MessageEncryptionTest.java
   (with props)
Removed:
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
    
qpid/java/trunk/common/src/test/java/org/apache/qpid/util/PropertyUtilsTest.java
Modified:
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/url/URLParser.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/NoFailover.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/DeliveryProperties.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java
    
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java 
(original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java 
Sun Jun  7 21:02:02 2015
@@ -20,12 +20,18 @@
  */
 package org.apache.qpid.client;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.channels.UnresolvedAddressException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -35,27 +41,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
+import javax.jms.*;
 import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
 import javax.naming.NamingException;
 import javax.naming.Reference;
 import javax.naming.Referenceable;
@@ -80,13 +77,12 @@ import org.apache.qpid.common.QpidProper
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.FailoverPolicy;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.url.URLSyntaxException;
 
 public class AMQConnection extends Closeable implements CommonConnection, 
Referenceable
@@ -241,6 +237,11 @@ public class AMQConnection extends Close
            _logger.debug("Loaded mechanisms " + registry.getMechanisms());
         }
     }
+
+    private ConnectionSettings _connectionSettings;
+    private final ConcurrentMap<String, KeyStore> _brokerTrustStores = new 
ConcurrentHashMap<>();
+    private Session _brokerTrustStoreSession;
+
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -257,7 +258,7 @@ public class AMQConnection extends Close
         this(new AMQConnectionURL(
                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password 
+ "@"
                 + ((clientName == null) ? "" : clientName) + "/" + virtualHost 
+ "?brokerlist='"
-                + AMQBrokerDetails.checkTransport(broker) + "'"));
+                + BrokerDetails.checkTransport(broker) + "'"));
     }
 
     public AMQConnection(String host, int port, String username, String 
password, String clientName, String virtualHost)
@@ -518,7 +519,7 @@ public class AMQConnection extends Close
                 {
                     ConnectionRedirectException redirect = 
(ConnectionRedirectException) connectionException;
                     retryAllowed = true;
-                    brokerDetails = new AMQBrokerDetails(brokerDetails);
+                    brokerDetails = new BrokerDetails(brokerDetails);
                     brokerDetails.setHost(redirect.getHost());
                     brokerDetails.setPort(redirect.getPort());
                     _protocolHandler.setStateManager(new 
AMQStateManager(_protocolHandler.getProtocolSession()));
@@ -652,7 +653,7 @@ public class AMQConnection extends Close
 
     public boolean attemptReconnection(String host, int port, final boolean 
useFailoverConfigOnFailure)
     {
-        BrokerDetails bd = new 
AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
+        BrokerDetails bd = new 
BrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
         bd.setHost(host);
         bd.setPort(port);
 
@@ -824,6 +825,76 @@ public class AMQConnection extends Close
         }
     }
 
+    public KeyStore getBrokerSuppliedTrustStore(final String name) throws 
JMSException
+    {
+        synchronized(_brokerTrustStores)
+        {
+            if(!_brokerTrustStores.containsKey(name))
+            {
+                if(_brokerTrustStoreSession == null)
+                {
+                    _brokerTrustStoreSession = _delegate.createSession(false, 
AMQSession.AUTO_ACKNOWLEDGE, 1, 1);
+                    try
+                    {
+                        ((AMQSession) _brokerTrustStoreSession).start();
+                    }
+                    catch (AMQException e)
+                    {
+                        throw JMSExceptionHelper.chainJMSException(new 
JMSException(
+                                "Failed to retrieve virtual host properties"), 
e);
+                    }
+                }
+                final MessageConsumer consumer = 
_brokerTrustStoreSession.createConsumer(_brokerTrustStoreSession.createQueue(
+                        "ADDR: " + name + "; {assert: never, create: never, 
node:{ type: queue }}"));
+                final Message message  = consumer.receive(2000l);
+                if(message != null)
+                {
+                    StreamMessage streamMessage = (StreamMessage) message;
+                    List<X509Certificate> certs = new ArrayList<>();
+                    try
+                    {
+                        try
+                        {
+
+                            final CertificateFactory certFactory = 
CertificateFactory.getInstance("X.509");
+                            byte[] bytes;
+                            while ((bytes = (byte[]) 
streamMessage.readObject()) != null)
+                            {
+                                certs.add((X509Certificate) 
certFactory.generateCertificate(new ByteArrayInputStream(
+                                        bytes)));
+                            }
+                        }
+                        catch (MessageEOFException e)
+                        {
+                            // end of message
+                        }
+                        KeyStore keyStore = 
KeyStore.getInstance(KeyStore.getDefaultType());
+
+                        char[] encryptionTrustStorePassword =
+                                
getConnectionSettings().getEncryptionTrustStorePassword() == null
+                                        ? null
+                                        : 
getConnectionSettings().getEncryptionTrustStorePassword().toCharArray();
+
+                        keyStore.load(null, encryptionTrustStorePassword);
+                        int i = 1;
+                        for (X509Certificate cert : certs)
+                        {
+                            keyStore.setCertificateEntry(String.valueOf(i++), 
cert);
+                        }
+                        _brokerTrustStores.put(name, keyStore);
+                    }
+                    catch (JMSException | GeneralSecurityException | 
IOException e)
+                    {
+                        _logger.error(e.getMessage(), e);
+                    }
+                }
+
+            }
+            return _brokerTrustStores.get(name);
+
+        }
+    }
+
     public void setFailoverPolicy(FailoverPolicy policy)
     {
         _failoverPolicy = policy;
@@ -1802,4 +1873,13 @@ public class AMQConnection extends Close
         return _virtualHostProperties.get(propertyName);
     }
 
+    public void setConnectionSettings(final ConnectionSettings 
connectionSettings)
+    {
+        _connectionSettings = connectionSettings;
+    }
+
+    public ConnectionSettings getConnectionSettings()
+    {
+        return _connectionSettings;
+    }
 }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
 Sun Jun  7 21:02:02 2015
@@ -29,7 +29,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.Session;
 
 public interface AMQConnectionDelegate

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
 Sun Jun  7 21:02:02 2015
@@ -43,7 +43,6 @@ import org.apache.qpid.client.transport.
 import org.apache.qpid.client.util.JMSExceptionHelper;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.Session;
@@ -229,6 +228,7 @@ public class AMQConnectionDelegate_0_10
             _conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
             _conn.getFailoverPolicy().attainedConnection();
             _conn.logConnected(_qpidConnection.getLocalAddress(), 
_qpidConnection.getRemoteAddress());
+            _conn.setConnectionSettings(conSettings);
         }
         catch (ProtocolVersionException pe)
         {

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
 Sun Jun  7 21:02:02 2015
@@ -52,7 +52,6 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.Session;
@@ -174,6 +173,7 @@ public class AMQConnectionDelegate_8_0 i
                 _confirmedPublishSupported =
                         
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
                 _confirmedPublishNonTransactionalSupported = 
checkConfirmedPublishNonTransactionalSupported();
+                _conn.setConnectionSettings(settings);
                 return null;
             }
             else

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
 Sun Jun  7 21:02:02 2015
@@ -21,7 +21,6 @@
 package org.apache.qpid.client;
 
 import org.apache.qpid.client.url.URLParser;
-import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.url.URLHelper;
 import org.apache.qpid.url.URLSyntaxException;

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java 
(original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java 
Sun Jun  7 21:02:02 2015
@@ -95,6 +95,8 @@ public abstract class AMQDestination imp
     public static final int QUEUE_TYPE = 1;
     public static final int TOPIC_TYPE = 2;
     public static final int UNKNOWN_TYPE = 3;
+    private boolean _sendEncrypted;
+    private String _encryptedRecipients;
 
     protected void setExclusive(boolean exclusive)
     {
@@ -126,6 +128,16 @@ public abstract class AMQDestination imp
         return false;
     }
 
+    public boolean sendEncrypted()
+    {
+        return _sendEncrypted;
+    }
+
+    public String getEncryptedRecipients()
+    {
+        return _encryptedRecipients;
+    }
+
     // ----- Fields required to support new address syntax -------
 
     public enum DestSyntax {
@@ -303,6 +315,8 @@ public abstract class AMQDestination imp
         final String rejectBehaviourValue = 
binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR);
         _rejectBehaviour = rejectBehaviourValue == null ? null : 
RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase());
         _consumerArguments = binding.getConsumerOptions();
+        _sendEncrypted = 
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_SEND_ENCRYPTED));
+        _encryptedRecipients = 
binding.getOption(BindingURL.OPTION_ENCRYPTED_RECIPIENTS);
     }
 
     protected AMQDestination(String exchangeName, String exchangeClass, String 
routingKey, String queueName)
@@ -960,6 +974,9 @@ public abstract class AMQDestination imp
         _addressType = _addrHelper.getNodeType();
         _node =  _addrHelper.getNode();
         _link = _addrHelper.getLink();
+
+        _sendEncrypted = _addrHelper.getSendEncrypted();
+        _encryptedRecipients = _addrHelper.getEncryptedRecipients();
     }
 
     // ----- / new address syntax -----------

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java 
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java 
Sun Jun  7 21:02:02 2015
@@ -62,6 +62,7 @@ import org.apache.qpid.client.message.JM
 import org.apache.qpid.client.message.JMSObjectMessage;
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageEncryptionHelper;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.messaging.address.Node;
@@ -94,6 +95,7 @@ public abstract class AMQSession<C exten
     /** Used for debugging. */
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQSession.class);
 
+
     /** System property to enable strict AMQP compliance. */
     public static final String STRICT_AMQP = "STRICT_AMQP";
 
@@ -247,6 +249,8 @@ public abstract class AMQSession<C exten
     /** Has failover occured on this session with outstanding actions to 
commit? */
     private boolean _failedOverDirty;
 
+    private MessageEncryptionHelper _messageEncryptionHelper;
+
     /** Holds the highest received delivery tag. */
     protected AtomicLong getHighestDeliveryTag()
     {
@@ -313,17 +317,19 @@ public abstract class AMQSession<C exten
 
     /**
      * Creates a new session on a connection.
-     *
      * @param con                     The connection on which to create the 
session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is 
transactional.
      * @param acknowledgeMode         The acknowledgement mode for the session.
-     * @param messageFactoryRegistry  The message factory factory for the 
session.
      * @param defaultPrefetchHighMark The maximum number of messages to 
prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at 
which to resume the session.
      */
-    protected AMQSession(AMQConnection con, int channelId, boolean transacted, 
int acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry, int 
defaultPrefetchHighMark, int defaultPrefetchLowMark)
+    protected AMQSession(AMQConnection con,
+                         int channelId,
+                         boolean transacted,
+                         int acknowledgeMode,
+                         int defaultPrefetchHighMark,
+                         int defaultPrefetchLowMark)
     {
         _useAMQPEncodedMapMessage = con == null || 
!con.isUseLegacyMapMessageFormat();
         _useAMQPEncodedStreamMessage = con != null && 
!con.isUseLegacyStreamMessageFormat();
@@ -344,9 +350,9 @@ public abstract class AMQSession<C exten
         {
             _acknowledgeMode = acknowledgeMode;
         }
-
+        _messageEncryptionHelper = new MessageEncryptionHelper(this);
         _channelId = channelId;
-        _messageFactoryRegistry = messageFactoryRegistry;
+        _messageFactoryRegistry = 
MessageFactoryRegistry.newDefaultRegistry(this);
         _prefetchHighMark = defaultPrefetchHighMark;
         _prefetchLowMark = defaultPrefetchLowMark;
 
@@ -428,28 +434,6 @@ public abstract class AMQSession<C exten
         }
     }
 
-    /**
-     * Creates a new session on a connection with the default message factory 
factory.
-     *
-     * @param con                 The connection on which to create the 
session.
-     * @param channelId           The unique identifier for the session.
-     * @param transacted          Indicates whether or not the session is 
transactional.
-     * @param acknowledgeMode     The acknowledgement mode for the session.
-     * @param defaultPrefetchHigh The maximum number of messages to prefetched 
before suspending the session.
-     * @param defaultPrefetchLow  The number of prefetched messages at which 
to resume the session.
-     */
-    AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode, int defaultPrefetchHigh,
-               int defaultPrefetchLow)
-    {
-        this(con,
-             channelId,
-             transacted,
-             acknowledgeMode,
-             MessageFactoryRegistry.newDefaultRegistry(),
-             defaultPrefetchHigh,
-             defaultPrefetchLow);
-    }
-
     // ===== JMS Session methods.
 
     /**
@@ -3679,5 +3663,10 @@ public abstract class AMQSession<C exten
         }
     }
 
+
+    public MessageEncryptionHelper getMessageEncryptionHelper()
+    {
+        return _messageEncryptionHelper;
+    }
 }
 

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Sun Jun  7 21:02:02 2015
@@ -140,17 +140,15 @@ public class AMQSession_0_10 extends AMQ
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is 
transactional.
      * @param acknowledgeMode         The acknowledgement mode for the session.
-     * @param messageFactoryRegistry  The message factory factory for the 
session.
      * @param defaultPrefetchHighMark The maximum number of messages to 
prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at 
which to resume the session.
      * @param qpidConnection          The qpid connection
      */
     AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, 
AMQConnection con, int channelId,
-                    boolean transacted, int acknowledgeMode, 
MessageFactoryRegistry messageFactoryRegistry,
-                    int defaultPrefetchHighMark, int 
defaultPrefetchLowMark,String name)
+                    boolean transacted, int acknowledgeMode, int 
defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
     {
 
-        super(con, channelId, transacted, acknowledgeMode, 
messageFactoryRegistry, defaultPrefetchHighMark,
+        super(con, channelId, transacted, acknowledgeMode, 
defaultPrefetchHighMark,
               defaultPrefetchLowMark);
         _qpidConnection = qpidConnection;
         _name = name;
@@ -184,27 +182,6 @@ public class AMQSession_0_10 extends AMQ
         return qpidSession;
     }
 
-
-    /**
-     * Creates a new session on a connection with the default 0-10 message 
factory.
-     *
-     * @param con                 The connection on which to create the 
session.
-     * @param channelId           The unique identifier for the session.
-     * @param transacted          Indicates whether or not the session is 
transactional.
-     * @param acknowledgeMode     The acknowledgement mode for the session.
-     * @param defaultPrefetchHigh The maximum number of messages to prefetched 
before suspending the session.
-     * @param defaultPrefetchLow  The number of prefetched messages at which 
to resume the session.
-     * @param qpidConnection      The connection
-     */
-    AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, 
AMQConnection con, int channelId,
-                    boolean transacted, int acknowledgeMode, int 
defaultPrefetchHigh, int defaultPrefetchLow,
-                    String name)
-    {
-
-        this(qpidConnection, con, channelId, transacted, acknowledgeMode, 
MessageFactoryRegistry.newDefaultRegistry(),
-             defaultPrefetchHigh, defaultPrefetchLow,name);
-    }
-
     private void addUnacked(int id)
     {
         synchronized (unacked)

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java 
(original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java 
Sun Jun  7 21:02:02 2015
@@ -102,44 +102,25 @@ public class AMQSession_0_8 extends AMQS
 
     /**
      * Creates a new session on a connection.
-     *
      * @param con                     The connection on which to create the 
session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is 
transactional.
      * @param acknowledgeMode         The acknowledgement mode for the session.
-     * @param messageFactoryRegistry  The message factory factory for the 
session.
      * @param defaultPrefetchHighMark The maximum number of messages to 
prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at 
which to resume the session.
      */
-    protected AMQSession_0_8(AMQConnection con, int channelId, boolean 
transacted, int acknowledgeMode,
-                             MessageFactoryRegistry messageFactoryRegistry, 
int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+    protected AMQSession_0_8(AMQConnection con,
+                             int channelId,
+                             boolean transacted,
+                             int acknowledgeMode,
+                             int defaultPrefetchHighMark,
+                             int defaultPrefetchLowMark)
     {
 
-        
super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+        super(con,channelId,transacted,acknowledgeMode, 
defaultPrefetchHighMark,defaultPrefetchLowMark);
         _currentPrefetch.set(0);
     }
 
-    /**
-     * Creates a new session on a connection with the default message factory 
factory.
-     *
-     * @param con                     The connection on which to create the 
session.
-     * @param channelId               The unique identifier for the session.
-     * @param transacted              Indicates whether or not the session is 
transactional.
-     * @param acknowledgeMode         The acknowledgement mode for the session.
-     * @param defaultPrefetchHigh     The maximum number of messages to 
prefetched before suspending the session.
-     * @param defaultPrefetchLow      The number of prefetched messages at 
which to resume the session.
-     */
-    AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode, int defaultPrefetchHigh,
-               int defaultPrefetchLow)
-    {
-        this(con,
-             channelId,
-             transacted,
-             acknowledgeMode,
-             MessageFactoryRegistry.newDefaultRegistry(),
-             defaultPrefetchHigh,
-             defaultPrefetchLow);
-    }
 
     ProtocolVersion getProtocolVersion()
     {

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 Sun Jun  7 21:02:02 2015
@@ -21,11 +21,18 @@ import static org.apache.qpid.transport.
 import static org.apache.qpid.transport.Option.SYNC;
 import static org.apache.qpid.transport.Option.UNRELIABLE;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import javax.crypto.spec.SecretKeySpec;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -37,6 +44,8 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.Encrypted010MessageFactory;
+import org.apache.qpid.client.message.MessageEncryptionHelper;
 import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.util.JMSExceptionHelper;
@@ -48,6 +57,8 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.util.BytesDataOutput;
 import org.apache.qpid.util.GZIPUtils;
 import org.apache.qpid.util.Strings;
 
@@ -206,22 +217,126 @@ public class BasicMessageProducer_0_10 e
         }
 
         ByteBuffer data = message.getData();
+        boolean encrypt = 
message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || 
destination.sendEncrypted();
+        if(encrypt)
+        {
+            MessageEncryptionHelper encryptionHelper = 
getSession().getMessageEncryptionHelper();
+            try
+            {
+                MessageProperties origMessageProps = messageProps;
+                DeliveryProperties origDeliveryProps = deliveryProp;
+                messageProps = new MessageProperties(messageProps);
+                deliveryProp = new DeliveryProperties(deliveryProp);
+                SecretKeySpec secretKey = encryptionHelper.createSecretKey();
+
+                final Map<String, Object> origApplicationHeaders = 
origMessageProps.getApplicationHeaders();
+                if(origApplicationHeaders != null)
+                {
+                    
origApplicationHeaders.remove(MessageEncryptionHelper.ENCRYPT_HEADER);
+                }
+
+                String recipientString = 
message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+                if(recipientString == null)
+                {
+                    recipientString = destination.getEncryptedRecipients();
+                }
+                if(origApplicationHeaders != null)
+                {
+                    
origApplicationHeaders.remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+                }
+
+                String unencryptedProperties = 
message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+                if(origApplicationHeaders != null)
+                {
+                    
origApplicationHeaders.remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+                }
+
+                BBEncoder encoder = new BBEncoder(1024);
+                encoder.writeStruct32(origDeliveryProps);
+                encoder.writeStruct32(origMessageProps);
+                ByteBuffer buf = encoder.buffer();
+
+                final int headerLength = buf.remaining();
+                byte[] unencryptedBytes = new byte[headerLength + (data == 
null ? 0 : data.remaining())];
+                BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
+
+                output.write(buf.array(), buf.arrayOffset()+buf.position(), 
buf.remaining());
+
+                if (data != null)
+                {
+                    data.get(unencryptedBytes, headerLength, data.remaining());
+                }
+
+                byte[] ivbytes = encryptionHelper.getInitialisationVector();
+
+                byte[] encryptedBytes = encryptionHelper.encrypt(secretKey, 
unencryptedBytes, ivbytes);
+                data = ByteBuffer.wrap(encryptedBytes);
+
+                if (recipientString == null)
+                {
+                    throw new JMSException("When sending an encrypted message, 
recipients must be supplied");
+                }
+                String[] recipients = recipientString.split(";");
+                List<List<Object>> encryptedKeys = new ArrayList<>();
+                for(MessageEncryptionHelper.KeyTransportRecipientInfo info : 
encryptionHelper.getKeyTransportRecipientInfo(
+                        Arrays.asList(recipients), secretKey))
+                {
+                    encryptedKeys.add(info.asList());
+                }
+
+                Map<String,Object>  newHeaders = 
messageProps.getApplicationHeaders();
+                if(newHeaders != null)
+                {
+                    newHeaders.clear();
+                }
+                else
+                {
+                    newHeaders = new LinkedHashMap<>();
+                    messageProps.setApplicationHeaders(newHeaders);
+                }
+
+                if(unencryptedProperties != null)
+                {
+                    List<String> unencryptedPropertyNames = 
Arrays.asList(unencryptedProperties.split(" *; *"));
+                    for (String propertyName : unencryptedPropertyNames)
+                    {
+                        if (origApplicationHeaders.containsKey(propertyName))
+                        {
+                            newHeaders.put(propertyName, 
origApplicationHeaders.get(propertyName));
+                        }
+                    }
+                }
+
+                
newHeaders.put(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY, encryptedKeys);
+                
newHeaders.put(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY,
+                               
encryptionHelper.getMessageEncryptionCipherName());
+                
newHeaders.put(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes);
+                
messageProps.setContentType(Encrypted010MessageFactory.ENCRYPTED_0_10_CONTENT_TYPE);
+
+            }
+            catch (GeneralSecurityException | IOException e)
+            {
+                throw JMSExceptionHelper.chainJMSException(new 
JMSException("Unexpected Exception while encrypting message"), e);
+            }
 
-        if(data != null
-           && data.remaining() > 
getConnection().getMessageCompressionThresholdSize()
-           && getConnection().getDelegate().isMessageCompressionSupported()
-           && getConnection().isMessageCompressionDesired()
-           && messageProps.getContentEncoding() == null)
+        }
+        else
         {
-            byte[] compressed = GZIPUtils.compressBufferToArray(data);
-            if(compressed != null)
+            if (data != null
+                && data.remaining() > 
getConnection().getMessageCompressionThresholdSize()
+                && 
getConnection().getDelegate().isMessageCompressionSupported()
+                && getConnection().isMessageCompressionDesired()
+                && messageProps.getContentEncoding() == null)
             {
-                
messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
-                data = ByteBuffer.wrap(compressed);
+                byte[] compressed = GZIPUtils.compressBufferToArray(data);
+                if (compressed != null)
+                {
+                    
messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+                    data = ByteBuffer.wrap(compressed);
+                }
             }
         }
 
-
         messageProps.setContentLength(data == null ? 0 : data.remaining());
 
         // send the message

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
 Sun Jun  7 21:02:02 2015
@@ -20,9 +20,15 @@
  */
 package org.apache.qpid.client;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 
+import javax.crypto.spec.SecretKeySpec;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
@@ -35,6 +41,8 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.Encrypted091MessageFactory;
+import org.apache.qpid.client.message.MessageEncryptionHelper;
 import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
@@ -52,6 +60,7 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.util.BytesDataOutput;
 import org.apache.qpid.util.GZIPUtils;
 
 public class BasicMessageProducer_0_8 extends BasicMessageProducer
@@ -104,7 +113,6 @@ public class BasicMessageProducer_0_8 ex
     {
 
 
-
         AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) 
message.getDelegate();
         BasicContentHeaderProperties contentHeaderProperties = 
delegate.getContentHeaderProperties();
 
@@ -125,15 +133,15 @@ public class BasicMessageProducer_0_8 ex
 
             if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
             {
-               routingKey = 
headers.getString(QpidMessageProperties.QPID_SUBJECT);
+                routingKey = 
headers.getString(QpidMessageProperties.QPID_SUBJECT);
             }
         }
 
         BasicPublishBody body = 
getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
-                                                                               
     destination.getExchangeName(),
-                                                                               
     routingKey,
-                                                                               
     mandatory,
-                                                                               
     immediate);
+                                                                               
         destination.getExchangeName(),
+                                                                               
         routingKey,
+                                                                               
         mandatory,
+                                                                               
         immediate);
 
         AMQFrame publishFrame = body.generateFrame(getChannelId());
 
@@ -158,7 +166,9 @@ public class BasicMessageProducer_0_8 ex
         }
 
         //Set JMS_QPID_DESTTYPE
-        
delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
 type);
+        delegate.getContentHeaderProperties()
+                .getHeaders()
+                
.setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
         if (!isDisableTimestamps())
         {
@@ -167,7 +177,7 @@ public class BasicMessageProducer_0_8 ex
 
             if (timeToLive > 0)
             {
-                if(!SET_EXPIRATION_AS_TTL)
+                if (!SET_EXPIRATION_AS_TTL)
                 {
                     //default behaviour used by Qpid
                     contentHeaderProperties.setExpiration(currentTime + 
timeToLive);
@@ -188,37 +198,121 @@ public class BasicMessageProducer_0_8 ex
         contentHeaderProperties.setPriority((byte) priority);
 
         int size = (payload != null) ? payload.remaining() : 0;
+        AMQFrame contentHeaderFrame;
+        final AMQFrame[] frames;
+        boolean encrypt = 
message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || 
destination.sendEncrypted();
+        if(encrypt)
+        {
+            MessageEncryptionHelper encryptionHelper = 
getSession().getMessageEncryptionHelper();
+            try
+            {
+                SecretKeySpec secretKey = encryptionHelper.createSecretKey();
+
+                
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_HEADER);
+
+                String recipientString = 
message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+                if(recipientString == null)
+                {
+                    recipientString = destination.getEncryptedRecipients();
+                }
+                
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+
+                String unencryptedProperties = 
message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+                
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+
+                final int headerLength = 
contentHeaderProperties.getPropertyListSize() + 2;
+                byte[] unencryptedBytes = new byte[headerLength + size];
+                BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
+                output.writeShort((short) 
(contentHeaderProperties.getPropertyFlags() & 0xffff));
+                contentHeaderProperties.writePropertyListPayload(output);
+
+                if (size != 0)
+                {
+                    payload.get(unencryptedBytes, headerLength, 
payload.remaining());
+                }
+
+                byte[] ivbytes = encryptionHelper.getInitialisationVector();
+
+                byte[] encryptedBytes = encryptionHelper.encrypt(secretKey, 
unencryptedBytes, ivbytes);
+                payload = ByteBuffer.wrap(encryptedBytes);
+
+                if (recipientString == null)
+                {
+                    throw new JMSException("When sending an encrypted message, 
recipients must be supplied");
+                }
+                String[] recipients = recipientString.split(";");
+                List<List<Object>> encryptedKeys = new ArrayList<>();
+                for(MessageEncryptionHelper.KeyTransportRecipientInfo info : 
encryptionHelper.getKeyTransportRecipientInfo(Arrays.asList(recipients), 
secretKey))
+                {
+                    encryptedKeys.add(info.asList());
+                }
 
-        byte[] compressed;
-        if(size > getConnection().getMessageCompressionThresholdSize()
-               && getConnection().getDelegate().isMessageCompressionSupported()
-               && getConnection().isMessageCompressionDesired()
-               && contentHeaderProperties.getEncoding() == null
-               && (compressed = GZIPUtils.compressBufferToArray(payload)) != 
null)
-        {
-            contentHeaderProperties.setEncoding("gzip");
-            payload = ByteBuffer.wrap(compressed);
-            size = compressed.length;
+                BasicContentHeaderProperties oldProps = 
contentHeaderProperties;
+                contentHeaderProperties = new 
BasicContentHeaderProperties(oldProps);
+                final FieldTable oldHeaders = oldProps.getHeaders();
+                final FieldTable newHeaders = 
contentHeaderProperties.getHeaders();
+                newHeaders.clear();
+
+                if(unencryptedProperties != null)
+                {
+                    List<String> unencryptedPropertyNames = 
Arrays.asList(unencryptedProperties.split(" *; *"));
+                    for (String propertyName : unencryptedPropertyNames)
+                    {
+                        if (oldHeaders.propertyExists(propertyName))
+                        {
+                            newHeaders.setObject(propertyName, 
oldHeaders.get(propertyName));
+                        }
+                    }
+                }
+
+                
newHeaders.setObject(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY, 
encryptedKeys);
+                
newHeaders.setString(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY,
+                                     
encryptionHelper.getMessageEncryptionCipherName());
+                
newHeaders.setBytes(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes);
+                
contentHeaderProperties.setContentType(Encrypted091MessageFactory.ENCRYPTED_0_9_1_CONTENT_TYPE);
+                size = encryptedBytes.length;
+
+            }
+            catch (GeneralSecurityException | IOException e)
+            {
+                throw JMSExceptionHelper.chainJMSException(new 
JMSException("Unexpected Exception while encrypting message"), e);
+            }
 
         }
+        else
+        {
+            byte[] compressed;
+            if (size > getConnection().getMessageCompressionThresholdSize()
+                && 
getConnection().getDelegate().isMessageCompressionSupported()
+                && getConnection().isMessageCompressionDesired()
+                && contentHeaderProperties.getEncoding() == null
+                && (compressed = GZIPUtils.compressBufferToArray(payload)) != 
null)
+            {
+                contentHeaderProperties.setEncoding("gzip");
+                payload = ByteBuffer.wrap(compressed);
+                size = compressed.length;
+
+            }
+        }
         final int contentBodyFrameCount = 
calculateContentBodyFrameCount(payload);
-        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+        frames = new AMQFrame[2 + contentBodyFrameCount];
 
         if (payload != null)
         {
             createContentBodies(payload, frames, 2, getChannelId());
         }
 
-        if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
+        contentHeaderFrame =
+                ContentHeaderBody.createAMQFrame(getChannelId(),
+                                                 contentHeaderProperties, 
size);
+
+
+        if (getLogger().isDebugEnabled())
         {
-            getLogger().debug("Sending content body frames to " + destination);
+            getLogger().debug("Sending " + (frames.length-2) + " content body 
frames to " + destination);
         }
 
-
-        AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(getChannelId(),
-                                             contentHeaderProperties, size);
-        if(contentHeaderFrame.getSize() > 
getSession().getAMQConnection().getMaximumFrameSize())
+        if (contentHeaderFrame.getSize() > 
getSession().getAMQConnection().getMaximumFrameSize())
         {
             throw new JMSException("Unable to send message as the headers are 
too large ("
                                    + contentHeaderFrame.getSize()
@@ -231,6 +325,7 @@ public class BasicMessageProducer_0_8 ex
             getLogger().debug("Sending content header frame to " + 
destination);
         }
 
+
         frames[0] = publishFrame;
         frames[1] = contentHeaderFrame;
         final CompositeAMQDataBlock compositeFrame = new 
CompositeAMQDataBlock(frames);

Copied: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java 
(from r1684077, 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java)
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java?p2=qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java&p1=qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java&r1=1684077&r2=1684078&rev=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java 
Sun Jun  7 21:02:02 2015
@@ -27,32 +27,80 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.url.URLHelper;
 import org.apache.qpid.url.URLSyntaxException;
 
-public class AMQBrokerDetails implements BrokerDetails, Serializable
+public class BrokerDetails implements Serializable
 {
-    private static final long serialVersionUID = 8450786374975932890L;
-
+    /*
+     * Known URL Options
+     * @see ConnectionURL
+    */
+    public static final String OPTIONS_RETRY = "retries";
+    public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+    public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
+    public static final String OPTIONS_HEARTBEAT = "heartbeat";
+    @Deprecated
+    public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
+    public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
+    public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
+    public static final String OPTIONS_SSL = "ssl";
+    public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay";
+    public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol";
+    public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server";
+    public static final String OPTIONS_TRUST_STORE = "trust_store";
+    public static final String OPTIONS_TRUST_STORE_PASSWORD = 
"trust_store_password";
+    public static final String OPTIONS_KEY_STORE = "key_store";
+    public static final String OPTIONS_KEY_STORE_PASSWORD = 
"key_store_password";
+    public static final String OPTIONS_SSL_VERIFY_HOSTNAME = 
"ssl_verify_hostname";
+    public static final String OPTIONS_SSL_CERT_ALIAS = "ssl_cert_alias";
+    public static final String OPTIONS_CLIENT_CERT_PRIV_KEY_PATH = 
"client_cert_priv_key_path";
+    public static final String OPTIONS_CLIENT_CERT_PATH = "client_cert_path";
+    public static final String OPTIONS_CLIENT_CERT_INTERMEDIARY_CERT_PATH = 
"client_cert_intermediary_cert_path" ;
+    public static final String OPTIONS_TRUSTED_CERTIFICATES_PATH = 
"trusted_certs_path";
+
+    public static final String OPTIONS_ENCRYPTION_TRUST_STORE = 
"encryption_trust_store";
+    public static final String OPTIONS_ENCRYPTION_TRUST_STORE_PASSWORD = 
"encryption_trust_store_password";
+    public static final String OPTIONS_ENCRYPTION_REMOTE_TRUST_STORE = 
"encryption_remote_trust_store";
+    public static final String OPTIONS_ENCRYPTION_KEY_STORE = 
"encryption_key_store";
+    public static final String OPTIONS_ENCRYPTION_KEY_STORE_PASSWORD = 
"encryption_key_store_password";
+
+
+    public static final int DEFAULT_PORT = 5672;
+    public static final String TCP = "tcp";
+    public static final String DEFAULT_TRANSPORT = BrokerDetails.TCP;
+    public static final String URL_FORMAT_EXAMPLE =
+            "<transport>://<hostname>[:<port Default=\"" + 
BrokerDetails.DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+    public static final int DEFAULT_CONNECT_TIMEOUT = 30000;
+    public static final boolean USE_SSL_DEFAULT = false;
+    // pulled these properties from the new BrokerDetails class in the qpid 
package
+    public static final String PROTOCOL_TCP = "tcp";
+    public static final String PROTOCOL_TLS = "tls";
+    public static final String VIRTUAL_HOST = "virtualhost";
+    public static final String CLIENT_ID = "client_id";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    private static final long serialVersionUID = 8762219750300869355L;
     private String _host;
     private int _port;
     private String _transport;
 
     private Map<String, String> _options = new HashMap<String, String>();
+    private AMQConnectionURL _connectionUrl;
 
-    public AMQBrokerDetails(BrokerDetails details)
+    public BrokerDetails(BrokerDetails details)
     {
         _host = details.getHost();
         _port = details.getPort();
         _transport = details.getTransport();
-        _options = new HashMap<>(details.getProperties());
+        _options = new HashMap<>(details._options);
+        _connectionUrl = details._connectionUrl;
     }
 
-    public AMQBrokerDetails(){}
+    public BrokerDetails(){}
     
-    public AMQBrokerDetails(String url) throws URLSyntaxException
+    public BrokerDetails(String url) throws URLSyntaxException
     {        
       
         // URL should be of format 
tcp://host:port?option='value',option='value'
@@ -210,7 +258,7 @@ public class AMQBrokerDetails implements
         }
     }
 
-    public AMQBrokerDetails(String host, int port)
+    public BrokerDetails(String host, int port)
     {
         _host = host;
         _port = port;
@@ -249,7 +297,12 @@ public class AMQBrokerDetails implements
 
     public String getProperty(String key)
     {
-        return _options.get(key);
+        String value = _options.get(key);
+        if(value == null && _connectionUrl != null)
+        {
+            value = _connectionUrl.getOption(key);
+        }
+        return value;
     }
 
     public void setProperty(String key, String value)
@@ -402,16 +455,6 @@ public class AMQBrokerDetails implements
         }
     }
 
-    public Map<String, String> getProperties()
-    {
-        return _options;
-    }
-
-    public void setProperties(Map<String, String> props)
-    {
-        _options = props;
-    }
-
     public ConnectionSettings buildConnectionSettings()
     {
         ConnectionSettings conSettings = new ConnectionSettings();
@@ -508,6 +551,42 @@ public class AMQBrokerDetails implements
                     
String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_SSL_VERIFY_HOST_NAME)));
         
conSettings.setVerifyHostname(getBooleanProperty(BrokerDetails.OPTIONS_SSL_VERIFY_HOSTNAME,
 defaultSSLVerifyHostName ));
 
+        // ----------------------------
+
+        if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE) != null)
+        {
+            conSettings.setEncryptionKeyStorePath(
+                    getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE));
+        }
+
+        if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE_PASSWORD) 
!= null)
+        {
+            conSettings.setEncryptionKeyStorePassword(
+                    
getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE_PASSWORD));
+        }
+
+
+        if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE) != null)
+        {
+            conSettings.setEncryptionTrustStorePath(
+                    getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE));
+        }
+
+        if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE_PASSWORD) 
!= null)
+        {
+            conSettings.setEncryptionKeyStorePassword(
+                    
getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE_PASSWORD));
+        }
+
+
+        if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_REMOTE_TRUST_STORE) 
!= null)
+        {
+            conSettings.setEncryptionRemoteTrustStoreName(
+                    
getProperty(BrokerDetails.OPTIONS_ENCRYPTION_REMOTE_TRUST_STORE));
+        }
+
+        // ----------------------------
+
         if (getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null)
         {
             conSettings.setTcpNodelay(
@@ -528,4 +607,8 @@ public class AMQBrokerDetails implements
         return conSettings;
     }
 
+    public void setConnectionUrl(final AMQConnectionURL connectionUrl)
+    {
+        _connectionUrl = connectionUrl;
+    }
 }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java 
(original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java 
Sun Jun  7 21:02:02 2015
@@ -59,20 +59,19 @@ public class XASessionImpl extends AMQSe
     public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, 
AMQConnection con, int channelId,
                          int defaultPrefetchHigh, int defaultPrefetchLow)
     {
-        this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE,
-             MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, 
defaultPrefetchLow, null);
+        this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE, 
defaultPrefetchHigh, defaultPrefetchLow, null);
      }
 
      public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, 
AMQConnection con, int channelId,
                 int ackMode, int defaultPrefetchHigh, int defaultPrefetchLow)
      {
-        this(qpidConnection, con, channelId, false, ackMode, 
MessageFactoryRegistry.newDefaultRegistry(),
+        this(qpidConnection, con, channelId, false, ackMode,
                         defaultPrefetchHigh, defaultPrefetchLow, null);
 
      }
 
      public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, 
AMQConnection con, int channelId,
-               boolean transacted, int ackMode, MessageFactoryRegistry 
registry, int defaultPrefetchHigh, int defaultPrefetchLow,
+               boolean transacted, int ackMode, int defaultPrefetchHigh, int 
defaultPrefetchLow,
                String name)
      {
         super(qpidConnection,
@@ -80,7 +79,6 @@ public class XASessionImpl extends AMQSe
               channelId,
               transacted,
               ackMode,
-              registry,
               defaultPrefetchHigh,
               defaultPrefetchLow,
               name);

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
 Sun Jun  7 21:02:02 2015
@@ -52,7 +52,7 @@ public interface AMQMessageDelegate
 
     void setJMSReplyTo(Destination destination) throws JMSException;
 
-    Destination getJMSDestination() throws JMSException;
+    Destination getJMSDestination();
 
     int getJMSDeliveryMode() throws JMSException;
 
@@ -134,4 +134,5 @@ public interface AMQMessageDelegate
     long getDeliveryTag();
 
     void setJMSMessageID(final UUID messageId) throws JMSException;
+
 }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
 Sun Jun  7 21:02:02 2015
@@ -337,7 +337,7 @@ public class AMQMessageDelegate_0_10 ext
         _messageProps.setReplyTo(replyTo);
     }
 
-    public Destination getJMSDestination() throws JMSException
+    public Destination getJMSDestination()
     {
         return _destination;
     }
@@ -995,4 +995,16 @@ public class AMQMessageDelegate_0_10 ext
     {
         return _deliveryProps;
     }
+
+    @Override
+    Object getProperty(final String name)
+    {
+        return _messageProps.getApplicationHeaders().get(name);
+    }
+
+    @Override
+    boolean hasProperty(final String name)
+    {
+        return _messageProps.getApplicationHeaders().containsKey(name);
+    }
 }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
 Sun Jun  7 21:02:02 2015
@@ -78,7 +78,7 @@ public class AMQMessageDelegate_0_8 exte
     private BasicContentHeaderProperties _contentHeaderProperties;
 
     // The base set of items that needs to be set. 
-    private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, 
long deliveryTag)
+    public AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, 
long deliveryTag)
     {
         super(deliveryTag);
         _contentHeaderProperties = properties;
@@ -333,7 +333,7 @@ public class AMQMessageDelegate_0_8 exte
         getContentHeaderProperties().setReplyTo(encodedDestination);
     }
 
-    public Destination getJMSDestination() throws JMSException
+    public Destination getJMSDestination()
     {
         return _destination;
     }
@@ -631,6 +631,18 @@ public class AMQMessageDelegate_0_8 exte
         _readableProperties = false;
     }
 
+    @Override
+    Object getProperty(final String name)
+    {
+        return getContentHeaderProperties().getHeaders().get(name);
+    }
+
+    @Override
+    boolean hasProperty(final String name)
+    {
+        return getContentHeaderProperties().getHeaders().containsKey(name);
+    }
+
     private static class DefaultRouterDestination extends AMQDestination 
implements Queue
     {
         private static final long serialVersionUID = -5042408431861384536L;

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
 Sun Jun  7 21:02:02 2015
@@ -23,22 +23,16 @@ package org.apache.qpid.client.message;
 
 import org.apache.qpid.AMQException;
 
-import javax.jms.JMSException;
 import java.nio.ByteBuffer;
 
 public class AMQPEncodedListMessageFactory extends AbstractJMSMessageFactory
 {
     @Override
-    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+    protected AbstractJMSMessage createMessage(AbstractAMQMessageDelegate 
delegate,
             ByteBuffer data) throws AMQException
     {
         return new AMQPEncodedListMessage(delegate,data);
     }
 
 
-    public AbstractJMSMessage createMessage(
-            AMQMessageDelegateFactory delegateFactory) throws JMSException
-    {
-        return new AMQPEncodedListMessage(delegateFactory);
-    }
 }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
 Sun Jun  7 21:02:02 2015
@@ -23,24 +23,17 @@ package org.apache.qpid.client.message;
 
 import org.apache.qpid.AMQException;
 
-import javax.jms.JMSException;
 import java.nio.ByteBuffer;
 
 public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
 {
 
     @Override
-    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+    protected AbstractJMSMessage createMessage(AbstractAMQMessageDelegate 
delegate,
             ByteBuffer data) throws AMQException
     {
         return new AMQPEncodedMapMessage(delegate,data);
     }
 
 
-    public AbstractJMSMessage createMessage(
-            AMQMessageDelegateFactory delegateFactory) throws JMSException
-    {
-        return new AMQPEncodedMapMessage(delegateFactory);
-    }
-
 }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
 Sun Jun  7 21:02:02 2015
@@ -305,6 +305,9 @@ public abstract class AbstractAMQMessage
             return generateDestination(exchange, routingKey);
         }
     }
+
+    abstract Object getProperty(String name);
+    abstract boolean hasProperty(String name);
 }
 
 class ExchangeInfo

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 Sun Jun  7 21:02:02 2015
@@ -42,7 +42,7 @@ import org.apache.qpid.transport.Deliver
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.util.GZIPUtils;
 
-public abstract class AbstractJMSMessageFactory implements MessageFactory
+public abstract class AbstractJMSMessageFactory
 {
     private static final Logger _logger = 
LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
 
@@ -117,15 +117,15 @@ public abstract class AbstractJMSMessage
                     .remaining());
         }
 
-        AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
-                                                                 
contentHeader.getProperties(),
-                                                                 exchange, 
routingKey, queueDestinationCache,
-                                                                 
topicDestinationCache, addressType);
+        AMQMessageDelegate_0_8 delegate = new 
AMQMessageDelegate_0_8(messageNbr,
+                                                                     
contentHeader.getProperties(),
+                                                                     exchange, 
routingKey, queueDestinationCache,
+                                                                     
topicDestinationCache, addressType);
 
         return createMessage(delegate, data);
     }
 
-    protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate 
delegate, ByteBuffer data) throws AMQException;
+    protected abstract AbstractJMSMessage 
createMessage(AbstractAMQMessageDelegate delegate, ByteBuffer data) throws 
AMQException;
 
 
     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, 
MessageProperties msgProps,
@@ -159,13 +159,12 @@ public abstract class AbstractJMSMessage
                 data = ByteBuffer.wrap(uncompressed);
             }
         }
-        AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps, 
deliveryProps, messageNbr);
+        AMQMessageDelegate_0_10 delegate = new 
AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr);
 
         AbstractJMSMessage message = createMessage(delegate, data);
         return message;
     }
 
-    @Override
     public AbstractJMSMessage createMessage(long messageNbr, boolean 
redelivered, ContentHeaderBody contentHeader,
                                             String exchange, String 
routingKey, List bodies,
                                                          
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,

Added: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java?rev=1684078&view=auto
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
 (added)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
 Sun Jun  7 21:02:02 2015
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.security.PrivateKey;
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.auth.x500.X500Principal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.codec.BBDecoder;
+
+public class Encrypted010MessageFactory extends AbstractJMSMessageFactory
+{
+    public static final String ENCRYPTED_0_10_CONTENT_TYPE = 
"application/qpid-0-10-encrypted";
+    private final MessageFactoryRegistry _messageFactoryRegistry;
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Encrypted010MessageFactory.class);
+
+    public Encrypted010MessageFactory(final MessageFactoryRegistry 
messageFactoryRegistry)
+    {
+        _messageFactoryRegistry = messageFactoryRegistry;
+    }
+
+    @Override
+    protected AbstractJMSMessage createMessage(final 
AbstractAMQMessageDelegate delegate, final ByteBuffer data)
+            throws AMQException
+    {
+        SecretKeySpec secretKeySpec;
+        String algorithm;
+        byte[] initVector;
+        try
+        {
+
+
+            try
+            {
+                if 
(delegate.hasProperty(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY))
+                {
+                    algorithm = 
delegate.getProperty(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY).toString();
+
+                    if 
(delegate.hasProperty(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY))
+                    {
+                        Object ivObj = 
delegate.getProperty(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY);
+                        if (ivObj instanceof byte[])
+                        {
+                            initVector = (byte[]) ivObj;
+                        }
+                        else
+                        {
+                            throw new AMQException("If the property '"
+                                                   + 
MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY
+                                                   + "' is present, it must 
contain a byte array");
+                        }
+                    }
+                    else
+                    {
+                        initVector = null;
+                    }
+                    if 
(delegate.hasProperty(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY))
+                    {
+                        Object keyInfoObj = 
delegate.getProperty(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY);
+                        if (keyInfoObj instanceof Collection)
+                        {
+                            secretKeySpec = 
getContentEncryptionKey((Collection) keyInfoObj,
+                                                                    algorithm,
+                                                                    
_messageFactoryRegistry.getSession());
+                            if (secretKeySpec == null)
+                            {
+                                throw new AMQException("Could not locate key 
information to decrypt the message");
+                            }
+                        }
+                        else
+                        {
+                            throw new AMQException("An encrypted message must 
contain the property '"
+                                                   + 
MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY
+                                                   + "'");
+                        }
+                    }
+                    else
+                    {
+                        throw new AMQException("An encrypted message must 
contain the property '"
+                                               + 
MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY
+                                               + "'");
+                    }
+
+                }
+                else
+                {
+                    throw new AMQException("Encrypted message must carry the 
encryption algorithm in the property '"
+                                           + 
MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY
+                                           + "'");
+                }
+
+                Cipher cipher = Cipher.getInstance(algorithm);
+                cipher.init(Cipher.DECRYPT_MODE, secretKeySpec, new 
IvParameterSpec(initVector));
+                byte[] encryptedData;
+                int offset;
+                int length;
+                if (data.hasArray())
+                {
+                    encryptedData = data.array();
+                    offset = data.arrayOffset() + data.position();
+                    length = data.remaining();
+                }
+                else
+                {
+                    encryptedData = new byte[data.remaining()];
+                    data.duplicate().get(encryptedData);
+                    offset = 0;
+                    length = encryptedData.length;
+                }
+                final byte[] unencryptedBytes = decryptData(cipher, 
encryptedData, offset, length);
+                ByteBuffer buf = ByteBuffer.wrap(unencryptedBytes);
+
+                BBDecoder decoder = new BBDecoder();
+                decoder.init(buf);
+                DeliveryProperties deliveryProperties = (DeliveryProperties) 
decoder.readStruct32();
+                MessageProperties messageProperties = (MessageProperties) 
decoder.readStruct32();
+
+                int payloadOffset = buf.position();
+
+                final ByteBuffer unencryptedData =
+                        ByteBuffer.wrap(unencryptedBytes, payloadOffset, 
unencryptedBytes.length - payloadOffset);
+
+                final AbstractAMQMessageDelegate newDelegate =
+                        new AMQMessageDelegate_0_10(messageProperties, 
deliveryProperties, delegate.getDeliveryTag());
+                newDelegate.setJMSDestination(delegate.getJMSDestination());
+
+
+                final AbstractJMSMessageFactory unencryptedMessageFactory =
+                        
_messageFactoryRegistry.getMessageFactory(messageProperties.getContentType());
+
+                return unencryptedMessageFactory.createMessage(newDelegate, 
unencryptedData);
+            }
+            catch (GeneralSecurityException | IOException e)
+            {
+                throw new AMQException("Could not decode encrypted message", 
e);
+
+            }
+
+        }
+        catch(AMQException e)
+        {
+            LOGGER.error("Error when attempting to decrypt message " + 
delegate.getDeliveryTag() + " to address ("+delegate.getJMSDestination()+").  
Message will be delivered to the client encrypted", e);
+            return 
_messageFactoryRegistry.getDefaultFactory().createMessage(delegate, data);
+        }
+    }
+
+    private byte[] decryptData(final Cipher cipher, final byte[] 
encryptedData, final int offset, final int length)
+            throws IOException
+    {
+        final byte[] unencryptedBytes;
+        try (CipherInputStream cipherInputStream = new CipherInputStream(new 
ByteArrayInputStream(encryptedData,
+                                                                               
                   offset,
+                                                                               
                   length), cipher))
+        {
+            byte[] buf = new byte[512];
+            int pos = 0;
+            int read;
+            while ((read = cipherInputStream.read(buf, pos, buf.length - pos)) 
!= -1)
+            {
+                pos += read;
+                if (pos == buf.length)
+                {
+                    byte[] tmp = buf;
+                    buf = new byte[buf.length + 512];
+                    System.arraycopy(tmp, 0, buf, 0, tmp.length);
+                }
+            }
+            unencryptedBytes= new byte[pos];
+            System.arraycopy(buf, 0, unencryptedBytes, 0, pos);
+        }
+        return unencryptedBytes;
+    }
+
+    private SecretKeySpec getContentEncryptionKey(final Collection 
keyInfoObjList,
+                                                  final String algorithm,
+                                                  final AMQSession<?, ?> 
session)
+            throws AMQException, GeneralSecurityException, IOException
+    {
+
+        for(Object keyInfoObject : keyInfoObjList)
+        {
+            try
+            {
+                Iterator iter = ((Collection)keyInfoObject).iterator();
+
+                int type = ((Number)iter.next()).intValue();
+                switch(type)
+                {
+                    case 1:
+                        String keyEncryptionAlgorithm = (String) iter.next();
+                        X500Principal issuer = new 
X500Principal((String)iter.next());
+                        BigInteger serialNumber = new 
BigInteger((String)iter.next());
+                        byte[] encryptedKey = (byte[])iter.next();
+
+                        PrivateKey privateKey = getPrivateKey(session, issuer, 
serialNumber);
+                        if(privateKey != null)
+                        {
+                            Cipher cipher = 
Cipher.getInstance(keyEncryptionAlgorithm);
+                            cipher.init(Cipher.DECRYPT_MODE, privateKey);
+                            byte[] decryptedData = decryptData(cipher, 
encryptedKey, 0, encryptedKey.length);
+                            SecretKeySpec keySpec = new 
SecretKeySpec(decryptedData, algorithm.split("/")[0]);
+                            return keySpec;
+                        }
+                        break;
+                    default:
+                        throw new AMQException("Invalid format of 
'x-qpid-encrypted-keys' - unknown key info type: " + type);
+
+                }
+            }
+            catch(ClassCastException e)
+            {
+                throw new AMQException("Invalid format of 
'x-qpid-encrypted-keys'");
+            }
+        }
+        return null;
+    }
+
+    private PrivateKey getPrivateKey(final AMQSession<?, ?> session,
+                                     final X500Principal issuer,
+                                     final BigInteger serialNumber)
+            throws GeneralSecurityException, IOException
+    {
+        return 
session.getMessageEncryptionHelper().getEncryptionPrivateKey(issuer, 
serialNumber);
+    }
+
+}

Propchange: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to