nkurihar closed pull request #901: Introduce configuration converter
URL: https://github.com/apache/incubator-pulsar/pull/901
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e917d7e3b..27eff7f06 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 7aebaa445..b9e0321fc 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.common.configuration;
 
+import org.apache.pulsar.broker.ServiceConfiguration;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.common.util.FieldParser.update;
 
@@ -25,6 +27,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
@@ -127,4 +130,44 @@ public static boolean isComplete(Object obj) throws 
IllegalArgumentException, Il
         return true;
     }
 
+    /**
+     * Converts a PulsarConfiguration object to a ServiceConfiguration object.
+     *
+     * @param conf
+     * @param ignoreNonExistMember
+     * @return
+     * @throws IllegalArgumentException
+     *             if conf has the field whose name is not contained in 
ServiceConfiguration and ignoreNonExistMember is false.
+     * @throws RuntimeException
+     */
+    public static ServiceConfiguration convertFrom(PulsarConfiguration conf, 
boolean ignoreNonExistMember) throws RuntimeException {
+        try {
+            final ServiceConfiguration convertedConf = 
ServiceConfiguration.class.newInstance();
+            Field[] confFields = conf.getClass().getDeclaredFields();
+            Arrays.stream(confFields).forEach(confField -> {
+                try {
+                    Field convertedConfField = 
ServiceConfiguration.class.getDeclaredField(confField.getName());
+                    confField.setAccessible(true);
+                    convertedConfField.setAccessible(true);
+                    convertedConfField.set(convertedConf, confField.get(conf));
+                } catch (NoSuchFieldException e) {
+                    if (!ignoreNonExistMember) {
+                        throw new IllegalArgumentException("Exception caused 
while converting configuration: " + e.getMessage());
+                    }
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException("Exception caused while 
converting configuration: " + e.getMessage());
+                }
+            });
+            return convertedConf;
+        } catch (InstantiationException e) {
+            throw new RuntimeException("Exception caused while converting 
configuration: " + e.getMessage());
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("Exception caused while converting 
configuration: " + e.getMessage());
+        }
+    }
+
+    public static ServiceConfiguration convertFrom(PulsarConfiguration conf) 
throws RuntimeException {
+        return convertFrom(conf, true);
+    }
+
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index 1b1673355..5ff95dc44 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -33,11 +33,52 @@
 import java.util.Properties;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.configuration.FieldContext;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.testng.annotations.Test;
 
 public class PulsarConfigurationLoaderTest {
+    public class MockConfiguration implements PulsarConfiguration {
+        private Properties properties = new Properties();
+
+        private String zookeeperServers = "localhost:2181";
+        private String globalZookeeperServers = "localhost:2184";
+        private int brokerServicePort = 7650;
+        private int brokerServicePortTls = 7651;
+        private int webServicePort = 9080;
+        private int webServicePortTls = 9443;
+        private int notExistFieldInServiceConfig = 0;
+
+        @Override
+        public Properties getProperties() {
+            return properties;
+        }
+
+        @Override
+        public void setProperties(Properties properties) {
+            this.properties = properties;
+        }
+    }
+
+    @Test
+    public void testConfigurationConverting() throws Exception {
+        MockConfiguration mockConfiguration = new MockConfiguration();
+        ServiceConfiguration serviceConfiguration = 
PulsarConfigurationLoader.convertFrom(mockConfiguration);
+
+        // check whether converting correctly
+        assertEquals(serviceConfiguration.getZookeeperServers(), 
"localhost:2181");
+        assertEquals(serviceConfiguration.getGlobalZookeeperServers(), 
"localhost:2184");
+        assertEquals(serviceConfiguration.getBrokerServicePort(), 7650);
+        assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651);
+        assertEquals(serviceConfiguration.getWebServicePort(), 9080);
+        assertEquals(serviceConfiguration.getWebServicePortTls(), 9443);
+
+        // check whether exception causes
+        try {
+            PulsarConfigurationLoader.convertFrom(mockConfiguration, false);
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getClass(), IllegalArgumentException.class);
+        }
+    }
 
     @Test
     public void testPulsarConfiguraitonLoadingStream() throws Exception {
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 76148dbad..8af2b0a62 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -28,6 +28,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationManager;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.discovery.service.server.ServiceConfig;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -79,7 +80,7 @@ public DiscoveryService(ServiceConfig serviceConfig) {
     public void start() throws Exception {
         discoveryProvider = new BrokerDiscoveryProvider(this.config, 
getZooKeeperClientFactory());
         this.configurationCacheService = new 
ConfigurationCacheService(discoveryProvider.globalZkCache);
-        ServiceConfiguration serviceConfiguration = 
createServiceConfiguration(config);
+        ServiceConfiguration serviceConfiguration = 
PulsarConfigurationLoader.convertFrom(config);
         authenticationService = new 
AuthenticationService(serviceConfiguration);
         authorizationManager = new AuthorizationManager(serviceConfiguration, 
configurationCacheService);
         startServer();
@@ -132,16 +133,6 @@ public void close() throws IOException {
         workerGroup.shutdownGracefully();
     }
 
-    private ServiceConfiguration createServiceConfiguration(ServiceConfig 
config) {
-        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
-        
serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled());
-        
serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled());
-        
serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders());
-        
serviceConfiguration.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching());
-        serviceConfiguration.setProperties(config.getProperties());
-        return serviceConfiguration;
-    }
-
     /**
      * Derive the host
      *
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 91e93f769..fed6e15c8 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -24,7 +24,6 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -34,6 +33,7 @@
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
@@ -122,7 +122,7 @@ public void shutdown(int exitCode) {
 
         discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, 
getZooKeeperClientFactory());
         this.configurationCacheService = new 
ConfigurationCacheService(discoveryProvider.globalZkCache);
-        ServiceConfiguration serviceConfiguration = 
createServiceConfiguration(proxyConfig);
+        ServiceConfiguration serviceConfiguration = 
PulsarConfigurationLoader.convertFrom(proxyConfig);
         authenticationService = new 
AuthenticationService(serviceConfiguration);
         authorizationManager = new AuthorizationManager(serviceConfiguration, 
configurationCacheService);
 
@@ -181,15 +181,6 @@ public void close() throws IOException {
         client.close();
     }
 
-    private ServiceConfiguration createServiceConfiguration(ProxyConfiguration 
config) {
-        ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
-        
serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled());
-        
serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled());
-        
serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders());
-        serviceConfiguration.setProperties(config.getProperties());
-        return serviceConfiguration;
-    }
-
     public String getServiceUrl() {
         return serviceUrl;
     }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 838a2f1bf..28a0ed511 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -39,6 +39,7 @@
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -66,8 +67,9 @@
     AuthorizationManager authorizationManager;
     PulsarClient pulsarClient;
 
-    private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(
-            WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS, new 
DefaultThreadFactory("pulsar-websocket"));
+    private final ScheduledExecutorService executor = Executors
+            
.newScheduledThreadPool(WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS,
+                    new DefaultThreadFactory("pulsar-websocket"));
     private final OrderedSafeExecutor orderedExecutor = new 
OrderedSafeExecutor(
             WebSocketProxyConfiguration.GLOBAL_ZK_THREADS, 
"pulsar-websocket-ordered");
     private GlobalZooKeeperCache globalZkCache;
@@ -82,7 +84,7 @@
     private final ProxyStats proxyStats;
 
     public WebSocketService(WebSocketProxyConfiguration config) {
-        this(createClusterData(config), createServiceConfiguration(config));
+        this(createClusterData(config), 
PulsarConfigurationLoader.convertFrom(config));
     }
 
     public WebSocketService(ClusterData localCluster, ServiceConfiguration 
config) {
@@ -210,31 +212,6 @@ private static ClusterData 
createClusterData(WebSocketProxyConfiguration config)
         }
     }
 
-    private static ServiceConfiguration 
createServiceConfiguration(WebSocketProxyConfiguration config) {
-        ServiceConfiguration serviceConfig = new ServiceConfiguration();
-        serviceConfig.setProperties(config.getProperties());
-        serviceConfig.setClusterName(config.getClusterName());
-        serviceConfig.setWebServicePort(config.getWebServicePort());
-        serviceConfig.setWebServicePortTls(config.getWebServicePortTls());
-        
serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
-        
serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders());
-        
serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
-        
serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());
-        serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
-        
serviceConfig.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching());
-        serviceConfig.setSuperUserRoles(config.getSuperUserRoles());
-        
serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers());
-        
serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
-        serviceConfig.setTlsEnabled(config.isTlsEnabled());
-        
serviceConfig.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
-        
serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath());
-        serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath());
-        
serviceConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
-        serviceConfig.setWebSocketNumIoThreads(config.getNumIoThreads());
-        
serviceConfig.setWebSocketConnectionsPerBroker(config.getConnectionsPerBroker());
-        return serviceConfig;
-    }
-
     private ClusterData retrieveClusterData() throws PulsarServerException {
         if (configurationCacheService == null) {
             throw new PulsarServerException("Failed to retrieve Cluster data 
due to empty GlobalZookeeperServers");
@@ -307,16 +284,16 @@ public boolean removeConsumer(ConsumerHandler consumer) {
         }
         return false;
     }
-    
+
     public boolean addReader(ReaderHandler reader) {
         return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), 
topic -> new ConcurrentOpenHashSet<>())
                 .add(reader);
     }
-    
+
     public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> 
getReaders() {
         return topicReaderMap;
     }
-    
+
     public boolean removeReader(ReaderHandler reader) {
         final String topicName = reader.getConsumer().getTopic();
         if (topicReaderMap.containsKey(topicName)) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to