Author: rgodfrey
Date: Sun Jun 7 21:02:02 2015
New Revision: 1684078
URL: http://svn.apache.org/r1684078
Log:
QPID-6576 : End-to-end message encryption
Added:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
- copied, changed from r1684077,
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
(with props)
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
(with props)
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/MessageEncryptionHelper.java
(with props)
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted010MessageFactoryTest.java
(with props)
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
(with props)
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/messageencryption/
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/messageencryption/MessageEncryptionTest.java
(with props)
Removed:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/util/PropertyUtilsTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/url/URLParser.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/url/URLParser_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverMethod.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/failover/NoFailover.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQConnectionFactoryTest.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/DeliveryProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/MessageProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Sun Jun 7 21:02:02 2015
@@ -20,12 +20,18 @@
*/
package org.apache.qpid.client;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
@@ -35,27 +41,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
+import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
@@ -80,13 +77,12 @@ import org.apache.qpid.common.QpidProper
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.url.URLSyntaxException;
public class AMQConnection extends Closeable implements CommonConnection,
Referenceable
@@ -241,6 +237,11 @@ public class AMQConnection extends Close
_logger.debug("Loaded mechanisms " + registry.getMechanisms());
}
}
+
+ private ConnectionSettings _connectionSettings;
+ private final ConcurrentMap<String, KeyStore> _brokerTrustStores = new
ConcurrentHashMap<>();
+ private Session _brokerTrustStoreSession;
+
/**
* @param broker brokerdetails
* @param username username
@@ -257,7 +258,7 @@ public class AMQConnection extends Close
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password
+ "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost
+ "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"));
+ + BrokerDetails.checkTransport(broker) + "'"));
}
public AMQConnection(String host, int port, String username, String
password, String clientName, String virtualHost)
@@ -518,7 +519,7 @@ public class AMQConnection extends Close
{
ConnectionRedirectException redirect =
(ConnectionRedirectException) connectionException;
retryAllowed = true;
- brokerDetails = new AMQBrokerDetails(brokerDetails);
+ brokerDetails = new BrokerDetails(brokerDetails);
brokerDetails.setHost(redirect.getHost());
brokerDetails.setPort(redirect.getPort());
_protocolHandler.setStateManager(new
AMQStateManager(_protocolHandler.getProtocolSession()));
@@ -652,7 +653,7 @@ public class AMQConnection extends Close
public boolean attemptReconnection(String host, int port, final boolean
useFailoverConfigOnFailure)
{
- BrokerDetails bd = new
AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
+ BrokerDetails bd = new
BrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
bd.setHost(host);
bd.setPort(port);
@@ -824,6 +825,76 @@ public class AMQConnection extends Close
}
}
+ public KeyStore getBrokerSuppliedTrustStore(final String name) throws
JMSException
+ {
+ synchronized(_brokerTrustStores)
+ {
+ if(!_brokerTrustStores.containsKey(name))
+ {
+ if(_brokerTrustStoreSession == null)
+ {
+ _brokerTrustStoreSession = _delegate.createSession(false,
AMQSession.AUTO_ACKNOWLEDGE, 1, 1);
+ try
+ {
+ ((AMQSession) _brokerTrustStoreSession).start();
+ }
+ catch (AMQException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new
JMSException(
+ "Failed to retrieve virtual host properties"),
e);
+ }
+ }
+ final MessageConsumer consumer =
_brokerTrustStoreSession.createConsumer(_brokerTrustStoreSession.createQueue(
+ "ADDR: " + name + "; {assert: never, create: never,
node:{ type: queue }}"));
+ final Message message = consumer.receive(2000l);
+ if(message != null)
+ {
+ StreamMessage streamMessage = (StreamMessage) message;
+ List<X509Certificate> certs = new ArrayList<>();
+ try
+ {
+ try
+ {
+
+ final CertificateFactory certFactory =
CertificateFactory.getInstance("X.509");
+ byte[] bytes;
+ while ((bytes = (byte[])
streamMessage.readObject()) != null)
+ {
+ certs.add((X509Certificate)
certFactory.generateCertificate(new ByteArrayInputStream(
+ bytes)));
+ }
+ }
+ catch (MessageEOFException e)
+ {
+ // end of message
+ }
+ KeyStore keyStore =
KeyStore.getInstance(KeyStore.getDefaultType());
+
+ char[] encryptionTrustStorePassword =
+
getConnectionSettings().getEncryptionTrustStorePassword() == null
+ ? null
+ :
getConnectionSettings().getEncryptionTrustStorePassword().toCharArray();
+
+ keyStore.load(null, encryptionTrustStorePassword);
+ int i = 1;
+ for (X509Certificate cert : certs)
+ {
+ keyStore.setCertificateEntry(String.valueOf(i++),
cert);
+ }
+ _brokerTrustStores.put(name, keyStore);
+ }
+ catch (JMSException | GeneralSecurityException |
IOException e)
+ {
+ _logger.error(e.getMessage(), e);
+ }
+ }
+
+ }
+ return _brokerTrustStores.get(name);
+
+ }
+ }
+
public void setFailoverPolicy(FailoverPolicy policy)
{
_failoverPolicy = policy;
@@ -1802,4 +1873,13 @@ public class AMQConnection extends Close
return _virtualHostProperties.get(propertyName);
}
+ public void setConnectionSettings(final ConnectionSettings
connectionSettings)
+ {
+ _connectionSettings = connectionSettings;
+ }
+
+ public ConnectionSettings getConnectionSettings()
+ {
+ return _connectionSettings;
+ }
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
Sun Jun 7 21:02:02 2015
@@ -29,7 +29,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
public interface AMQConnectionDelegate
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
Sun Jun 7 21:02:02 2015
@@ -43,7 +43,6 @@ import org.apache.qpid.client.transport.
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
@@ -229,6 +228,7 @@ public class AMQConnectionDelegate_0_10
_conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
_conn.getFailoverPolicy().attainedConnection();
_conn.logConnected(_qpidConnection.getLocalAddress(),
_qpidConnection.getRemoteAddress());
+ _conn.setConnectionSettings(conSettings);
}
catch (ProtocolVersionException pe)
{
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Sun Jun 7 21:02:02 2015
@@ -52,7 +52,6 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
@@ -174,6 +173,7 @@ public class AMQConnectionDelegate_8_0 i
_confirmedPublishSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
_confirmedPublishNonTransactionalSupported =
checkConfirmedPublishNonTransactionalSupported();
+ _conn.setConnectionSettings(settings);
return null;
}
else
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
Sun Jun 7 21:02:02 2015
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import org.apache.qpid.client.url.URLParser;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.url.URLSyntaxException;
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
Sun Jun 7 21:02:02 2015
@@ -95,6 +95,8 @@ public abstract class AMQDestination imp
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
+ private boolean _sendEncrypted;
+ private String _encryptedRecipients;
protected void setExclusive(boolean exclusive)
{
@@ -126,6 +128,16 @@ public abstract class AMQDestination imp
return false;
}
+ public boolean sendEncrypted()
+ {
+ return _sendEncrypted;
+ }
+
+ public String getEncryptedRecipients()
+ {
+ return _encryptedRecipients;
+ }
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -303,6 +315,8 @@ public abstract class AMQDestination imp
final String rejectBehaviourValue =
binding.getOption(BindingURL.OPTION_REJECT_BEHAVIOUR);
_rejectBehaviour = rejectBehaviourValue == null ? null :
RejectBehaviour.valueOf(rejectBehaviourValue.toUpperCase());
_consumerArguments = binding.getConsumerOptions();
+ _sendEncrypted =
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_SEND_ENCRYPTED));
+ _encryptedRecipients =
binding.getOption(BindingURL.OPTION_ENCRYPTED_RECIPIENTS);
}
protected AMQDestination(String exchangeName, String exchangeClass, String
routingKey, String queueName)
@@ -960,6 +974,9 @@ public abstract class AMQDestination imp
_addressType = _addrHelper.getNodeType();
_node = _addrHelper.getNode();
_link = _addrHelper.getLink();
+
+ _sendEncrypted = _addrHelper.getSendEncrypted();
+ _encryptedRecipients = _addrHelper.getEncryptedRecipients();
}
// ----- / new address syntax -----------
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Sun Jun 7 21:02:02 2015
@@ -62,6 +62,7 @@ import org.apache.qpid.client.message.JM
import org.apache.qpid.client.message.JMSObjectMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageEncryptionHelper;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.messaging.address.Node;
@@ -94,6 +95,7 @@ public abstract class AMQSession<C exten
/** Used for debugging. */
private static final Logger _logger =
LoggerFactory.getLogger(AMQSession.class);
+
/** System property to enable strict AMQP compliance. */
public static final String STRICT_AMQP = "STRICT_AMQP";
@@ -247,6 +249,8 @@ public abstract class AMQSession<C exten
/** Has failover occurred on this session with outstanding actions to
commit? */
private boolean _failedOverDirty;
+ private MessageEncryptionHelper _messageEncryptionHelper;
+
/** Holds the highest received delivery tag. */
protected AtomicLong getHighestDeliveryTag()
{
@@ -313,17 +317,19 @@ public abstract class AMQSession<C exten
/**
* Creates a new session on a connection.
- *
* @param con The connection on which to create the
session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is
transactional.
* @param acknowledgeMode The acknowledgement mode for the session.
- * @param messageFactoryRegistry The message factory factory for the
session.
* @param defaultPrefetchHighMark The maximum number of messages to
prefetch before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at
which to resume the session.
*/
- protected AMQSession(AMQConnection con, int channelId, boolean transacted,
int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ protected AMQSession(AMQConnection con,
+ int channelId,
+ boolean transacted,
+ int acknowledgeMode,
+ int defaultPrefetchHighMark,
+ int defaultPrefetchLowMark)
{
_useAMQPEncodedMapMessage = con == null ||
!con.isUseLegacyMapMessageFormat();
_useAMQPEncodedStreamMessage = con != null &&
!con.isUseLegacyStreamMessageFormat();
@@ -344,9 +350,9 @@ public abstract class AMQSession<C exten
{
_acknowledgeMode = acknowledgeMode;
}
-
+ _messageEncryptionHelper = new MessageEncryptionHelper(this);
_channelId = channelId;
- _messageFactoryRegistry = messageFactoryRegistry;
+ _messageFactoryRegistry =
MessageFactoryRegistry.newDefaultRegistry(this);
_prefetchHighMark = defaultPrefetchHighMark;
_prefetchLowMark = defaultPrefetchLowMark;
@@ -428,28 +434,6 @@ public abstract class AMQSession<C exten
}
}
- /**
- * Creates a new session on a connection with the default message factory
factory.
- *
- * @param con The connection on which to create the
session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is
transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
- * @param defaultPrefetchHigh The maximum number of messages to prefetched
before suspending the session.
- * @param defaultPrefetchLow The number of prefetched messages at which
to resume the session.
- */
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetchHigh,
- int defaultPrefetchLow)
- {
- this(con,
- channelId,
- transacted,
- acknowledgeMode,
- MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh,
- defaultPrefetchLow);
- }
-
// ===== JMS Session methods.
/**
@@ -3679,5 +3663,10 @@ public abstract class AMQSession<C exten
}
}
+
+ public MessageEncryptionHelper getMessageEncryptionHelper()
+ {
+ return _messageEncryptionHelper;
+ }
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Sun Jun 7 21:02:02 2015
@@ -140,17 +140,15 @@ public class AMQSession_0_10 extends AMQ
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is
transactional.
* @param acknowledgeMode The acknowledgement mode for the session.
- * @param messageFactoryRegistry The message factory factory for the
session.
* @param defaultPrefetchHighMark The maximum number of messages to
prefetch before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at
which to resume the session.
* @param qpidConnection The qpid connection
*/
AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection,
AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int
defaultPrefetchLowMark,String name)
+ boolean transacted, int acknowledgeMode, int
defaultPrefetchHighMark, int defaultPrefetchLowMark,String name)
{
- super(con, channelId, transacted, acknowledgeMode,
messageFactoryRegistry, defaultPrefetchHighMark,
+ super(con, channelId, transacted, acknowledgeMode,
defaultPrefetchHighMark,
defaultPrefetchLowMark);
_qpidConnection = qpidConnection;
_name = name;
@@ -184,27 +182,6 @@ public class AMQSession_0_10 extends AMQ
return qpidSession;
}
-
- /**
- * Creates a new session on a connection with the default 0-10 message
factory.
- *
- * @param con The connection on which to create the
session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is
transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
- * @param defaultPrefetchHigh The maximum number of messages to prefetched
before suspending the session.
- * @param defaultPrefetchLow The number of prefetched messages at which
to resume the session.
- * @param qpidConnection The connection
- */
- AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection,
AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int
defaultPrefetchHigh, int defaultPrefetchLow,
- String name)
- {
-
- this(qpidConnection, con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow,name);
- }
-
private void addUnacked(int id)
{
synchronized (unacked)
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Sun Jun 7 21:02:02 2015
@@ -102,44 +102,25 @@ public class AMQSession_0_8 extends AMQS
/**
* Creates a new session on a connection.
- *
* @param con The connection on which to create the
session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is
transactional.
* @param acknowledgeMode The acknowledgement mode for the session.
- * @param messageFactoryRegistry The message factory factory for the
session.
* @param defaultPrefetchHighMark The maximum number of messages to
prefetch before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at
which to resume the session.
*/
- protected AMQSession_0_8(AMQConnection con, int channelId, boolean
transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry,
int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ protected AMQSession_0_8(AMQConnection con,
+ int channelId,
+ boolean transacted,
+ int acknowledgeMode,
+ int defaultPrefetchHighMark,
+ int defaultPrefetchLowMark)
{
-
super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+ super(con,channelId,transacted,acknowledgeMode,
defaultPrefetchHighMark,defaultPrefetchLowMark);
_currentPrefetch.set(0);
}
- /**
- * Creates a new session on a connection with the default message factory
factory.
- *
- * @param con The connection on which to create the
session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is
transactional.
- * @param acknowledgeMode The acknowledgement mode for the session.
- * @param defaultPrefetchHigh The maximum number of messages to
prefetched before suspending the session.
- * @param defaultPrefetchLow The number of prefetched messages at
which to resume the session.
- */
- AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetchHigh,
- int defaultPrefetchLow)
- {
- this(con,
- channelId,
- transacted,
- acknowledgeMode,
- MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh,
- defaultPrefetchLow);
- }
ProtocolVersion getProtocolVersion()
{
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Sun Jun 7 21:02:02 2015
@@ -21,11 +21,18 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import javax.crypto.spec.SecretKeySpec;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -37,6 +44,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.Encrypted010MessageFactory;
+import org.apache.qpid.client.message.MessageEncryptionHelper;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.util.JMSExceptionHelper;
@@ -48,6 +57,8 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.util.BytesDataOutput;
import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
@@ -206,22 +217,126 @@ public class BasicMessageProducer_0_10 e
}
ByteBuffer data = message.getData();
+ boolean encrypt =
message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) ||
destination.sendEncrypted();
+ if(encrypt)
+ {
+ MessageEncryptionHelper encryptionHelper =
getSession().getMessageEncryptionHelper();
+ try
+ {
+ MessageProperties origMessageProps = messageProps;
+ DeliveryProperties origDeliveryProps = deliveryProp;
+ messageProps = new MessageProperties(messageProps);
+ deliveryProp = new DeliveryProperties(deliveryProp);
+ SecretKeySpec secretKey = encryptionHelper.createSecretKey();
+
+ final Map<String, Object> origApplicationHeaders =
origMessageProps.getApplicationHeaders();
+ if(origApplicationHeaders != null)
+ {
+
origApplicationHeaders.remove(MessageEncryptionHelper.ENCRYPT_HEADER);
+ }
+
+ String recipientString =
message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+ if(recipientString == null)
+ {
+ recipientString = destination.getEncryptedRecipients();
+ }
+ if(origApplicationHeaders != null)
+ {
+
origApplicationHeaders.remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+ }
+
+ String unencryptedProperties =
message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+ if(origApplicationHeaders != null)
+ {
+
origApplicationHeaders.remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+ }
+
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeStruct32(origDeliveryProps);
+ encoder.writeStruct32(origMessageProps);
+ ByteBuffer buf = encoder.buffer();
+
+ final int headerLength = buf.remaining();
+ byte[] unencryptedBytes = new byte[headerLength + (data ==
null ? 0 : data.remaining())];
+ BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
+
+ output.write(buf.array(), buf.arrayOffset()+buf.position(),
buf.remaining());
+
+ if (data != null)
+ {
+ data.get(unencryptedBytes, headerLength, data.remaining());
+ }
+
+ byte[] ivbytes = encryptionHelper.getInitialisationVector();
+
+ byte[] encryptedBytes = encryptionHelper.encrypt(secretKey,
unencryptedBytes, ivbytes);
+ data = ByteBuffer.wrap(encryptedBytes);
+
+ if (recipientString == null)
+ {
+ throw new JMSException("When sending an encrypted message,
recipients must be supplied");
+ }
+ String[] recipients = recipientString.split(";");
+ List<List<Object>> encryptedKeys = new ArrayList<>();
+ for(MessageEncryptionHelper.KeyTransportRecipientInfo info :
encryptionHelper.getKeyTransportRecipientInfo(
+ Arrays.asList(recipients), secretKey))
+ {
+ encryptedKeys.add(info.asList());
+ }
+
+ Map<String,Object> newHeaders =
messageProps.getApplicationHeaders();
+ if(newHeaders != null)
+ {
+ newHeaders.clear();
+ }
+ else
+ {
+ newHeaders = new LinkedHashMap<>();
+ messageProps.setApplicationHeaders(newHeaders);
+ }
+
+ if(unencryptedProperties != null)
+ {
+ List<String> unencryptedPropertyNames =
Arrays.asList(unencryptedProperties.split(" *; *"));
+ for (String propertyName : unencryptedPropertyNames)
+ {
+ if (origApplicationHeaders.containsKey(propertyName))
+ {
+ newHeaders.put(propertyName,
origApplicationHeaders.get(propertyName));
+ }
+ }
+ }
+
+
newHeaders.put(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY, encryptedKeys);
+
newHeaders.put(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY,
+
encryptionHelper.getMessageEncryptionCipherName());
+
newHeaders.put(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes);
+
messageProps.setContentType(Encrypted010MessageFactory.ENCRYPTED_0_10_CONTENT_TYPE);
+
+ }
+ catch (GeneralSecurityException | IOException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new
JMSException("Unexpected Exception while encrypting message"), e);
+ }
- if(data != null
- && data.remaining() >
getConnection().getMessageCompressionThresholdSize()
- && getConnection().getDelegate().isMessageCompressionSupported()
- && getConnection().isMessageCompressionDesired()
- && messageProps.getContentEncoding() == null)
+ }
+ else
{
- byte[] compressed = GZIPUtils.compressBufferToArray(data);
- if(compressed != null)
+ if (data != null
+ && data.remaining() >
getConnection().getMessageCompressionThresholdSize()
+ &&
getConnection().getDelegate().isMessageCompressionSupported()
+ && getConnection().isMessageCompressionDesired()
+ && messageProps.getContentEncoding() == null)
{
-
messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
- data = ByteBuffer.wrap(compressed);
+ byte[] compressed = GZIPUtils.compressBufferToArray(data);
+ if (compressed != null)
+ {
+
messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+ data = ByteBuffer.wrap(compressed);
+ }
}
}
-
messageProps.setContentLength(data == null ? 0 : data.remaining());
// send the message
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
Sun Jun 7 21:02:02 2015
@@ -20,9 +20,15 @@
*/
package org.apache.qpid.client;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.UUID;
+import javax.crypto.spec.SecretKeySpec;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
@@ -35,6 +41,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.Encrypted091MessageFactory;
+import org.apache.qpid.client.message.MessageEncryptionHelper;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
@@ -52,6 +60,7 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.util.BytesDataOutput;
import org.apache.qpid.util.GZIPUtils;
public class BasicMessageProducer_0_8 extends BasicMessageProducer
@@ -104,7 +113,6 @@ public class BasicMessageProducer_0_8 ex
{
-
AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8)
message.getDelegate();
BasicContentHeaderProperties contentHeaderProperties =
delegate.getContentHeaderProperties();
@@ -125,15 +133,15 @@ public class BasicMessageProducer_0_8 ex
if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
{
- routingKey =
headers.getString(QpidMessageProperties.QPID_SUBJECT);
+ routingKey =
headers.getString(QpidMessageProperties.QPID_SUBJECT);
}
}
BasicPublishBody body =
getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
-
destination.getExchangeName(),
-
routingKey,
-
mandatory,
-
immediate);
+
destination.getExchangeName(),
+
routingKey,
+
mandatory,
+
immediate);
AMQFrame publishFrame = body.generateFrame(getChannelId());
@@ -158,7 +166,9 @@ public class BasicMessageProducer_0_8 ex
}
//Set JMS_QPID_DESTTYPE
-
delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
type);
+ delegate.getContentHeaderProperties()
+ .getHeaders()
+
.setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
if (!isDisableTimestamps())
{
@@ -167,7 +177,7 @@ public class BasicMessageProducer_0_8 ex
if (timeToLive > 0)
{
- if(!SET_EXPIRATION_AS_TTL)
+ if (!SET_EXPIRATION_AS_TTL)
{
//default behaviour used by Qpid
contentHeaderProperties.setExpiration(currentTime +
timeToLive);
@@ -188,37 +198,121 @@ public class BasicMessageProducer_0_8 ex
contentHeaderProperties.setPriority((byte) priority);
int size = (payload != null) ? payload.remaining() : 0;
+ AMQFrame contentHeaderFrame;
+ final AMQFrame[] frames;
+ boolean encrypt =
message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) ||
destination.sendEncrypted();
+ if(encrypt)
+ {
+ MessageEncryptionHelper encryptionHelper =
getSession().getMessageEncryptionHelper();
+ try
+ {
+ SecretKeySpec secretKey = encryptionHelper.createSecretKey();
+
+
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_HEADER);
+
+ String recipientString =
message.getStringProperty(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+ if(recipientString == null)
+ {
+ recipientString = destination.getEncryptedRecipients();
+ }
+
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.ENCRYPT_RECIPIENTS_HEADER);
+
+ String unencryptedProperties =
message.getStringProperty(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+
contentHeaderProperties.getHeaders().remove(MessageEncryptionHelper.UNENCRYPTED_PROPERTIES_HEADER);
+
+ final int headerLength =
contentHeaderProperties.getPropertyListSize() + 2;
+ byte[] unencryptedBytes = new byte[headerLength + size];
+ BytesDataOutput output = new BytesDataOutput(unencryptedBytes);
+ output.writeShort((short)
(contentHeaderProperties.getPropertyFlags() & 0xffff));
+ contentHeaderProperties.writePropertyListPayload(output);
+
+ if (size != 0)
+ {
+ payload.get(unencryptedBytes, headerLength,
payload.remaining());
+ }
+
+ byte[] ivbytes = encryptionHelper.getInitialisationVector();
+
+ byte[] encryptedBytes = encryptionHelper.encrypt(secretKey,
unencryptedBytes, ivbytes);
+ payload = ByteBuffer.wrap(encryptedBytes);
+
+ if (recipientString == null)
+ {
+ throw new JMSException("When sending an encrypted message,
recipients must be supplied");
+ }
+ String[] recipients = recipientString.split(";");
+ List<List<Object>> encryptedKeys = new ArrayList<>();
+ for(MessageEncryptionHelper.KeyTransportRecipientInfo info :
encryptionHelper.getKeyTransportRecipientInfo(Arrays.asList(recipients),
secretKey))
+ {
+ encryptedKeys.add(info.asList());
+ }
- byte[] compressed;
- if(size > getConnection().getMessageCompressionThresholdSize()
- && getConnection().getDelegate().isMessageCompressionSupported()
- && getConnection().isMessageCompressionDesired()
- && contentHeaderProperties.getEncoding() == null
- && (compressed = GZIPUtils.compressBufferToArray(payload)) !=
null)
- {
- contentHeaderProperties.setEncoding("gzip");
- payload = ByteBuffer.wrap(compressed);
- size = compressed.length;
+ BasicContentHeaderProperties oldProps =
contentHeaderProperties;
+ contentHeaderProperties = new
BasicContentHeaderProperties(oldProps);
+ final FieldTable oldHeaders = oldProps.getHeaders();
+ final FieldTable newHeaders =
contentHeaderProperties.getHeaders();
+ newHeaders.clear();
+
+ if(unencryptedProperties != null)
+ {
+ List<String> unencryptedPropertyNames =
Arrays.asList(unencryptedProperties.split(" *; *"));
+ for (String propertyName : unencryptedPropertyNames)
+ {
+ if (oldHeaders.propertyExists(propertyName))
+ {
+ newHeaders.setObject(propertyName,
oldHeaders.get(propertyName));
+ }
+ }
+ }
+
+
newHeaders.setObject(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY,
encryptedKeys);
+
newHeaders.setString(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY,
+
encryptionHelper.getMessageEncryptionCipherName());
+
newHeaders.setBytes(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY, ivbytes);
+
contentHeaderProperties.setContentType(Encrypted091MessageFactory.ENCRYPTED_0_9_1_CONTENT_TYPE);
+ size = encryptedBytes.length;
+
+ }
+ catch (GeneralSecurityException | IOException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new
JMSException("Unexpected Exception while encrypting message"), e);
+ }
}
+ else
+ {
+ byte[] compressed;
+ if (size > getConnection().getMessageCompressionThresholdSize()
+ &&
getConnection().getDelegate().isMessageCompressionSupported()
+ && getConnection().isMessageCompressionDesired()
+ && contentHeaderProperties.getEncoding() == null
+ && (compressed = GZIPUtils.compressBufferToArray(payload)) !=
null)
+ {
+ contentHeaderProperties.setEncoding("gzip");
+ payload = ByteBuffer.wrap(compressed);
+ size = compressed.length;
+
+ }
+ }
final int contentBodyFrameCount =
calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+ frames = new AMQFrame[2 + contentBodyFrameCount];
if (payload != null)
{
createContentBodies(payload, frames, 2, getChannelId());
}
- if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
+ contentHeaderFrame =
+ ContentHeaderBody.createAMQFrame(getChannelId(),
+ contentHeaderProperties,
size);
+
+
+ if (getLogger().isDebugEnabled())
{
- getLogger().debug("Sending content body frames to " + destination);
+ getLogger().debug("Sending " + (frames.length-2) + " content body
frames to " + destination);
}
-
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(getChannelId(),
- contentHeaderProperties, size);
- if(contentHeaderFrame.getSize() >
getSession().getAMQConnection().getMaximumFrameSize())
+ if (contentHeaderFrame.getSize() >
getSession().getAMQConnection().getMaximumFrameSize())
{
throw new JMSException("Unable to send message as the headers are
too large ("
+ contentHeaderFrame.getSize()
@@ -231,6 +325,7 @@ public class BasicMessageProducer_0_8 ex
getLogger().debug("Sending content header frame to " +
destination);
}
+
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
final CompositeAMQDataBlock compositeFrame = new
CompositeAMQDataBlock(frames);
Copied:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
(from r1684077,
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java)
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java?p2=qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java&p1=qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java&r1=1684077&r2=1684078&rev=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
Sun Jun 7 21:02:02 2015
@@ -27,32 +27,80 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.url.URLHelper;
import org.apache.qpid.url.URLSyntaxException;
-public class AMQBrokerDetails implements BrokerDetails, Serializable
+public class BrokerDetails implements Serializable
{
- private static final long serialVersionUID = 8450786374975932890L;
-
+ /*
+ * Known URL Options
+ * @see ConnectionURL
+ */
+ public static final String OPTIONS_RETRY = "retries";
+ public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+ public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
+ public static final String OPTIONS_HEARTBEAT = "heartbeat";
+ @Deprecated
+ public static final String OPTIONS_IDLE_TIMEOUT = "idle_timeout";
+ public static final String OPTIONS_SASL_MECHS = "sasl_mechs";
+ public static final String OPTIONS_SASL_ENCRYPTION = "sasl_encryption";
+ public static final String OPTIONS_SSL = "ssl";
+ public static final String OPTIONS_TCP_NO_DELAY = "tcp_nodelay";
+ public static final String OPTIONS_SASL_PROTOCOL_NAME = "sasl_protocol";
+ public static final String OPTIONS_SASL_SERVER_NAME = "sasl_server";
+ public static final String OPTIONS_TRUST_STORE = "trust_store";
+ public static final String OPTIONS_TRUST_STORE_PASSWORD =
"trust_store_password";
+ public static final String OPTIONS_KEY_STORE = "key_store";
+ public static final String OPTIONS_KEY_STORE_PASSWORD =
"key_store_password";
+ public static final String OPTIONS_SSL_VERIFY_HOSTNAME =
"ssl_verify_hostname";
+ public static final String OPTIONS_SSL_CERT_ALIAS = "ssl_cert_alias";
+ public static final String OPTIONS_CLIENT_CERT_PRIV_KEY_PATH =
"client_cert_priv_key_path";
+ public static final String OPTIONS_CLIENT_CERT_PATH = "client_cert_path";
+ public static final String OPTIONS_CLIENT_CERT_INTERMEDIARY_CERT_PATH =
"client_cert_intermediary_cert_path" ;
+ public static final String OPTIONS_TRUSTED_CERTIFICATES_PATH =
"trusted_certs_path";
+
+ public static final String OPTIONS_ENCRYPTION_TRUST_STORE =
"encryption_trust_store";
+ public static final String OPTIONS_ENCRYPTION_TRUST_STORE_PASSWORD =
"encryption_trust_store_password";
+ public static final String OPTIONS_ENCRYPTION_REMOTE_TRUST_STORE =
"encryption_remote_trust_store";
+ public static final String OPTIONS_ENCRYPTION_KEY_STORE =
"encryption_key_store";
+ public static final String OPTIONS_ENCRYPTION_KEY_STORE_PASSWORD =
"encryption_key_store_password";
+
+
+ public static final int DEFAULT_PORT = 5672;
+ public static final String TCP = "tcp";
+ public static final String DEFAULT_TRANSPORT = BrokerDetails.TCP;
+ public static final String URL_FORMAT_EXAMPLE =
+ "<transport>://<hostname>[:<port Default=\"" +
BrokerDetails.DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+ public static final int DEFAULT_CONNECT_TIMEOUT = 30000;
+ public static final boolean USE_SSL_DEFAULT = false;
+ // pulled these properties from the new BrokerDetails class in the qpid
package
+ public static final String PROTOCOL_TCP = "tcp";
+ public static final String PROTOCOL_TLS = "tls";
+ public static final String VIRTUAL_HOST = "virtualhost";
+ public static final String CLIENT_ID = "client_id";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ private static final long serialVersionUID = 8762219750300869355L;
private String _host;
private int _port;
private String _transport;
private Map<String, String> _options = new HashMap<String, String>();
+ private AMQConnectionURL _connectionUrl;
- public AMQBrokerDetails(BrokerDetails details)
+ public BrokerDetails(BrokerDetails details)
{
_host = details.getHost();
_port = details.getPort();
_transport = details.getTransport();
- _options = new HashMap<>(details.getProperties());
+ _options = new HashMap<>(details._options);
+ _connectionUrl = details._connectionUrl;
}
- public AMQBrokerDetails(){}
+ public BrokerDetails(){}
- public AMQBrokerDetails(String url) throws URLSyntaxException
+ public BrokerDetails(String url) throws URLSyntaxException
{
// URL should be of format
tcp://host:port?option='value',option='value'
@@ -210,7 +258,7 @@ public class AMQBrokerDetails implements
}
}
- public AMQBrokerDetails(String host, int port)
+ public BrokerDetails(String host, int port)
{
_host = host;
_port = port;
@@ -249,7 +297,12 @@ public class AMQBrokerDetails implements
public String getProperty(String key)
{
- return _options.get(key);
+ String value = _options.get(key);
+ if(value == null && _connectionUrl != null)
+ {
+ value = _connectionUrl.getOption(key);
+ }
+ return value;
}
public void setProperty(String key, String value)
@@ -402,16 +455,6 @@ public class AMQBrokerDetails implements
}
}
- public Map<String, String> getProperties()
- {
- return _options;
- }
-
- public void setProperties(Map<String, String> props)
- {
- _options = props;
- }
-
public ConnectionSettings buildConnectionSettings()
{
ConnectionSettings conSettings = new ConnectionSettings();
@@ -508,6 +551,42 @@ public class AMQBrokerDetails implements
String.valueOf(ClientProperties.DEFAULT_CONNECTION_OPTION_SSL_VERIFY_HOST_NAME)));
conSettings.setVerifyHostname(getBooleanProperty(BrokerDetails.OPTIONS_SSL_VERIFY_HOSTNAME,
defaultSSLVerifyHostName ));
+ // ----------------------------
+
+ if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE) != null)
+ {
+ conSettings.setEncryptionKeyStorePath(
+ getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE));
+ }
+
+ if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE_PASSWORD)
!= null)
+ {
+ conSettings.setEncryptionKeyStorePassword(
+
getProperty(BrokerDetails.OPTIONS_ENCRYPTION_KEY_STORE_PASSWORD));
+ }
+
+
+ if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE) != null)
+ {
+ conSettings.setEncryptionTrustStorePath(
+ getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE));
+ }
+
+ if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE_PASSWORD)
!= null)
+ {
+ conSettings.setEncryptionTrustStorePassword(
+
getProperty(BrokerDetails.OPTIONS_ENCRYPTION_TRUST_STORE_PASSWORD));
+ }
+
+
+ if (getProperty(BrokerDetails.OPTIONS_ENCRYPTION_REMOTE_TRUST_STORE)
!= null)
+ {
+ conSettings.setEncryptionRemoteTrustStoreName(
+
getProperty(BrokerDetails.OPTIONS_ENCRYPTION_REMOTE_TRUST_STORE));
+ }
+
+ // ----------------------------
+
if (getProperty(BrokerDetails.OPTIONS_TCP_NO_DELAY) != null)
{
conSettings.setTcpNodelay(
@@ -528,4 +607,8 @@ public class AMQBrokerDetails implements
return conSettings;
}
+ public void setConnectionUrl(final AMQConnectionURL connectionUrl)
+ {
+ _connectionUrl = connectionUrl;
+ }
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
Sun Jun 7 21:02:02 2015
@@ -59,20 +59,19 @@ public class XASessionImpl extends AMQSe
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection,
AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
- this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE,
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
defaultPrefetchLow, null);
+ this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE,
defaultPrefetchHigh, defaultPrefetchLow, null);
}
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection,
AMQConnection con, int channelId,
int ackMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
- this(qpidConnection, con, channelId, false, ackMode,
MessageFactoryRegistry.newDefaultRegistry(),
+ this(qpidConnection, con, channelId, false, ackMode,
defaultPrefetchHigh, defaultPrefetchLow, null);
}
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection,
AMQConnection con, int channelId,
- boolean transacted, int ackMode, MessageFactoryRegistry
registry, int defaultPrefetchHigh, int defaultPrefetchLow,
+ boolean transacted, int ackMode, int defaultPrefetchHigh, int
defaultPrefetchLow,
String name)
{
super(qpidConnection,
@@ -80,7 +79,6 @@ public class XASessionImpl extends AMQSe
channelId,
transacted,
ackMode,
- registry,
defaultPrefetchHigh,
defaultPrefetchLow,
name);
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
Sun Jun 7 21:02:02 2015
@@ -52,7 +52,7 @@ public interface AMQMessageDelegate
void setJMSReplyTo(Destination destination) throws JMSException;
- Destination getJMSDestination() throws JMSException;
+ Destination getJMSDestination();
int getJMSDeliveryMode() throws JMSException;
@@ -134,4 +134,5 @@ public interface AMQMessageDelegate
long getDeliveryTag();
void setJMSMessageID(final UUID messageId) throws JMSException;
+
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
Sun Jun 7 21:02:02 2015
@@ -337,7 +337,7 @@ public class AMQMessageDelegate_0_10 ext
_messageProps.setReplyTo(replyTo);
}
- public Destination getJMSDestination() throws JMSException
+ public Destination getJMSDestination()
{
return _destination;
}
@@ -995,4 +995,16 @@ public class AMQMessageDelegate_0_10 ext
{
return _deliveryProps;
}
+
+ @Override
+ Object getProperty(final String name)
+ {
+ return _messageProps.getApplicationHeaders().get(name);
+ }
+
+ @Override
+ boolean hasProperty(final String name)
+ {
+ return _messageProps.getApplicationHeaders().containsKey(name);
+ }
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
Sun Jun 7 21:02:02 2015
@@ -78,7 +78,7 @@ public class AMQMessageDelegate_0_8 exte
private BasicContentHeaderProperties _contentHeaderProperties;
// The base set of items that needs to be set.
- private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties,
long deliveryTag)
+ public AMQMessageDelegate_0_8(BasicContentHeaderProperties properties,
long deliveryTag)
{
super(deliveryTag);
_contentHeaderProperties = properties;
@@ -333,7 +333,7 @@ public class AMQMessageDelegate_0_8 exte
getContentHeaderProperties().setReplyTo(encodedDestination);
}
- public Destination getJMSDestination() throws JMSException
+ public Destination getJMSDestination()
{
return _destination;
}
@@ -631,6 +631,18 @@ public class AMQMessageDelegate_0_8 exte
_readableProperties = false;
}
+ @Override
+ Object getProperty(final String name)
+ {
+ return getContentHeaderProperties().getHeaders().get(name);
+ }
+
+ @Override
+ boolean hasProperty(final String name)
+ {
+ return getContentHeaderProperties().getHeaders().containsKey(name);
+ }
+
private static class DefaultRouterDestination extends AMQDestination
implements Queue
{
private static final long serialVersionUID = -5042408431861384536L;
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java
Sun Jun 7 21:02:02 2015
@@ -23,22 +23,16 @@ package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
-import javax.jms.JMSException;
import java.nio.ByteBuffer;
public class AMQPEncodedListMessageFactory extends AbstractJMSMessageFactory
{
@Override
- protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+ protected AbstractJMSMessage createMessage(AbstractAMQMessageDelegate
delegate,
ByteBuffer data) throws AMQException
{
return new AMQPEncodedListMessage(delegate,data);
}
- public AbstractJMSMessage createMessage(
- AMQMessageDelegateFactory delegateFactory) throws JMSException
- {
- return new AMQPEncodedListMessage(delegateFactory);
- }
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
Sun Jun 7 21:02:02 2015
@@ -23,24 +23,17 @@ package org.apache.qpid.client.message;
import org.apache.qpid.AMQException;
-import javax.jms.JMSException;
import java.nio.ByteBuffer;
public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
{
@Override
- protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+ protected AbstractJMSMessage createMessage(AbstractAMQMessageDelegate
delegate,
ByteBuffer data) throws AMQException
{
return new AMQPEncodedMapMessage(delegate,data);
}
- public AbstractJMSMessage createMessage(
- AMQMessageDelegateFactory delegateFactory) throws JMSException
- {
- return new AMQPEncodedMapMessage(delegateFactory);
- }
-
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
Sun Jun 7 21:02:02 2015
@@ -305,6 +305,9 @@ public abstract class AbstractAMQMessage
return generateDestination(exchange, routingKey);
}
}
+
+ abstract Object getProperty(String name);
+ abstract boolean hasProperty(String name);
}
class ExchangeInfo
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1684078&r1=1684077&r2=1684078&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
Sun Jun 7 21:02:02 2015
@@ -42,7 +42,7 @@ import org.apache.qpid.transport.Deliver
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.util.GZIPUtils;
-public abstract class AbstractJMSMessageFactory implements MessageFactory
+public abstract class AbstractJMSMessageFactory
{
private static final Logger _logger =
LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
@@ -117,15 +117,15 @@ public abstract class AbstractJMSMessage
.remaining());
}
- AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
-
contentHeader.getProperties(),
- exchange,
routingKey, queueDestinationCache,
-
topicDestinationCache, addressType);
+ AMQMessageDelegate_0_8 delegate = new
AMQMessageDelegate_0_8(messageNbr,
+
contentHeader.getProperties(),
+ exchange,
routingKey, queueDestinationCache,
+
topicDestinationCache, addressType);
return createMessage(delegate, data);
}
- protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate
delegate, ByteBuffer data) throws AMQException;
+ protected abstract AbstractJMSMessage
createMessage(AbstractAMQMessageDelegate delegate, ByteBuffer data) throws
AMQException;
protected AbstractJMSMessage create010MessageWithBody(long messageNbr,
MessageProperties msgProps,
@@ -159,13 +159,12 @@ public abstract class AbstractJMSMessage
data = ByteBuffer.wrap(uncompressed);
}
}
- AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(msgProps,
deliveryProps, messageNbr);
+ AMQMessageDelegate_0_10 delegate = new
AMQMessageDelegate_0_10(msgProps, deliveryProps, messageNbr);
AbstractJMSMessage message = createMessage(delegate, data);
return message;
}
- @Override
public AbstractJMSMessage createMessage(long messageNbr, boolean
redelivered, ContentHeaderBody contentHeader,
String exchange, String
routingKey, List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
Added:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java?rev=1684078&view=auto
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
(added)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
Sun Jun 7 21:02:02 2015
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.security.PrivateKey;
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.auth.x500.X500Principal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.codec.BBDecoder;
+
+public class Encrypted010MessageFactory extends AbstractJMSMessageFactory
+{
+ public static final String ENCRYPTED_0_10_CONTENT_TYPE =
"application/qpid-0-10-encrypted";
+ private final MessageFactoryRegistry _messageFactoryRegistry;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(Encrypted010MessageFactory.class);
+
+ public Encrypted010MessageFactory(final MessageFactoryRegistry
messageFactoryRegistry)
+ {
+ _messageFactoryRegistry = messageFactoryRegistry;
+ }
+
+ @Override
+ protected AbstractJMSMessage createMessage(final
AbstractAMQMessageDelegate delegate, final ByteBuffer data)
+ throws AMQException
+ {
+ SecretKeySpec secretKeySpec;
+ String algorithm;
+ byte[] initVector;
+ try
+ {
+
+
+ try
+ {
+ if
(delegate.hasProperty(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY))
+ {
+ algorithm =
delegate.getProperty(MessageEncryptionHelper.ENCRYPTION_ALGORITHM_PROPERTY).toString();
+
+ if
(delegate.hasProperty(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY))
+ {
+ Object ivObj =
delegate.getProperty(MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY);
+ if (ivObj instanceof byte[])
+ {
+ initVector = (byte[]) ivObj;
+ }
+ else
+ {
+ throw new AMQException("If the property '"
+ +
MessageEncryptionHelper.KEY_INIT_VECTOR_PROPERTY
+ + "' is present, it must
contain a byte array");
+ }
+ }
+ else
+ {
+ initVector = null;
+ }
+ if
(delegate.hasProperty(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY))
+ {
+ Object keyInfoObj =
delegate.getProperty(MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY);
+ if (keyInfoObj instanceof Collection)
+ {
+ secretKeySpec =
getContentEncryptionKey((Collection) keyInfoObj,
+ algorithm,
+
_messageFactoryRegistry.getSession());
+ if (secretKeySpec == null)
+ {
+ throw new AMQException("Could not locate key
information to decrypt the message");
+ }
+ }
+ else
+ {
+ throw new AMQException("An encrypted message must
contain the property '"
+ +
MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY
+ + "'");
+ }
+ }
+ else
+ {
+ throw new AMQException("An encrypted message must
contain the property '"
+ +
MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY
+ + "'");
+ }
+
+ }
+ else
+ {
+ throw new AMQException("Encrypted message must carry the
encryption algorithm in the property '"
+ +
MessageEncryptionHelper.ENCRYPTED_KEYS_PROPERTY
+ + "'");
+ }
+
+ Cipher cipher = Cipher.getInstance(algorithm);
+ cipher.init(Cipher.DECRYPT_MODE, secretKeySpec, new
IvParameterSpec(initVector));
+ byte[] encryptedData;
+ int offset;
+ int length;
+ if (data.hasArray())
+ {
+ encryptedData = data.array();
+ offset = data.arrayOffset() + data.position();
+ length = data.remaining();
+ }
+ else
+ {
+ encryptedData = new byte[data.remaining()];
+ data.duplicate().get(encryptedData);
+ offset = 0;
+ length = encryptedData.length;
+ }
+ final byte[] unencryptedBytes = decryptData(cipher,
encryptedData, offset, length);
+ ByteBuffer buf = ByteBuffer.wrap(unencryptedBytes);
+
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(buf);
+ DeliveryProperties deliveryProperties = (DeliveryProperties)
decoder.readStruct32();
+ MessageProperties messageProperties = (MessageProperties)
decoder.readStruct32();
+
+ int payloadOffset = buf.position();
+
+ final ByteBuffer unencryptedData =
+ ByteBuffer.wrap(unencryptedBytes, payloadOffset,
unencryptedBytes.length - payloadOffset);
+
+ final AbstractAMQMessageDelegate newDelegate =
+ new AMQMessageDelegate_0_10(messageProperties,
deliveryProperties, delegate.getDeliveryTag());
+ newDelegate.setJMSDestination(delegate.getJMSDestination());
+
+
+ final AbstractJMSMessageFactory unencryptedMessageFactory =
+
_messageFactoryRegistry.getMessageFactory(messageProperties.getContentType());
+
+ return unencryptedMessageFactory.createMessage(newDelegate,
unencryptedData);
+ }
+ catch (GeneralSecurityException | IOException e)
+ {
+ throw new AMQException("Could not decode encrypted message",
e);
+
+ }
+
+ }
+ catch(AMQException e)
+ {
+ LOGGER.error("Error when attempting to decrypt message " +
delegate.getDeliveryTag() + " to address ("+delegate.getJMSDestination()+").
Message will be delivered to the client encrypted", e);
+ return
_messageFactoryRegistry.getDefaultFactory().createMessage(delegate, data);
+ }
+ }
+
+ private byte[] decryptData(final Cipher cipher, final byte[]
encryptedData, final int offset, final int length)
+ throws IOException
+ {
+ final byte[] unencryptedBytes;
+ try (CipherInputStream cipherInputStream = new CipherInputStream(new
ByteArrayInputStream(encryptedData,
+
offset,
+
length), cipher))
+ {
+ byte[] buf = new byte[512];
+ int pos = 0;
+ int read;
+ while ((read = cipherInputStream.read(buf, pos, buf.length - pos))
!= -1)
+ {
+ pos += read;
+ if (pos == buf.length)
+ {
+ byte[] tmp = buf;
+ buf = new byte[buf.length + 512];
+ System.arraycopy(tmp, 0, buf, 0, tmp.length);
+ }
+ }
+ unencryptedBytes= new byte[pos];
+ System.arraycopy(buf, 0, unencryptedBytes, 0, pos);
+ }
+ return unencryptedBytes;
+ }
+
+ private SecretKeySpec getContentEncryptionKey(final Collection
keyInfoObjList,
+ final String algorithm,
+ final AMQSession<?, ?>
session)
+ throws AMQException, GeneralSecurityException, IOException
+ {
+
+ for(Object keyInfoObject : keyInfoObjList)
+ {
+ try
+ {
+ Iterator iter = ((Collection)keyInfoObject).iterator();
+
+ int type = ((Number)iter.next()).intValue();
+ switch(type)
+ {
+ case 1:
+ String keyEncryptionAlgorithm = (String) iter.next();
+ X500Principal issuer = new
X500Principal((String)iter.next());
+ BigInteger serialNumber = new
BigInteger((String)iter.next());
+ byte[] encryptedKey = (byte[])iter.next();
+
+ PrivateKey privateKey = getPrivateKey(session, issuer,
serialNumber);
+ if(privateKey != null)
+ {
+ Cipher cipher =
Cipher.getInstance(keyEncryptionAlgorithm);
+ cipher.init(Cipher.DECRYPT_MODE, privateKey);
+ byte[] decryptedData = decryptData(cipher,
encryptedKey, 0, encryptedKey.length);
+ SecretKeySpec keySpec = new
SecretKeySpec(decryptedData, algorithm.split("/")[0]);
+ return keySpec;
+ }
+ break;
+ default:
+ throw new AMQException("Invalid format of
'x-qpid-encrypted-keys' - unknown key info type: " + type);
+
+ }
+ }
+ catch(ClassCastException e)
+ {
+ throw new AMQException("Invalid format of
'x-qpid-encrypted-keys'");
+ }
+ }
+ return null;
+ }
+
+ private PrivateKey getPrivateKey(final AMQSession<?, ?> session,
+ final X500Principal issuer,
+ final BigInteger serialNumber)
+ throws GeneralSecurityException, IOException
+ {
+ return
session.getMessageEncryptionHelper().getEncryptionPrivateKey(issuer,
serialNumber);
+ }
+
+}
Propchange:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted010MessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org