Author: rgodfrey
Date: Mon Jan 13 17:17:50 2014
New Revision: 1557775

URL: http://svn.apache.org/r1557775
Log:
QPID-5475 : [Java Broker] Add test for REST api client cert auth

Added:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java
Modified:
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1557775&r1=1557774&r2=1557775&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
 Mon Jan 13 17:17:50 2014
@@ -26,6 +26,8 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.net.URLDecoder;
 import java.net.URLStreamHandler;
+import java.util.HashMap;
+import java.util.Map;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
@@ -50,6 +52,7 @@ public class ConnectionFactoryImpl imple
     private boolean _useBinaryMessageId = 
Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
     private boolean _syncPublish = 
Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
     private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0);
+    private int _maxPrefetch;
 
 
     public ConnectionFactoryImpl(final String host,
@@ -135,117 +138,200 @@ public class ConnectionFactoryImpl imple
         connection.setTopicPrefix(_topicPrefix);
         connection.setUseBinaryMessageId(_useBinaryMessageId);
         connection.setSyncPublish(_syncPublish);
+        if(_maxPrefetch != 0)
+        {
+            connection.setMaxPrefetch(_maxPrefetch);
+        }
         return connection;
     }
 
-    public static ConnectionFactoryImpl createFromURL(final String urlString) 
throws MalformedURLException
+    public void setMaxPrefetch(final int maxPrefetch)
     {
-        URL url = new URL(null, urlString, new URLStreamHandler()
-                    {
-                        @Override
-                        protected URLConnection openConnection(URL u) throws 
IOException
-                        {
-                            throw new UnsupportedOperationException();
-                        }
-                    });
-        String protocol = url.getProtocol();
-        if(protocol == null || "".equals(protocol))
-        {
-            protocol = "amqp";
-        }
-/*
-        else if(!protocol.equals("amqp") && !protocol.equals("amqps"))
-        {
-            throw new MalformedURLException("Protocol '"+protocol+"' unknown. 
Must be one of 'amqp' or 'amqps'.");
-        }
-*/
-        String host = url.getHost();
-        int port = url.getPort();
+        _maxPrefetch = maxPrefetch;
+    }
 
-        boolean ssl = false;
+    private static class ConnectionOptions
+    {
+        String username;
+        String password;
+        String clientId;
+        String remoteHost;
 
-        if(port == -1)
-        {
-            if("amqps".equals(protocol))
-            {
-                port = 5671;
-                ssl = true;
-            }
-            else
-            {
-                port = 5672;
-            }
-        }
-        else if("amqps".equals(protocol))
+        boolean binaryMessageId = true;
+        boolean syncPublish;
+        int maxSessions;
+        public boolean ssl;
+        public int maxPrefetch;
+    }
+
+
+
+    private static abstract class OptionSetter
+    {
+
+        private static final Map<String, OptionSetter> OPTION_SETTER_MAP = new 
HashMap<String, OptionSetter>();
+        private final String _name;
+        private final String _description;
+
+        public OptionSetter(String name, String description)
         {
-            ssl = true;
+            OPTION_SETTER_MAP.put(name.toLowerCase(), this);
+            _name = name;
+            _description = description;
         }
 
-        String userInfo = url.getUserInfo();
-        String username = null;
-        String password = null;
-        String clientId = null;
-        String remoteHost = null;
-
-        boolean binaryMessageId = true;
-        boolean syncPublish = false;
-        int maxSessions = 0;
+        public abstract void setOption(ConnectionOptions options, String 
value) throws MalformedURLException;
 
-        if(userInfo != null)
+        public static void parseOptions(URL url, ConnectionOptions options) 
throws MalformedURLException
         {
-            String[] components = userInfo.split(":",2);
-            username = URLDecoder.decode(components[0]);
-            if(components.length == 2)
+            String query = url.getQuery();
+            if(query != null)
             {
-                password = URLDecoder.decode(components[1]);
+                for(String param : query.split("&"))
+                {
+
+                    String[] keyValuePair = param.split("=",2);
+                    OptionSetter setter = 
OPTION_SETTER_MAP.get(keyValuePair[0]);
+                    if(setter != null)
+                    {
+                        setter.setOption(options, keyValuePair[1]);
+                    }
+                    else
+                    {
+                        throw new MalformedURLException("Unknown URL option: 
'"+keyValuePair[0]+"' in connection URL");
+                    }
+
+                }
             }
         }
-        String query = url.getQuery();
-        if(query != null)
+    }
+
+    private static final OptionSetter[] _options =
         {
-            for(String param : query.split("&"))
+            new OptionSetter("clientid", "JMS client id / AMQP container id")
             {
-                String[] keyValuePair = param.split("=",2);
-                if(keyValuePair[0].equalsIgnoreCase("clientid"))
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    clientId = keyValuePair[1];
+                    options.clientId = value;
                 }
-                else if(keyValuePair[0].equalsIgnoreCase("ssl"))
+            },
+            new OptionSetter("ssl", "Set to \"true\" to use SSL encryption")
+            {
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    ssl = Boolean.valueOf(keyValuePair[1]);
+                    options.ssl = Boolean.valueOf(value);
                 }
-                else if(keyValuePair[0].equalsIgnoreCase("remote-host"))
+            },
+            new OptionSetter("remote-host", "AMQP remote host")
+            {
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    remoteHost = keyValuePair[1];
+                    options.remoteHost = value;
                 }
-                else if (keyValuePair[0].equalsIgnoreCase("binary-messageid"))
+            },
+            new OptionSetter("binary-messageid", "Use binary (rather than 
String) message ids")
+            {
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    binaryMessageId = Boolean.parseBoolean(keyValuePair[1]);
+                    options.binaryMessageId = Boolean.parseBoolean(value);
                 }
-                else if (keyValuePair[0].equalsIgnoreCase("sync-publish"))
+            },
+            new OptionSetter("sync-publish", "Wait for acknowledge when 
sending messages")
+            {
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    syncPublish = Boolean.parseBoolean(keyValuePair[1]);
+                    options.syncPublish = Boolean.parseBoolean(value);
                 }
-                else if(keyValuePair[0].equalsIgnoreCase("max-sessions"))
+            },
+            new OptionSetter("max-sessions", "set maximum number of sessions 
allowed")
+            {
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    maxSessions = Integer.parseInt(keyValuePair[1]);
+                    options.maxSessions = Integer.parseInt(value);
                 }
-                else
+            },
+            new OptionSetter("max-prefetch", "set maximum number of messages 
prefetched on a link")
+            {
+                public void setOption(ConnectionOptions options, String value)
                 {
-                    throw new MalformedURLException("Unknown URL option: 
'"+keyValuePair[0]+"' in connection URL: "+urlString);
+                    options.maxPrefetch = Integer.parseInt(value);
                 }
             }
+        };
+
+    public static ConnectionFactoryImpl createFromURL(final String urlString) 
throws MalformedURLException
+    {
+        URL url = new URL(null, urlString, new URLStreamHandler()
+        {
+            @Override
+            protected URLConnection openConnection(URL u) throws IOException
+            {
+                throw new UnsupportedOperationException();
+            }
+        });
+        String protocol = url.getProtocol();
+        if (protocol == null || "".equals(protocol))
+        {
+            protocol = "amqp";
         }
+        String host = url.getHost();
+        int port = url.getPort();
 
-        if(remoteHost == null)
+        final ConnectionOptions options = new ConnectionOptions();
+
+        if (port == -1)
         {
-            remoteHost = host;
+            if ("amqps".equals(protocol))
+            {
+                port = 5671;
+                options.ssl = true;
+            }
+            else
+            {
+                port = 5672;
+            }
+        }
+        else if ("amqps".equals(protocol))
+        {
+            options.ssl = true;
+        }
+
+
+        String userInfo = url.getUserInfo();
+
+        if (userInfo != null)
+        {
+            String[] components = userInfo.split(":", 2);
+            options.username = URLDecoder.decode(components[0]);
+            if (components.length == 2)
+            {
+                options.password = URLDecoder.decode(components[1]);
+            }
+        }
+
+        OptionSetter.parseOptions(url, options);
+
+        if (options.remoteHost == null)
+        {
+            options.remoteHost = host;
         }
 
         ConnectionFactoryImpl connectionFactory =
-                new ConnectionFactoryImpl(protocol,host, port, username, 
password, clientId, remoteHost, ssl, maxSessions);
-        connectionFactory.setUseBinaryMessageId(binaryMessageId);
-        connectionFactory.setSyncPublish(syncPublish);
+                new ConnectionFactoryImpl(protocol,
+                                          host,
+                                          port,
+                                          options.username,
+                                          options.password,
+                                          options.clientId,
+                                          options.remoteHost,
+                                          options.ssl,
+                                          options.maxSessions);
+        connectionFactory.setUseBinaryMessageId(options.binaryMessageId);
+        connectionFactory.setSyncPublish(options.syncPublish);
+        if (options.maxPrefetch != 0)
+        {
+            connectionFactory.setMaxPrefetch(options.maxPrefetch);
+        }
 
         return connectionFactory;
 
@@ -308,4 +394,6 @@ public class ConnectionFactoryImpl imple
     {
         _syncPublish = syncPublish;
     }
+
+
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1557775&r1=1557774&r2=1557775&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
 Mon Jan 13 17:17:50 2014
@@ -62,6 +62,12 @@ public class ConnectionImpl implements C
     private boolean _useBinaryMessageId = 
Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
     private boolean _syncPublish = 
Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
     private int _maxSessions;
+    private int _maxPrefetch;
+
+    public void setMaxPrefetch(final int maxPrefetch)
+    {
+        _maxPrefetch = maxPrefetch;
+    }
 
     private static enum State
     {
@@ -190,6 +196,10 @@ public class ConnectionImpl implements C
             SessionImpl session = new SessionImpl(this, acknowledgeMode);
             session.setQueueSession(_isQueueConnection);
             session.setTopicSession(_isTopicConnection);
+            if(_maxPrefetch != 0)
+            {
+                session.setMaxPrefetch(_maxPrefetch);
+            }
             
             boolean connectionStarted = false;
             synchronized(_lock)

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1557775&r1=1557774&r2=1557775&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 Mon Jan 13 17:17:50 2014
@@ -76,6 +76,7 @@ public class MessageConsumerImpl impleme
     private Binary _lastTxnUpdate;
     private final List<Message> _recoverReplayMessages = new 
ArrayList<Message>();
     private final List<Message> _replaymessages = new ArrayList<Message>();
+    private int _maxPrefetch = 100;
 
     MessageConsumerImpl(final Destination destination,
                         final SessionImpl session,
@@ -117,6 +118,10 @@ public class MessageConsumerImpl impleme
             throw new InvalidDestinationException("Invalid destination class " 
+ destination.getClass().getName());
         }
         _session = session;
+        if(session.getMaxPrefetch() != 0)
+        {
+            _maxPrefetch = session.getMaxPrefetch();
+        }
 
         _receiver = createClientReceiver();
         _receiver.setRemoteErrorListener(new Runnable()
@@ -442,7 +447,7 @@ public class MessageConsumerImpl impleme
 
     public void start()
     {
-        _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+        _receiver.setCredit(UnsignedInteger.valueOf(getMaxPrefetch()), true);
     }
 
     public Queue getQueue() throws JMSException
@@ -487,4 +492,14 @@ public class MessageConsumerImpl impleme
             }
         }
     }
+
+    public int getMaxPrefetch()
+    {
+        return _maxPrefetch;
+    }
+
+    public void setMaxPrefetch(final int maxPrefetch)
+    {
+        _maxPrefetch = maxPrefetch;
+    }
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1557775&r1=1557774&r2=1557775&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 Mon Jan 13 17:17:50 2014
@@ -81,6 +81,7 @@ public class SessionImpl implements Sess
     private boolean _isQueueSession;
     private boolean _isTopicSession;
     private Transaction _txn;
+    private int _maxPrefetch;
 
     protected SessionImpl(final ConnectionImpl connection, final 
AcknowledgeMode acknowledgeMode) throws JMSException
     {
@@ -843,6 +844,16 @@ public class SessionImpl implements Sess
         return _txn;
     }
 
+    public void setMaxPrefetch(final int maxPrefetch)
+    {
+        _maxPrefetch = maxPrefetch;
+    }
+
+    public int getMaxPrefetch()
+    {
+        return _maxPrefetch;
+    }
+
     private class Dispatcher implements Runnable
     {
 

Added: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java?rev=1557775&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java
 Mon Jan 13 17:17:50 2014
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.rest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
+import 
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE;
+import static org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE_PASSWORD;
+import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE;
+import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD;
+
+public class BrokerRestHttpsClientCertAuthTest extends QpidRestTestCase
+{
+
+    @Override
+    public void setUp() throws Exception
+    {
+        setSystemProperty("javax.net.debug", "ssl");
+        super.setUp();
+        setSystemProperty("javax.net.ssl.trustStore", TRUSTSTORE);
+        setSystemProperty("javax.net.ssl.trustStorePassword", 
TRUSTSTORE_PASSWORD);
+        setSystemProperty("javax.net.ssl.keystore", KEYSTORE);
+        setSystemProperty("javax.net.ssl.keyStorePassword", KEYSTORE_PASSWORD);
+
+    }
+
+    @Override
+    protected void customizeConfiguration() throws ConfigurationException, 
IOException
+    {
+        super.customizeConfiguration();
+        getRestTestHelper().setUseSslAuth(true);
+        Map<String, Object> newAttributes = new HashMap<String, Object>();
+        newAttributes.put(Port.PROTOCOLS, 
Collections.singleton(Protocol.HTTP));
+        newAttributes.put(Port.TRANSPORTS, 
Collections.singleton(Transport.SSL));
+        newAttributes.put(Port.KEY_STORE, 
TestBrokerConfiguration.ENTRY_NAME_SSL_KEYSTORE);
+        newAttributes.put(Port.TRUST_STORES, 
Collections.singleton(TestBrokerConfiguration.ENTRY_NAME_SSL_TRUSTSTORE));
+        newAttributes.put(Port.NEED_CLIENT_AUTH,"true");
+
+
+        Map<String, Object> externalProviderAttributes = new HashMap<String, 
Object>();
+        externalProviderAttributes.put(AuthenticationProvider.TYPE, 
ExternalAuthenticationManagerFactory.PROVIDER_TYPE);
+        externalProviderAttributes.put(AuthenticationProvider.NAME, 
EXTERNAL_AUTHENTICATION_PROVIDER);
+        
getBrokerConfiguration().addAuthenticationProviderConfiguration(externalProviderAttributes);
+
+        // set external (client certificate) authentication provider on http port for the tests
+        
getBrokerConfiguration().setObjectAttribute(TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
 Port.AUTHENTICATION_PROVIDER,
+                                                    
EXTERNAL_AUTHENTICATION_PROVIDER);
+
+        
getBrokerConfiguration().setObjectAttributes(TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
 newAttributes);
+    }
+
+    public void testGetWithHttps() throws Exception
+    {
+        Map<String, Object> saslData = 
getRestTestHelper().getJsonAsMap("/rest/sasl");
+
+        Asserts.assertAttributesPresent(saslData, "user");
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to