nkurihar closed pull request #901: Introduce configuration converter
URL: https://github.com/apache/incubator-pulsar/pull/901
This PR was merged from a forked repository. Because GitHub hides the original diff of a fork-based (foreign) pull request once it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e917d7e3b..27eff7f06 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker; +import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.Set; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java index 7aebaa445..b9e0321fc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.common.configuration; +import org.apache.pulsar.broker.ServiceConfiguration; + import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.common.util.FieldParser.update; @@ -25,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; +import java.util.Arrays; import java.util.Map; import java.util.Properties; @@ -127,4 +130,44 @@ public static boolean isComplete(Object obj) throws IllegalArgumentException, Il return true; } + /** + * Converts a PulsarConfiguration object to a ServiceConfiguration 
object. + * + * @param conf + * @param ignoreNonExistMember + * @return + * @throws IllegalArgumentException + * if conf has the field whose name is not contained in ServiceConfiguration and ignoreNonExistMember is false. + * @throws RuntimeException + */ + public static ServiceConfiguration convertFrom(PulsarConfiguration conf, boolean ignoreNonExistMember) throws RuntimeException { + try { + final ServiceConfiguration convertedConf = ServiceConfiguration.class.newInstance(); + Field[] confFields = conf.getClass().getDeclaredFields(); + Arrays.stream(confFields).forEach(confField -> { + try { + Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName()); + confField.setAccessible(true); + convertedConfField.setAccessible(true); + convertedConfField.set(convertedConf, confField.get(conf)); + } catch (NoSuchFieldException e) { + if (!ignoreNonExistMember) { + throw new IllegalArgumentException("Exception caused while converting configuration: " + e.getMessage()); + } + } catch (IllegalAccessException e) { + throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage()); + } + }); + return convertedConf; + } catch (InstantiationException e) { + throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage()); + } catch (IllegalAccessException e) { + throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage()); + } + } + + public static ServiceConfiguration convertFrom(PulsarConfiguration conf) throws RuntimeException { + return convertFrom(conf, true); + } + } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java index 1b1673355..5ff95dc44 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java 
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java @@ -33,11 +33,52 @@ import java.util.Properties; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.common.configuration.FieldContext; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.testng.annotations.Test; public class PulsarConfigurationLoaderTest { + public class MockConfiguration implements PulsarConfiguration { + private Properties properties = new Properties(); + + private String zookeeperServers = "localhost:2181"; + private String globalZookeeperServers = "localhost:2184"; + private int brokerServicePort = 7650; + private int brokerServicePortTls = 7651; + private int webServicePort = 9080; + private int webServicePortTls = 9443; + private int notExistFieldInServiceConfig = 0; + + @Override + public Properties getProperties() { + return properties; + } + + @Override + public void setProperties(Properties properties) { + this.properties = properties; + } + } + + @Test + public void testConfigurationConverting() throws Exception { + MockConfiguration mockConfiguration = new MockConfiguration(); + ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(mockConfiguration); + + // check whether converting correctly + assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181"); + assertEquals(serviceConfiguration.getGlobalZookeeperServers(), "localhost:2184"); + assertEquals(serviceConfiguration.getBrokerServicePort(), 7650); + assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651); + assertEquals(serviceConfiguration.getWebServicePort(), 9080); + assertEquals(serviceConfiguration.getWebServicePortTls(), 9443); + + // check whether exception causes + try { + PulsarConfigurationLoader.convertFrom(mockConfiguration, false); + fail(); + } catch (Exception e) { + assertEquals(e.getClass(), IllegalArgumentException.class); + } + } @Test 
public void testPulsarConfiguraitonLoadingStream() throws Exception { diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java index 76148dbad..8af2b0a62 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java @@ -28,6 +28,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationManager; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.discovery.service.server.ServiceConfig; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; @@ -79,7 +80,7 @@ public DiscoveryService(ServiceConfig serviceConfig) { public void start() throws Exception { discoveryProvider = new BrokerDiscoveryProvider(this.config, getZooKeeperClientFactory()); this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache); - ServiceConfiguration serviceConfiguration = createServiceConfiguration(config); + ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config); authenticationService = new AuthenticationService(serviceConfiguration); authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService); startServer(); @@ -132,16 +133,6 @@ public void close() throws IOException { workerGroup.shutdownGracefully(); } - private ServiceConfiguration createServiceConfiguration(ServiceConfig config) { - ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); - serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled()); - 
serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled()); - serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders()); - serviceConfiguration.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching()); - serviceConfiguration.setProperties(config.getProperties()); - return serviceConfiguration; - } - /** * Derive the host * diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 91e93f769..fed6e15c8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -34,6 +33,7 @@ import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; @@ -122,7 +122,7 @@ public void shutdown(int exitCode) { discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory()); this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache); - ServiceConfiguration serviceConfiguration = createServiceConfiguration(proxyConfig); + ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig); authenticationService = new AuthenticationService(serviceConfiguration); 
authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService); @@ -181,15 +181,6 @@ public void close() throws IOException { client.close(); } - private ServiceConfiguration createServiceConfiguration(ProxyConfiguration config) { - ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); - serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled()); - serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled()); - serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders()); - serviceConfiguration.setProperties(config.getProperties()); - return serviceConfiguration; - } - public String getServiceUrl() { return serviceUrl; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 838a2f1bf..28a0ed511 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; @@ -66,8 +67,9 @@ AuthorizationManager authorizationManager; PulsarClient pulsarClient; - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool( - WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS, new DefaultThreadFactory("pulsar-websocket")); + private final ScheduledExecutorService executor = Executors + 
.newScheduledThreadPool(WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS, + new DefaultThreadFactory("pulsar-websocket")); private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor( WebSocketProxyConfiguration.GLOBAL_ZK_THREADS, "pulsar-websocket-ordered"); private GlobalZooKeeperCache globalZkCache; @@ -82,7 +84,7 @@ private final ProxyStats proxyStats; public WebSocketService(WebSocketProxyConfiguration config) { - this(createClusterData(config), createServiceConfiguration(config)); + this(createClusterData(config), PulsarConfigurationLoader.convertFrom(config)); } public WebSocketService(ClusterData localCluster, ServiceConfiguration config) { @@ -210,31 +212,6 @@ private static ClusterData createClusterData(WebSocketProxyConfiguration config) } } - private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) { - ServiceConfiguration serviceConfig = new ServiceConfiguration(); - serviceConfig.setProperties(config.getProperties()); - serviceConfig.setClusterName(config.getClusterName()); - serviceConfig.setWebServicePort(config.getWebServicePort()); - serviceConfig.setWebServicePortTls(config.getWebServicePortTls()); - serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled()); - serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders()); - serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin()); - serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters()); - serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled()); - serviceConfig.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching()); - serviceConfig.setSuperUserRoles(config.getSuperUserRoles()); - serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers()); - serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis()); - 
serviceConfig.setTlsEnabled(config.isTlsEnabled()); - serviceConfig.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath()); - serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath()); - serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath()); - serviceConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection()); - serviceConfig.setWebSocketNumIoThreads(config.getNumIoThreads()); - serviceConfig.setWebSocketConnectionsPerBroker(config.getConnectionsPerBroker()); - return serviceConfig; - } - private ClusterData retrieveClusterData() throws PulsarServerException { if (configurationCacheService == null) { throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers"); @@ -307,16 +284,16 @@ public boolean removeConsumer(ConsumerHandler consumer) { } return false; } - + public boolean addReader(ReaderHandler reader) { return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>()) .add(reader); } - + public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> getReaders() { return topicReaderMap; } - + public boolean removeReader(ReaderHandler reader) { final String topicName = reader.getConsumer().getTopic(); if (topicReaderMap.containsKey(topicName)) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services