This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c62b5c0 Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) (#1059) c62b5c0 is described below commit c62b5c014b1f69c8c9aea78be22d3f9c95eefb99 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Wed May 2 00:35:57 2018 +0800 Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) (#1059) * global-zookeeper => configuration-store: change in code, conf and cli * change following @sijie's comments * remove un-used imports * change following @ivan's comments * change following comments * fix rebase error * change for PR #1572 and #1223, fix integration fail * Fix cube definitions after global zookeeper is renamed to configuration-store * Hide deprecated options and few more adjustments * Fixed required status of global-zk argument in cluster init tool * Limit the memory usage for processes * Fix the aliases and limit the memory usage * remove environment settings * Fix time.sleep at watch-znode.py * revert unstarted to use started yml * Fix invalid `""` --- bin/pulsar | 26 +++++++-- bin/pulsar-admin | 1 - bin/pulsar-daemon | 21 ++++--- conf/broker.conf | 11 +++- conf/discovery.conf | 10 +++- conf/proxy.conf | 14 +++-- conf/standalone.conf | 10 +++- conf/websocket.conf | 12 +++- deployment/dcos/PulsarGroups.json | 4 +- deployment/kubernetes/aws/broker.yaml | 2 +- deployment/kubernetes/generic/broker.yaml | 2 +- deployment/kubernetes/generic/proxy.yaml | 2 +- .../google-kubernetes-engine/broker.yaml | 2 +- .../google-kubernetes-engine/cluster-metadata.yaml | 2 +- .../kubernetes/google-kubernetes-engine/proxy.yaml | 2 +- deployment/terraform-ansible/deploy-pulsar.yaml | 6 +- deployment/terraform-ansible/templates/broker.conf | 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 24 ++++++++ .../PulsarConfigurationLoaderTest.java | 6 +- .../apache/pulsar/PulsarClusterMetadataSetup.java | 51 ++++++++++------ .../org/apache/pulsar/PulsarStandaloneStarter.java | 2 +- .../org/apache/pulsar/broker/PulsarService.java | 6 +- .../pulsar/broker/admin/impl/BrokersBase.java | 8 +-- .../org/apache/pulsar/PulsarBrokerStarterTest.java | 8 +-- .../org/apache/pulsar/broker/admin/AdminTest.java | 2 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 2 +- .../pulsar/broker/service/ReplicatorTestBase.java | 6 +- .../broker/service/v1/V1_ReplicatorTestBase.java | 6 +- .../pulsar/client/api/NonPersistentTopicTest.java | 8 +-- .../websocket/proxy/ProxyAuthenticationTest.java | 11 +--- .../websocket/proxy/ProxyAuthorizationTest.java | 2 +- .../websocket/proxy/ProxyPublishConsumeTest.java | 2 +- .../proxy/ProxyPublishConsumeTlsTest.java | 2 +- .../proxy/v1/V1_ProxyAuthenticationTest.java | 6 +- .../configurations/pulsar_broker_test.conf | 2 +- pulsar-client-cpp/test-conf/standalone-ssl.conf | 5 +- pulsar-client-cpp/tests/authentication.conf | 5 +- pulsar-client-cpp/tests/standalone.conf | 31 +++++----- .../common/conf/InternalConfigurationData.java | 16 +++--- .../discovery/service/BrokerDiscoveryProvider.java | 2 +- .../service/server/DiscoveryServiceStarter.java | 14 +++-- .../discovery/service/server/ServiceConfig.java | 19 +++++- .../service/server/DiscoveryServiceWebTest.java | 67 ++++++++++++++++++++-- .../proxy/server/BrokerDiscoveryProvider.java | 2 +- .../pulsar/proxy/server/ProxyConfiguration.java | 16 +++++- .../apache/pulsar/proxy/server/ProxyService.java | 4 +- .../pulsar/proxy/server/ProxyServiceStarter.java | 13 ++++- .../ProxyAuthenticatedProducerConsumerTest.java | 2 +- .../server/ProxyConnectionThrottlingTest.java | 2 +- .../proxy/server/ProxyLookupThrottlingTest.java | 2 +- .../org/apache/pulsar/proxy/server/ProxyTest.java | 2 +- .../apache/pulsar/proxy/server/ProxyTlsTest.java | 2 +- .../apache/pulsar/websocket/WebSocketService.java | 9 +-- .../service/WebSocketProxyConfiguration.java | 18 +++++- ...Starter.java => ConfigurationStoreStarter.java} | 4 +- site/docs/latest/deployment/cluster.md | 4 +- .../latest-version-image/conf/bookie.conf | 2 +- .../latest-version-image/conf/broker.conf | 1 + .../latest-version-image/conf/global-zk.conf | 3 +- .../latest-version-image/conf/local-zk.conf | 1 + .../latest-version-image/conf/proxy.conf | 2 + .../latest-version-image/scripts/init-cluster.sh | 2 +- tests/integration-tests-base/pom.xml | 1 - ...single-cluster-3-bookie-2-broker-unstarted.yaml | 16 +++--- .../single-cluster-3-bookie-2-broker.yaml | 16 +++--- .../java/org/apache/pulsar/tests/DockerUtils.java | 1 + .../apache/pulsar/tests/PulsarClusterUtils.java | 4 ++ .../pulsar/tests/integration/TestCompaction.java | 16 +++--- .../smoke/src/test/resources/arquillian.xml | 1 + 69 files changed, 400 insertions(+), 188 deletions(-) diff --git a/bin/pulsar b/bin/pulsar index 0bd2228..4a58d30 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -24,7 +24,7 @@ PULSAR_HOME=`cd $BINDIR/..;pwd` DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf -DEFAULT_GLOBAL_ZK_CONF=$PULSAR_HOME/conf/global_zookeeper.conf +DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf @@ -103,18 +103,19 @@ pulsar_help() { cat <<EOF Usage: pulsar <command> where command is one of: + broker Run a broker server bookie Run a bookie server zookeeper Run a zookeeper server - global-zookeeper Run a global-zookeeper server + configuration-store Run a configuration-store server discovery Run a discovery server proxy Run a pulsar proxy websocket Run a web socket proxy server functions-worker Run a functions worker server standalone Run a broker server with local bookies and local zookeeper - compact-topic Run compaction against a topic initialize-cluster-metadata One-time metadata initialization + compact-topic Run compaction against a topic zookeeper-shell Open a ZK shell client help This help message @@ -126,7 +127,7 @@ Environment variables: PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF) PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF) PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF) - PULSAR_GLOBAL_ZK_CONF Configuration file for global zookeeper (default: $DEFAULT_GLOBAL_ZK_CONF) + PULSAR_CONFIGURATION_STORE_CONF Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF) PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF) PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF) PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF) @@ -192,6 +193,11 @@ fi if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF + PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_GLOBAL_ZK_CONF +fi + +if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then + PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF fi if [ -z "$PULSAR_DISCOVERY_CONF" ]; then @@ -260,7 +266,12 @@ elif [ $COMMAND == "global-zookeeper" ]; then PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"} # Allow global ZK to turn into read-only mode when it cannot reach the quorum OPTS="${OPTS} -Dreadonlymode.enabled=true" - exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.GlobalZooKeeperStarter $PULSAR_GLOBAL_ZK_CONF $@ + exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@ +elif [ $COMMAND == "configuration-store" ]; then + PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"} + # Allow global ZK to turn into read-only mode when it cannot reach the quorum + OPTS="${OPTS} -Dreadonlymode.enabled=true" + exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@ elif [ $COMMAND == "discovery" ]; then PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"} exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@ @@ -285,5 +296,8 @@ elif [ $COMMAND == "compact-topic" ]; then elif [ $COMMAND == "help" ]; then pulsar_help; else - exec $JAVA $OPTS $COMMAND $@ + echo "" + echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands" + echo "" + exit 1 fi diff --git a/bin/pulsar-admin b/bin/pulsar-admin index c8b41da..837a605 100755 --- a/bin/pulsar-admin +++ b/bin/pulsar-admin @@ -146,5 +146,4 @@ OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}" #Change to PULSAR_HOME to support relative paths cd "$PULSAR_HOME" - exec $JAVA $OPTS org.apache.pulsar.admin.cli.PulsarAdminTool $PULSAR_CLIENT_CONF "$@" diff --git a/bin/pulsar-daemon b/bin/pulsar-daemon index 5853f11..0957413 100755 --- a/bin/pulsar-daemon +++ b/bin/pulsar-daemon @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,13 +22,13 @@ usage() { cat <<EOF Usage: pulsar-daemon (start|stop) <command> <args...> where command is one of: - broker Run a broker server - bookie Run a bookie server - zookeeper Run a zookeeper server - global-zookeeper Run a global-zookeeper server - discovery Run a discovery server - websocket Run a websocket proxy server - standalone Run a standalone Pulsar service + broker Run a broker server + bookie Run a bookie server + zookeeper Run a zookeeper server + configuration-store Run a configuration-store server + discovery Run a discovery server + websocket Run a websocket proxy server + standalone Run a standalone Pulsar service where argument is one of: -force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown @@ -73,6 +73,9 @@ case $command in (global-zookeeper) echo "doing $startStop $command ..." ;; + (configuration-store) + echo "doing $startStop $command ..." + ;; (discovery) echo "doing $startStop $command ..." ;; diff --git a/conf/broker.conf b/conf/broker.conf index 6f502a5..dc7ca04 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -22,8 +22,8 @@ # Zookeeper quorum connection string zookeeperServers= -# Global Zookeeper quorum connection string -globalZookeeperServers= +# Configuration Store connection string +configurationStoreServers= # Broker data port brokerServicePort=6650 @@ -457,3 +457,10 @@ functionsWorkerEnabled=false # Enable topic level metrics exposePublisherStats=true + + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers= + diff --git a/conf/discovery.conf b/conf/discovery.conf index 87f887f..b1b6f41 100644 --- a/conf/discovery.conf +++ b/conf/discovery.conf @@ -20,8 +20,8 @@ # Zookeeper quorum connection string (comma-separated) zookeeperServers= -# Global zookeeper quorum connection string (comma-separated) -globalZookeeperServers= +# Configuration Store connection string (comma-separated) +configurationStoreServers= # ZooKeeper session timeout zookeeperSessionTimeoutMs=30000 @@ -77,3 +77,9 @@ tlsKeyFilePath= # Specify whether Client certificates are required for TLS # Reject the Connection if the Client Certificate is not trusted. tlsRequireTrustedClientCertOnConnect=false + + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers= diff --git a/conf/proxy.conf b/conf/proxy.conf index f731240..a904f47 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -20,8 +20,8 @@ # Zookeeper quorum connection string (comma-separated) zookeeperServers= -# Global zookeeper quorum connection string (comma-separated) -globalZookeeperServers= +# Configuration Store connection string (comma-separated) +configurationStoreServers= # ZooKeeper session timeout zookeeperSessionTimeoutMs=30000 @@ -70,10 +70,10 @@ superUserRoles= forwardAuthorizationCredentials=false # --- RateLimiting ---- -# Max concurrent inbound Connections, proxy will reject requests beyond that. Default value is 10,000 +# Max concurrent inbound Connections, proxy will reject requests beyond that. Default value is 10,000 maxConcurrentInboundConnections=10000 -# Max concurrent outbound Connections, proxy will error out requests beyond that. Default value is 10,000 +# Max concurrent outbound Connections, proxy will error out requests beyond that. Default value is 10,000 maxConcurrentLookupRequests=10000 ##### --- TLS --- ##### @@ -96,3 +96,9 @@ tlsHostnameVerificationEnabled=false # Specify whether Client certificates are required for TLS # Reject the Connection if the Client Certificate is not trusted. tlsRequireTrustedClientCertOnConnect=false + + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers= diff --git a/conf/standalone.conf b/conf/standalone.conf index 2beddf5..f5c9546 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -22,8 +22,8 @@ # Zookeeper quorum connection string zookeeperServers= -# Global Zookeeper quorum connection string -globalZookeeperServers= +# Configuration Store connection string +configurationStoreServers= brokerServicePort=6650 @@ -396,3 +396,9 @@ exposeTopicLevelMetricsInPrometheus=true # Enable topic level metrics exposePublisherStats=true + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers= + diff --git a/conf/websocket.conf b/conf/websocket.conf index 87accac..f9f2436 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -19,13 +19,13 @@ ### --- Web Socket proxy settings --- ### -# Global Zookeeper quorum connection string -globalZookeeperServers= +# Configuration Store connection string +configurationStoreServers= # Zookeeper session timeout in milliseconds zooKeeperSessionTimeoutMillis=30000 -# Pulsar cluster url to connect to broker (optional if globalZookeeperServers present) +# Pulsar cluster url to connect to broker (optional if configurationStoreServers present) serviceUrl= serviceUrlTls= brokerServiceUrl= @@ -103,3 +103,9 @@ tlsTrustCertsFilePath= # Specify whether Client certificates are required for TLS # Reject the Connection if the Client Certificate is not trusted. tlsRequireTrustedClientCertOnConnect=false + + +### --- Deprecated config variables --- ### + +# Deprecated. Use configurationStoreServers +globalZookeeperServers= diff --git a/deployment/dcos/PulsarGroups.json b/deployment/dcos/PulsarGroups.json index 6585976..990ec57 100644 --- a/deployment/dcos/PulsarGroups.json +++ b/deployment/dcos/PulsarGroups.json @@ -182,7 +182,7 @@ "// Notice": "add PULSAR_MEM, PULSAR_GC, according to your environment.", "webServicePort": "8082", "zookeeperServers": "master.mesos:2181", - "globalZookeeperServers": "master.mesos:2181", + "configurationStoreServers": "master.mesos:2181", "clusterName": "pulsar-dcos" }, @@ -223,7 +223,7 @@ "env": { "webServicePort": "8082", "zookeeperServers": "master.mesos:2181", - "globalZookeeperServers": "master.mesos:2181", + "configurationStoreServers": "master.mesos:2181", "clusterName": "pulsar-dcos", "webServiceUrl": "http://broker.brokers.pulsar.marathon.mesos:8082", "brokerServiceUrl": "pulsar://broker.brokers.pulsar.marathon.mesos:6650" diff --git a/deployment/kubernetes/aws/broker.yaml b/deployment/kubernetes/aws/broker.yaml index c01ff20..70940ef 100644 --- a/deployment/kubernetes/aws/broker.yaml +++ b/deployment/kubernetes/aws/broker.yaml @@ -28,7 +28,7 @@ data: PULSAR_MEM: "\" -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.linkCapacity=1024 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem -Xms12g -Xmx12g -XX:MaxDirectMemorySize=14g -Dpulsar.root.logger=DEBUG,FILE \"" PULSAR_GC: "\" -XX:+UseG1GC -XX:MaxGCPauseMillis=10\"" zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper - globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper + configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper clusterName: us-east managedLedgerDefaultEnsembleSize: "2" managedLedgerDefaultWriteQuorum: "2" diff --git a/deployment/kubernetes/generic/broker.yaml b/deployment/kubernetes/generic/broker.yaml index 452cdf1..cf760c1 100644 --- a/deployment/kubernetes/generic/broker.yaml +++ b/deployment/kubernetes/generic/broker.yaml @@ -27,7 +27,7 @@ data: # better GC behavior at high throughput PULSAR_MEM: "\" -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g\"" zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper - globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper + configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper clusterName: us-central --- ## diff --git a/deployment/kubernetes/generic/proxy.yaml b/deployment/kubernetes/generic/proxy.yaml index 453b91e..6715cf1 100644 --- a/deployment/kubernetes/generic/proxy.yaml +++ b/deployment/kubernetes/generic/proxy.yaml @@ -25,7 +25,7 @@ metadata: data: PULSAR_MEM: "\" -Xms4g -Xmx4g -XX:MaxDirectMemorySize=4g\"" zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper - globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper + configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper clusterName: us-central --- ## diff --git a/deployment/kubernetes/google-kubernetes-engine/broker.yaml b/deployment/kubernetes/google-kubernetes-engine/broker.yaml index a07baa1..6e0d78e 100644 --- a/deployment/kubernetes/google-kubernetes-engine/broker.yaml +++ b/deployment/kubernetes/google-kubernetes-engine/broker.yaml @@ -26,7 +26,7 @@ data: PULSAR_MEM: "\" -Xms8g -Xmx8g -XX:MaxDirectMemorySize=4g\"" PULSAR_GC: "\" -XX:+UseG1GC \"" zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper - globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper + configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper clusterName: pulsar-gke --- ## diff --git a/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml b/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml index e4afffb..0027490 100644 --- a/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml +++ b/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml @@ -36,7 +36,7 @@ spec: bin/pulsar initialize-cluster-metadata \ --cluster us-central \ --zookeeper zookeeper \ - --global-zookeeper zookeeper \ + --configuration-store zookeeper \ --web-service-url http://broker.default.svc.cluster.local:8080/ \ --broker-service-url pulsar://broker.default.svc.cluster.local:6650/ || true; restartPolicy: Never diff --git a/deployment/kubernetes/google-kubernetes-engine/proxy.yaml b/deployment/kubernetes/google-kubernetes-engine/proxy.yaml index e62aa2a..138f12f 100644 --- a/deployment/kubernetes/google-kubernetes-engine/proxy.yaml +++ b/deployment/kubernetes/google-kubernetes-engine/proxy.yaml @@ -24,7 +24,7 @@ metadata: data: PULSAR_MEM: "\" -Xms4g -Xmx4g -XX:MaxDirectMemorySize=4g\"" zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper - globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper + configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper clusterName: pulsar-gke --- diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml index c0e8eb8..bec552a 100644 --- a/deployment/terraform-ansible/deploy-pulsar.yaml +++ b/deployment/terraform-ansible/deploy-pulsar.yaml @@ -88,7 +88,7 @@ state: directory with_items: - data/zookeeper - - name: Add pulsar_env.sh configuration file + - name: Add pulsar_env.sh configuration file template: src: "../templates/pulsar_env.sh" dest: "/opt/pulsar/conf/pulsar_env.sh" @@ -114,7 +114,7 @@ bin/pulsar initialize-cluster-metadata \ --cluster {{ cluster_name }} \ --zookeeper localhost:2181 \ - --global-zookeeper localhost:2181 \ + --configuration-store localhost:2181 \ --web-service-url {{ http_url }} \ --broker-service-url {{ service_url }} args: @@ -161,7 +161,7 @@ args: chdir: /opt/pulsar when: groups['zookeeper'][0] == inventory_hostname - + - name: Hosts addresses hosts: localhost become: false diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf index de78366..d168100 100644 --- a/deployment/terraform-ansible/templates/broker.conf +++ b/deployment/terraform-ansible/templates/broker.conf @@ -25,7 +25,7 @@ zookeeperServers={{ zookeeper_servers }} # Global Zookeeper quorum connection string -globalZookeeperServers={{ zookeeper_servers }} +configurationStoreServers={{ zookeeper_servers }} # Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used. advertisedAddress={{ hostvars[inventory_hostname].public_ip }} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e6c37ba..5cf31c1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -41,8 +41,12 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(required = true) private String zookeeperServers; // Global Zookeeper quorum connection string + @Deprecated @FieldContext(required = false) private String globalZookeeperServers; + // Configuration Store connection string + @FieldContext(required = false) + private String configurationStoreServers; private int brokerServicePort = 6650; private int brokerServicePortTls = 6651; // Port to use to server HTTP request @@ -467,6 +471,10 @@ public class ServiceConfiguration implements PulsarConfiguration { this.zookeeperServers = zookeeperServers; } + /** + * @deprecated See {@link #getConfigurationStoreServers} + */ + @Deprecated public String getGlobalZookeeperServers() { if (this.globalZookeeperServers == null || this.globalZookeeperServers.isEmpty()) { // If the configuration is not set, assuming that the globalZK is not enabled and all data is in the same @@ -476,10 +484,26 @@ public class ServiceConfiguration implements PulsarConfiguration { return globalZookeeperServers; } + /** + * @deprecated See {@link #setConfigurationStoreServers(String)} + */ + @Deprecated public void setGlobalZookeeperServers(String globalZookeeperServers) { this.globalZookeeperServers = globalZookeeperServers; } + public String getConfigurationStoreServers() { + if (this.configurationStoreServers == null || this.configurationStoreServers.isEmpty()) { + // If the configuration is not set, assuming that all data is in the same as globalZK cluster + return this.getGlobalZookeeperServers(); + } + return configurationStoreServers; + } + + public void setConfigurationStoreServers(String configurationStoreServers) { + this.configurationStoreServers = configurationStoreServers; + } + public int getBrokerServicePort() { return brokerServicePort; } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java index d830c4f..baa0e75 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java @@ -41,7 +41,7 @@ public class PulsarConfigurationLoaderTest { private Properties properties = new Properties(); private String zookeeperServers = "localhost:2181"; - private String globalZookeeperServers = "localhost:2184"; + private String configurationStoreServers = "localhost:2184"; private int brokerServicePort = 7650; private int brokerServicePortTls = 7651; private int webServicePort = 9080; @@ -66,7 +66,7 @@ public class PulsarConfigurationLoaderTest { // check whether converting correctly assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181"); - assertEquals(serviceConfiguration.getGlobalZookeeperServers(), "localhost:2184"); + assertEquals(serviceConfiguration.getConfigurationStoreServers(), "localhost:2184"); assertEquals(serviceConfiguration.getBrokerServicePort(), 7650); assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651); assertEquals(serviceConfiguration.getWebServicePort(), 9080); @@ -90,7 +90,7 @@ public class PulsarConfigurationLoaderTest { final String zkServer = "z1.example.com,z2.example.com,z3.example.com"; PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); printWriter.println("zookeeperServers=" + zkServer); - printWriter.println("globalZookeeperServers=gz1.example.com,gz2.example.com,gz3.example.com/foo"); + printWriter.println("configurationStoreServers=gz1.example.com,gz2.example.com,gz3.example.com/foo"); printWriter.println("brokerDeleteInactiveTopicsEnabled=true"); printWriter.println("statusFilePath=/tmp/status.html"); printWriter.println("managedLedgerDefaultEnsembleSize=1"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 5dfa80d..3e62c9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -79,9 +79,13 @@ public class PulsarClusterMetadataSetup { private String zookeeper; @Parameter(names = { "-gzk", - "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = true) + "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = false, hidden = true) private String globalZookeeper; + @Parameter(names = { "-cs", + "--configuration-store" }, description = "Configuration Store connection string", required = false) + private String configurationStore; + @Parameter(names = { "-h", "--help" }, description = "Show this help message") private boolean help = false; } @@ -101,12 +105,27 @@ public class PulsarClusterMetadataSetup { throw e; } - log.info("Setting up cluster {} with zk={} global-zk={}", arguments.cluster, arguments.zookeeper, - arguments.globalZookeeper); + if (arguments.configurationStore == null && arguments.globalZookeeper == null) { + System.err.println("Configuration store address argument is required (--configuration-store)"); + jcommander.usage(); + System.exit(1); + } + + if (arguments.configurationStore != null && arguments.globalZookeeper != null) { + System.err.println("Configuration store argument (--configuration-store) supercedes the deprecated (--global-zookeeper) argument"); + jcommander.usage(); + System.exit(1); + } + + if (arguments.configurationStore == null) { + arguments.configurationStore = arguments.globalZookeeper; + } + log.info("Setting up cluster {} with zk={} configuration-store ={}", arguments.cluster, arguments.zookeeper, + arguments.configurationStore); ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl(); ZooKeeper localZk = zkfactory.create(arguments.zookeeper, SessionType.ReadWrite, 30000).get(); - ZooKeeper globalZk = zkfactory.create(arguments.globalZookeeper, SessionType.ReadWrite, 30000).get(); + ZooKeeper configStoreZk = zkfactory.create(arguments.configurationStore, SessionType.ReadWrite, 30000).get(); // Format BookKeeper metadata ServerConfiguration bkConf = new ServerConfiguration(); @@ -121,14 +140,14 @@ public class PulsarClusterMetadataSetup { localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); try { - ZkUtils.createFullPathOptimistic(globalZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + ZkUtils.createFullPathOptimistic(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (NodeExistsException e) { // Ignore } try { - ZkUtils.createFullPathOptimistic(globalZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + ZkUtils.createFullPathOptimistic(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (NodeExistsException e) { // Ignore @@ -138,7 +157,7 @@ public class PulsarClusterMetadataSetup { arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls); byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData); - globalZk.create("/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, + configStoreZk.create("/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // Create marker for "global" cluster @@ -146,7 +165,7 @@ public class PulsarClusterMetadataSetup { byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData); try { - globalZk.create("/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, + configStoreZk.create("/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (NodeExistsException e) { // Ignore @@ -155,12 +174,12 @@ public class PulsarClusterMetadataSetup { // Create public tenant, whitelisted to use the this same cluster, along with other clusters String publicTenantPath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT; - Stat stat = globalZk.exists(publicTenantPath, false); + Stat stat = configStoreZk.exists(publicTenantPath, false); if (stat == null) { TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(arguments.cluster)); try { - ZkUtils.createFullPathOptimistic(globalZk, publicTenantPath, + ZkUtils.createFullPathOptimistic(configStoreZk, publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (NodeExistsException e) { @@ -168,14 +187,14 @@ public class PulsarClusterMetadataSetup { } } else { // Update existing public tenant with new cluster - byte[] content = globalZk.getData(publicTenantPath, false, null); + byte[] content = configStoreZk.getData(publicTenantPath, false, null); TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class); // Only update z-node if the list of clusters should be modified if (!publicTenant.getAllowedClusters().contains(arguments.cluster)) { publicTenant.getAllowedClusters().add(arguments.cluster); - globalZk.setData(publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), + configStoreZk.setData(publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), stat.getVersion()); } } @@ -184,7 +203,7 @@ public class PulsarClusterMetadataSetup { String defaultNamespacePath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE; Policies policies; - stat = globalZk.exists(defaultNamespacePath, false); + stat = configStoreZk.exists(defaultNamespacePath, false); if (stat == null) { policies = new Policies(); policies.bundles = getBundles(16); @@ -192,7 +211,7 @@ public class PulsarClusterMetadataSetup { try { ZkUtils.createFullPathOptimistic( - globalZk, + configStoreZk, defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), ZooDefs.Ids.OPEN_ACL_UNSAFE, @@ -201,14 +220,14 @@ public class PulsarClusterMetadataSetup { // Ignore } } else { - byte[] content = globalZk.getData(defaultNamespacePath, false, null); + byte[] content = configStoreZk.getData(defaultNamespacePath, false, null); policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class); // Only update z-node if the list of clusters should be modified if (!policies.replication_clusters.contains(arguments.cluster)) { policies.replication_clusters.add(arguments.cluster); - globalZk.setData(defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), + configStoreZk.setData(defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), stat.getVersion()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index e5415b4..641636a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -133,7 +133,7 @@ public class PulsarStandaloneStarter { // Set ZK server's host to localhost config.setZookeeperServers(zkServers + ":" + zkPort); - config.setGlobalZookeeperServers(zkServers + ":" + zkPort); + config.setConfigurationStoreServers(zkServers + ":" + zkPort); config.setRunningStandalone(true); Runtime.getRuntime().addShutdownHook(new Thread() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3d14b8b..c74ecd7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -421,7 +421,7 @@ public class PulsarService implements AutoCloseable { state = State.Started; acquireSLANamespace(); - + // start function worker service if necessary this.startWorkerService(); @@ -491,7 +491,7 @@ public class PulsarService implements AutoCloseable { this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor()); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), - (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), + (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(), getOrderedExecutor(), this.cacheExecutor); try { this.globalZkCache.start(); @@ -889,7 +889,7 @@ public class PulsarService implements AutoCloseable { InternalConfigurationData internalConf = new InternalConfigurationData( this.getConfiguration().getZookeeperServers(), - this.getConfiguration().getGlobalZookeeperServers(), + this.getConfiguration().getConfigurationStoreServers(), new ClientConfiguration().getZkLedgersRootPath()); URI dlogURI; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 59027c2..cab707e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -56,7 +56,7 @@ import io.swagger.annotations.ApiResponses; public class BrokersBase extends AdminResource { private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class); private int serviceConfigZkVersion = -1; - + @GET @Path("/{cluster}") @ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set") @@ -135,11 +135,11 @@ public class BrokersBase extends AdminResource { public List<String> getDynamicConfigurationName() { return BrokerService.getDynamicConfiguration(); } - + /** * if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so * all other brokers get the watch and can see the change and take appropriate action on the change. - * + * * @param configName * : configuration key * @param configValue @@ -192,7 +192,7 @@ public class BrokersBase extends AdminResource { ClientConfiguration conf = new ClientConfiguration(); return new InternalConfigurationData( pulsar().getConfiguration().getZookeeperServers(), - pulsar().getConfiguration().getGlobalZookeeperServers(), + pulsar().getConfiguration().getConfigurationStoreServers(), conf.getZkLedgersRootPath()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java index f9373c0..9cce6db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java @@ -47,7 +47,7 @@ public class PulsarBrokerStarterTest { } PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); printWriter.println("zookeeperServers=z1.example.com,z2.example.com,z3.example.com"); - printWriter.println("globalZookeeperServers=gz1.example.com,gz2.example.com,gz3.example.com/foo"); + printWriter.println("configurationStoreServers=gz1.example.com,gz2.example.com,gz3.example.com/foo"); printWriter.println("brokerDeleteInactiveTopicsEnabled=false"); printWriter.println("statusFilePath=/tmp/status.html"); printWriter.println("managedLedgerDefaultEnsembleSize=1"); @@ -99,7 +99,7 @@ public class PulsarBrokerStarterTest { Assert.assertTrue(ServiceConfiguration.class.isInstance(returnValue)); ServiceConfiguration serviceConfig = (ServiceConfiguration) returnValue; Assert.assertEquals(serviceConfig.getZookeeperServers(), "z1.example.com,z2.example.com,z3.example.com"); - Assert.assertEquals(serviceConfig.getGlobalZookeeperServers(), + Assert.assertEquals(serviceConfig.getConfigurationStoreServers(), "gz1.example.com,gz2.example.com,gz3.example.com/foo"); Assert.assertFalse(serviceConfig.isBrokerDeleteInactiveTopicsEnabled()); Assert.assertEquals(serviceConfig.getStatusFilePath(), "/tmp/status.html"); @@ -217,7 +217,7 @@ public class PulsarBrokerStarterTest { } PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); printWriter.println("zookeeperServers=z1.example.com,z2.example.com,z3.example.com"); - printWriter.println("globalZookeeperServers="); + printWriter.println("configurationStoreServers="); printWriter.println("brokerDeleteInactiveTopicsEnabled=false"); printWriter.println("statusFilePath=/tmp/status.html"); printWriter.println("managedLedgerDefaultEnsembleSize=1"); @@ -248,7 +248,7 @@ public class PulsarBrokerStarterTest { Assert.assertTrue(ServiceConfiguration.class.isInstance(returnValue)); ServiceConfiguration serviceConfig = (ServiceConfiguration) returnValue; Assert.assertEquals(serviceConfig.getZookeeperServers(), "z1.example.com,z2.example.com,z3.example.com"); - Assert.assertEquals(serviceConfig.getGlobalZookeeperServers(), "z1.example.com,z2.example.com,z3.example.com"); + Assert.assertEquals(serviceConfig.getConfigurationStoreServers(), "z1.example.com,z2.example.com,z3.example.com"); Assert.assertFalse(serviceConfig.isBrokerDeleteInactiveTopicsEnabled()); Assert.assertEquals(serviceConfig.getStatusFilePath(), "/tmp/status.html"); Assert.assertEquals(serviceConfig.getBacklogQuotaDefaultLimitGB(), 18); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index c14ecfc..1f0058c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -208,7 +208,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest { void internalConfiguration() throws Exception { InternalConfigurationData expectedData = new InternalConfigurationData( pulsar.getConfiguration().getZookeeperServers(), - pulsar.getConfiguration().getGlobalZookeeperServers(), + pulsar.getConfiguration().getConfigurationStoreServers(), new ClientConfiguration().getZkLedgersRootPath()); assertEquals(brokers.getInternalConfigurationData(), expectedData); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 72b8b74..23592b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -100,7 +100,7 @@ public abstract class MockedPulsarServiceBaseTest { this.conf.setActiveConsumerFailoverDelayTimeMillis(0); this.conf.setDefaultNumberOfNamespaceBundles(1); this.conf.setZookeeperServers("localhost:2181"); - this.conf.setGlobalZookeeperServers("localhost:3181"); + this.conf.setConfigurationStoreServers("localhost:3181"); } protected final void internalSetup() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 7d411f5..2f599ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -119,7 +119,7 @@ public class ReplicatorTestBase { config1.setWebServicePort(webServicePort1); config1.setWebServicePortTls(webServicePortTls1); config1.setZookeeperServers("127.0.0.1:" + zkPort1); - config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config1.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -153,7 +153,7 @@ public class ReplicatorTestBase { config2.setWebServicePort(webServicePort2); config2.setWebServicePortTls(webServicePortTls2); config2.setZookeeperServers("127.0.0.1:" + zkPort2); - config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config2.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -187,7 +187,7 @@ public class ReplicatorTestBase { config3.setWebServicePort(webServicePort3); config3.setWebServicePortTls(webServicePortTls3); config3.setZookeeperServers("127.0.0.1:" + zkPort3); - config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config3.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java index 298acde..fc5001b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java @@ -118,7 +118,7 @@ public class V1_ReplicatorTestBase { config1.setWebServicePort(webServicePort1); config1.setWebServicePortTls(webServicePortTls1); config1.setZookeeperServers("127.0.0.1:" + zkPort1); - config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config1.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -152,7 +152,7 @@ public class V1_ReplicatorTestBase { config2.setWebServicePort(webServicePort2); config2.setWebServicePortTls(webServicePortTls2); config2.setZookeeperServers("127.0.0.1:" + zkPort2); - config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config2.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -186,7 +186,7 @@ public class V1_ReplicatorTestBase { config3.setWebServicePort(webServicePort3); config3.setWebServicePortTls(webServicePortTls3); config3.setZookeeperServers("127.0.0.1:" + zkPort3); - config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config3.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index e96dff4..20bb407 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -886,7 +886,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { config1.setAdvertisedAddress("localhost"); config1.setWebServicePort(webServicePort1); config1.setZookeeperServers("127.0.0.1:" + zkPort1); - config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config1.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -912,7 +912,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { config2.setWebServicePort(webServicePort2); config2.setAdvertisedAddress("localhost"); config2.setZookeeperServers("127.0.0.1:" + zkPort2); - config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config2.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -938,7 +938,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { config3.setWebServicePort(webServicePort3); config3.setAdvertisedAddress("localhost"); config3.setZookeeperServers("127.0.0.1:" + zkPort3); - config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo"); + config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo"); config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); config3.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); @@ -1012,4 +1012,4 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { log.error("Stats executor error", e); } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index eab6428..c59bf47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -22,18 +22,17 @@ import static java.util.concurrent.Executors.newFixedThreadPool; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; +import com.google.common.collect.Sets; import java.net.URI; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; - import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.websocket.WebSocketService; @@ -50,8 +49,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Sets; - public class ProxyAuthenticationTest extends ProducerConsumerBase { private int port; @@ -70,11 +67,9 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { config.setWebServicePort(port); config.setClusterName("test"); config.setAuthenticationEnabled(true); - config.setGlobalZookeeperServers("dummy-zk-servers"); - config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user")); - // If this is not set, 500 error occurs. - config.setGlobalZookeeperServers("dummy"); + config.setConfigurationStoreServers("dummy"); + config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user")); if (methodName.equals("authenticatedSocketTest") || methodName.equals("statsTest")) { config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index f9ac9eb..b79862f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -59,7 +59,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); Set<String> superUser = Sets.newHashSet(""); config.setAuthorizationEnabled(true); - config.setGlobalZookeeperServers("dummy-zk-servers"); + config.setConfigurationStoreServers("dummy-zk-servers"); config.setSuperUserRoles(superUser); config.setClusterName("c1"); config.setWebServicePort(TEST_PORT); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index c05c356..e084b04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -86,7 +86,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); config.setWebServicePort(port); config.setClusterName("test"); - config.setGlobalZookeeperServers("dummy-zk-servers"); + config.setConfigurationStoreServers("dummy-zk-servers"); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 6d486c2..650e05e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -73,7 +73,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase { config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); config.setClusterName("use"); - config.setGlobalZookeeperServers("dummy-zk-servers"); + config.setConfigurationStoreServers("dummy-zk-servers"); config.setBrokerClientAuthenticationParameters("tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT_TLS).toString(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index fbe6e07..b13a6c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -72,11 +72,9 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase { config.setWebServicePort(port); config.setClusterName("use"); config.setAuthenticationEnabled(true); - config.setGlobalZookeeperServers("dummy-zk-servers"); - config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user")); - // If this is not set, 500 error occurs. - config.setGlobalZookeeperServers("dummy"); + config.setConfigurationStoreServers("dummy"); + config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user")); if (methodName.equals("authenticatedSocketTest") || methodName.equals("statsTest")) { config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider")); diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 2918e45..8034185 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -19,7 +19,7 @@ applicationName="pulsar_broker" zookeeperServers="localhost" -globalZookeeperServers="localhost" +configurationStoreServers="localhost" brokerServicePort=6650 brokerServicePortTls=6651 webServicePort=8080 diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf index 44b730c..bb097d6 100644 --- a/pulsar-client-cpp/test-conf/standalone-ssl.conf +++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf @@ -22,9 +22,12 @@ # Zookeeper quorum connection string zookeeperServers= -# Global Zookeeper quorum connection string +# Deprecated. Global zookeeper quorum connection string globalZookeeperServers= +# Configuration Store connection string +configurationStoreServers= + brokerServicePort=9885 brokerServicePortTls=9886 diff --git a/pulsar-client-cpp/tests/authentication.conf b/pulsar-client-cpp/tests/authentication.conf index f7a53d6..6db26da 100644 --- a/pulsar-client-cpp/tests/authentication.conf +++ b/pulsar-client-cpp/tests/authentication.conf @@ -22,9 +22,12 @@ # Zookeeper quorum connection string zookeeperServers= -# Global Zookeeper quorum connection string +# Deprecated. Global Zookeeper quorum connection string globalZookeeperServers= +# Configuration Store connection string +configurationStoreServers= + brokerServicePort=9885 brokerServicePortTls=9886 diff --git a/pulsar-client-cpp/tests/standalone.conf b/pulsar-client-cpp/tests/standalone.conf index 630de70..2fb8c0b 100644 --- a/pulsar-client-cpp/tests/standalone.conf +++ b/pulsar-client-cpp/tests/standalone.conf @@ -22,9 +22,12 @@ # Zookeeper quorum connection string zookeeperServers= -# Global Zookeeper quorum connection string +# Deprecated. Global Zookeeper quorum connection string globalZookeeperServers= +# Configuration Store connection string +configurationStoreServers= + brokerServicePort=8885 # Port to use to server HTTP request @@ -60,13 +63,13 @@ brokerDeleteInactiveTopicsEnabled=true # How often to check for inactive topics brokerDeleteInactiveTopicsFrequencySeconds=60 -# How frequently to proactively check and purge expired messages +# How frequently to proactively check and purge expired messages messageExpiryCheckIntervalInMinutes=5 # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false -# Allow client libraries with no version information +# Allow client libraries with no version information clientLibraryVersionCheckAllowUnversioned=true # Path for the file used to determine the rotation status for the broker when responding @@ -108,7 +111,7 @@ bookkeeperClientAuthenticationPlugin= bookkeeperClientAuthenticationParametersName= bookkeeperClientAuthenticationParameters= -# Timeout for BK add / read operations +# Timeout for BK add / read operations bookkeeperClientTimeoutInSeconds=30 # Speculative reads are initiated if a read request doesn't complete within a certain time @@ -124,11 +127,11 @@ bookkeeperClientHealthCheckErrorThresholdPerInterval=5 bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 # Enable rack-aware bookie selection policy. BK will chose bookies from different racks when -# forming a new bookie ensemble +# forming a new bookie ensemble bookkeeperClientRackawarePolicyEnabled=true -# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie -# outside the specified groups will not be used by the broker +# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie +# outside the specified groups will not be used by the broker bookkeeperClientIsolationGroups= ### --- Managed Ledger --- ### @@ -144,7 +147,7 @@ managedLedgerDefaultAckQuorum=1 # Amount of memory to use for caching data payload in managed ledger. This memory # is allocated from JVM direct memory and it's shared across all the topics -# running in the same broker +# running in the same broker managedLedgerCacheSizeMB=1024 # Threshold to which bring down the cache level when eviction is triggered @@ -156,7 +159,7 @@ managedLedgerDefaultMarkDeleteRateLimit=0.1 # Max number of entries to append to a ledger before triggering a rollover # A ledger rollover is triggered on these conditions # * Either the max rollover time has been reached -# * or max entries have been written to the ledged and at least min-time +# * or max entries have been written to the ledged and at least min-time # has passed managedLedgerMaxEntriesPerLedger=50000 @@ -174,7 +177,7 @@ managedLedgerCursorRolloverTimeInSeconds=14400 -### --- Load balancer --- ### +### --- Load balancer --- ### # Enable load balancer loadBalancerEnabled=false @@ -192,13 +195,13 @@ loadBalancerReportUpdateMaxIntervalMinutes=15 loadBalancerHostUsageCheckIntervalMinutes=1 # Load shedding interval. Broker periodically checks whether some traffic should be offload from -# some over-loaded broker to other under-loaded brokers +# some over-loaded broker to other under-loaded brokers loadBalancerSheddingIntervalMinutes=30 -# Prevent the same topics to be shed and moved to other broker more that once within this timeframe +# Prevent the same topics to be shed and moved to other broker more that once within this timeframe loadBalancerSheddingGracePeriodMinutes=30 -# Usage threshold to determine a broker as under-loaded +# Usage threshold to determine a broker as under-loaded loadBalancerBrokerUnderloadedThresholdPercentage=1 # Usage threshold to determine a broker as over-loaded @@ -238,7 +241,7 @@ replicationMetricsEnabled=true # Max number of connections to open for each broker in a remote cluster # More connections host-to-host lead to better throughput over high-latency -# links. +# links. replicationConnectionsPerBroker=16 # Replicator producer queue size diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java index aa1068a..4a038a3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java @@ -24,17 +24,17 @@ import java.util.Objects; public class InternalConfigurationData { private String zookeeperServers; - private String globalZooKeeperServers; + private String configurationStoreServers; private String ledgersRootPath; public InternalConfigurationData() { } public InternalConfigurationData(String zookeeperServers, - String globalZooKeeperServers, + String configurationStoreServers, String ledgersRootPath) { this.zookeeperServers = zookeeperServers; - this.globalZooKeeperServers = globalZooKeeperServers; + this.configurationStoreServers = configurationStoreServers; this.ledgersRootPath = ledgersRootPath; } @@ -42,8 +42,8 @@ public class InternalConfigurationData { return zookeeperServers; } - public String getGlobalZooKeeperServers() { - return globalZooKeeperServers; + public String getConfigurationStoreServers() { + return configurationStoreServers; } public String getLedgersRootPath() { @@ -57,20 +57,20 @@ public class InternalConfigurationData { } InternalConfigurationData other = (InternalConfigurationData) obj; return Objects.equals(zookeeperServers, other.zookeeperServers) - && Objects.equals(globalZooKeeperServers, other.globalZooKeeperServers) + && Objects.equals(configurationStoreServers, other.configurationStoreServers) && Objects.equals(ledgersRootPath, other.ledgersRootPath); } @Override public int hashCode() { - return Objects.hash(zookeeperServers, globalZooKeeperServers, ledgersRootPath); + return Objects.hash(zookeeperServers, configurationStoreServers, ledgersRootPath); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("zookeeperServers", zookeeperServers) - .add("globalZooKeeperServers", globalZooKeeperServers) + .add("configurationStoreServers", configurationStoreServers) .add("ledgersRootPath", ledgersRootPath) .toString(); } diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java index 60a72c1..54463aa 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java @@ -72,7 +72,7 @@ public class BrokerDiscoveryProvider implements Closeable { localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs()); globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(), - config.getGlobalZookeeperServers(), orderedExecutor, scheduledExecutorScheduler); + config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler); globalZkCache.start(); } catch (Exception e) { LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e); diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java index f36ccee..35af859 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java @@ -40,6 +40,11 @@ import org.slf4j.LoggerFactory; */ public class DiscoveryServiceStarter { + public static void checkConfig(ServiceConfig config) { + checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided"); + checkArgument(!isEmpty(config.getConfigurationStoreServers()), "configuration-store Servers must be provided"); + } + public static void init(String configFile) throws Exception { // setup handlers removeHandlersForRootLogger(); @@ -50,14 +55,13 @@ public class DiscoveryServiceStarter { // load config file final ServiceConfig config = PulsarConfigurationLoader.create(configFile, ServiceConfig.class); - checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided"); - checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "global-zookeeperServers must be provided"); - + checkConfig(config); + // create Discovery service DiscoveryService discoveryService = new DiscoveryService(config); // create a web-service final ServerManager server = new ServerManager(config); - + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { @@ -69,7 +73,7 @@ public class DiscoveryServiceStarter { } } }); - + discoveryService.start(); startWebService(server, config); } diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java index c1d59ee..c024c19 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java @@ -36,7 +36,10 @@ public class ServiceConfig implements PulsarConfiguration { // Local-Zookeeper quorum connection string private String zookeeperServers; // Global-Zookeeper quorum connection string + @Deprecated private String globalZookeeperServers; + // Configuration Store connection string + private String configurationStoreServers; // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; @@ -91,7 +94,7 @@ public class ServiceConfig implements PulsarConfiguration { // Specify whether Client certificates are required for TLS // Reject the Connection if the Client Certificate is not trusted. private boolean tlsRequireTrustedClientCertOnConnect = false; - + private Properties properties = new Properties(); public String getZookeeperServers() { @@ -102,14 +105,24 @@ public class ServiceConfig implements PulsarConfiguration { this.zookeeperServers = zookeeperServers; } + @Deprecated public String getGlobalZookeeperServers() { return globalZookeeperServers; } + @Deprecated public void setGlobalZookeeperServers(String globalZookeeperServers) { this.globalZookeeperServers = globalZookeeperServers; } + public String getConfigurationStoreServers() { + return null == configurationStoreServers ? getGlobalZookeeperServers() : configurationStoreServers; + } + + public void setConfigurationStoreServers(String configurationStoreServers) { + this.configurationStoreServers = configurationStoreServers; + } + public int getZookeeperSessionTimeoutMs() { return zookeeperSessionTimeoutMs; } @@ -253,7 +266,7 @@ public class ServiceConfig implements PulsarConfiguration { public void setProperties(Properties properties) { this.properties = properties; } - + public Set<String> getTlsProtocols() { return tlsProtocols; } @@ -269,7 +282,7 @@ public class ServiceConfig implements PulsarConfiguration { public void setTlsCiphers(Set<String> tlsCiphers) { this.tlsCiphers = tlsCiphers; } - + public boolean getTlsRequireTrustedClientCertOnConnect() { return tlsRequireTrustedClientCertOnConnect; } diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java index 48b69a1..3902431 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java @@ -27,9 +27,8 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; -import org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter; -import org.apache.pulsar.discovery.service.server.ServerManager; -import org.apache.pulsar.discovery.service.server.ServiceConfig; +import org.apache.pulsar.discovery.service.DiscoveryService; +import org.mockito.Mockito; import org.testng.annotations.Test; /** @@ -40,7 +39,7 @@ import org.testng.annotations.Test; */ public class DiscoveryServiceWebTest { - + @Test public void testWebDiscoveryServiceStarter() throws Exception { @@ -51,7 +50,7 @@ public class DiscoveryServiceWebTest { } PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); - printWriter.println("globalZookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("configurationStoreServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); printWriter.println("webServicePort=" + port); printWriter.close(); testConfigFile.deleteOnExit(); @@ -63,4 +62,62 @@ public class DiscoveryServiceWebTest { testConfigFile.delete(); } + /** + * Test Configuration BackwardCompat for the change from globalzookeeper to configurationStore + */ + @Test + public void testConfigurationBackwardCompat() throws Exception { + DiscoveryService service = Mockito.mock(DiscoveryService.class); + + int port = nextFreePort(); + File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties"); + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("globalZookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter.println("webServicePort=" + port); + printWriter.close(); + testConfigFile.deleteOnExit(); + + ServiceConfig config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + // have zookeeperServers and globalZookeeperServers, config is valid + // should not throw IllegalArgumentException. + DiscoveryServiceStarter.checkConfig(config); + + + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter2 = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter2.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter2.println("configurationStoreServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter2.println("webServicePort=" + port); + printWriter2.close(); + config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + // have zookeeperServers and configurationStoreServers, config is valid + // should not throw IllegalArgumentException. + DiscoveryServiceStarter.checkConfig(config); + + + if (testConfigFile.exists()) { + testConfigFile.delete(); + } + PrintWriter printWriter3 = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile))); + printWriter3.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com"); + printWriter3.println("webServicePort=" + port); + printWriter3.close(); + config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class); + // only have zookeeperServers + // should throw IllegalArgumentException. + try { + DiscoveryServiceStarter.checkConfig(config); + } catch (IllegalArgumentException e) { + // expected: configure error + } + + testConfigFile.delete(); + } + } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index d038381..56e6cc5 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -71,7 +71,7 @@ public class BrokerDiscoveryProvider implements Closeable { localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(), config.getZookeeperSessionTimeoutMs()); globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(), - config.getGlobalZookeeperServers(), orderedExecutor, scheduledExecutorScheduler); + config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler); globalZkCache.start(); } catch (Exception e) { LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 43b8d56..dcd9580 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -30,9 +30,13 @@ public class ProxyConfiguration implements PulsarConfiguration { // Local-Zookeeper quorum connection string private String zookeeperServers; + @Deprecated // Global-Zookeeper quorum connection string private String globalZookeeperServers; + // Configuration Store connection string + private String configurationStoreServers; + // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; @@ -49,7 +53,7 @@ public class ProxyConfiguration implements PulsarConfiguration { private int webServicePort = 8080; // Port to use to server HTTPS request private int webServicePortTls = 8443; - + // Path for the file used to determine the rotation status for the broker // when responding to service discovery health checks private String statusFilePath; @@ -142,14 +146,24 @@ public class ProxyConfiguration implements PulsarConfiguration { this.zookeeperServers = zookeeperServers; } + @Deprecated public String getGlobalZookeeperServers() { return globalZookeeperServers; } + @Deprecated public void setGlobalZookeeperServers(String globalZookeeperServers) { this.globalZookeeperServers = globalZookeeperServers; } + public String getConfigurationStoreServers() { + return null == configurationStoreServers ? getGlobalZookeeperServers() : configurationStoreServers; + } + + public void setConfigurationStoreServers(String configurationStoreServers) { + this.configurationStoreServers = configurationStoreServers; + } + public int getZookeeperSessionTimeoutMs() { return zookeeperSessionTimeoutMs; } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index e798077..48293be 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -119,7 +119,7 @@ public class ProxyService implements Closeable { ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig); authenticationService = new AuthenticationService(serviceConfiguration); - if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getGlobalZookeeperServers())) { + if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getConfigurationStoreServers())) { discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory()); this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache); authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService); @@ -214,4 +214,4 @@ public class ProxyService implements Closeable { } private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class); -} \ No newline at end of file +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 0738893..77fcd77 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -53,9 +53,14 @@ public class ProxyServiceStarter { @Parameter(names = { "-zk", "--zookeeper-servers" }, description = "Local zookeeper connection string") private String zookeeperServers = ""; + @Deprecated @Parameter(names = { "-gzk", "--global-zookeeper-servers" }, description = "Global zookeeper connection string") private String globalZookeeperServers = ""; + @Parameter(names = { "-cs", "--configuration-store-servers" }, + description = "Configuration store connection string") + private String configurationStoreServers = ""; + @Parameter(names = { "-h", "--help" }, description = "Show this help message") private boolean help = false; @@ -90,13 +95,17 @@ public class ProxyServiceStarter { if (!isBlank(globalZookeeperServers)) { // Use globalZookeeperServers from command line - config.setGlobalZookeeperServers(globalZookeeperServers); + config.setConfigurationStoreServers(globalZookeeperServers); + } + if (!isBlank(configurationStoreServers)) { + // Use configurationStoreServers from command line + config.setConfigurationStoreServers(configurationStoreServers); } if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS())) || config.isAuthorizationEnabled()) { checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided"); - checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided"); + checkArgument(!isEmpty(config.getConfigurationStoreServers()), "configurationStoreServers must be provided"); } java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index ca72616..fba6010 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -117,7 +117,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase proxyConfig.setAuthenticationProviders(providers); proxyConfig.setZookeeperServers(DUMMY_VALUE); - proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE); + proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 1658e21..91e3523 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -47,7 +47,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest { proxyConfig.setServicePort(PortManager.nextFreePort()); proxyConfig.setZookeeperServers(DUMMY_VALUE); - proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE); + proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); proxyService = Mockito.spy(new ProxyService(proxyConfig)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 4411f80..07eb137 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -46,7 +46,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { proxyConfig.setServicePort(PortManager.nextFreePort()); proxyConfig.setZookeeperServers(DUMMY_VALUE); - proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE); + proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); proxyService = Mockito.spy(new ProxyService(proxyConfig)); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 5c994b0..8647994 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -51,7 +51,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { proxyConfig.setServicePort(PortManager.nextFreePort()); proxyConfig.setZookeeperServers(DUMMY_VALUE); - proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE); + proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 5243b9e..a2332a8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -60,7 +60,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest { proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH); proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH); proxyConfig.setZookeeperServers(DUMMY_VALUE); - proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE); + proxyConfig.setConfigurationStoreServers(DUMMY_VALUE); proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index b917dc2..6660cd8 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -99,9 +99,9 @@ public class WebSocketService implements Closeable { public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException { - if (isNotBlank(config.getGlobalZookeeperServers())) { + if (isNotBlank(config.getConfigurationStoreServers())) { this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), - (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), + (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(), this.orderedExecutor, this.executor); try { this.globalZkCache.start(); @@ -116,7 +116,7 @@ public class WebSocketService implements Closeable { if (config.isAuthorizationEnabled()) { if (configurationCacheService == null) { throw new PulsarServerException( - "Failed to initialize authorization manager due to empty GlobalZookeeperServers"); + "Failed to initialize authorization manager due to empty ConfigurationStoreServers"); } authorizationService = new AuthorizationService(this.config, configurationCacheService); } @@ -218,7 +218,8 @@ public class WebSocketService implements Closeable { private ClusterData retrieveClusterData() throws PulsarServerException { if (configurationCacheService == null) { - throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers"); + throw new PulsarServerException( + "Failed to retrieve Cluster data due to empty ConfigurationStoreServers"); } try { String path = "/admin/clusters/" + config.getClusterName(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 0126d49..b790723 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -40,18 +40,20 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { @FieldContext(required = true) private String clusterName; - // Pulsar cluster url to connect to broker (optional if globalZookeeperServers present) + // Pulsar cluster url to connect to broker (optional if configurationStoreServers present) private String serviceUrl; private String serviceUrlTls; private String brokerServiceUrl; private String brokerServiceUrlTls; - + // Path for the file used to determine the rotation status for the broker // when responding to service discovery health checks private String statusFilePath; - // Global Zookeeper quorum connection string + // Configuration Store connection string + @Deprecated private String globalZookeeperServers; + private String configurationStoreServers; // Zookeeper session timeout in milliseconds private long zooKeeperSessionTimeoutMillis = 30000; @@ -159,14 +161,24 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { this.statusFilePath = statusFilePath; } + @Deprecated public String getGlobalZookeeperServers() { return globalZookeeperServers; } + @Deprecated public void setGlobalZookeeperServers(String globalZookeeperServers) { this.globalZookeeperServers = globalZookeeperServers; } + public String getConfigurationStoreServers() { + return null == configurationStoreServers ? getGlobalZookeeperServers() : configurationStoreServers; + } + + public void setConfigurationStoreServers(String configurationStoreServers) { + this.configurationStoreServers = configurationStoreServers; + } + public long getZooKeeperSessionTimeoutMillis() { return zooKeeperSessionTimeoutMillis; } diff --git a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperStarter.java b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ConfigurationStoreStarter.java similarity index 86% rename from pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperStarter.java rename to pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ConfigurationStoreStarter.java index d70885b..6f488bb 100644 --- a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperStarter.java +++ b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ConfigurationStoreStarter.java @@ -21,10 +21,10 @@ package org.apache.pulsar.zookeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GlobalZooKeeperStarter extends ZooKeeperStarter { +public class ConfigurationStoreStarter extends ZooKeeperStarter { public static void main(String[] args) throws Exception { start(args, "8001"); } - private static final Logger log = LoggerFactory.getLogger(GlobalZooKeeperStarter.class); + private static final Logger log = LoggerFactory.getLogger(ConfigurationStoreStarter.class); } diff --git a/site/docs/latest/deployment/cluster.md b/site/docs/latest/deployment/cluster.md index 0a23b0e..773cfa2 100644 --- a/site/docs/latest/deployment/cluster.md +++ b/site/docs/latest/deployment/cluster.md @@ -142,7 +142,7 @@ You can initialize this metadata using the [`initialize-cluster-metadata`](../.. $ bin/pulsar initialize-cluster-metadata \ --cluster pulsar-cluster-1 \ --zookeeper zk1.us-west.example.com:2181 \ - --global-zookeeper zk1.us-west.example.com:2181 \ + --configuration-store zk1.us-west.example.com:2181 \ --web-service-url http://pulsar.us-west.example.com:8080 \ --web-service-url-tls https://pulsar.us-west.example.com:8443 \ --broker-service-url pulsar://pulsar.us-west.example.com:6650 \ @@ -155,7 +155,7 @@ Flag | Description :----|:----------- `--cluster` | A name for the cluster `--zookeeper` | A "local" ZooKeeper connection string for the cluster. This connection string only needs to include *one* machine in the ZooKeeper cluster. -`--global-zookeeper` | The "global" ZooKeeper connection string for the entire instance. As with the `--zookeeper` flag, this connection string only needs to include *one* machine in the ZooKeeper cluster. +`--configuration-store` | The configuration store (ZooKeeper) where the configuration policies for all tenants and namespaces across all clusters will be stored. As with the `--zookeeper` flag, this connection string only needs to include *one* machine in the ZooKeeper cluster. `--web-service-url` | The web service URL for the cluster, plus a port. This URL should be a standard DNS name. The default port is 8080 (we don't recommend using a different port). `--web-service-url-tls` | If you're using [TLS](../../../admin/Authz#tls-client-auth), you'll also need to specify a TLS web service URL for the cluster. The default port is 8443 (we don't recommend using a different port). `--broker-service-url` | A broker service URL enabling interaction with the {% popover brokers %} in the cluster. This URL should use the same DNS name as the web service URL but should use the `pulsar` scheme instead. The default port is 6650 (we don't recommend using a different port). diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf b/tests/docker-images/latest-version-image/conf/bookie.conf index 53fd06e..030d6ad 100644 --- a/tests/docker-images/latest-version-image/conf/bookie.conf +++ b/tests/docker-images/latest-version-image/conf/bookie.conf @@ -22,5 +22,5 @@ autostart=false redirect_stderr=true stdout_logfile=/var/log/pulsar/bookie.log directory=/pulsar +environment=PULSAR_MEM=-Xms128M command=/pulsar/bin/pulsar bookie - diff --git a/tests/docker-images/latest-version-image/conf/broker.conf b/tests/docker-images/latest-version-image/conf/broker.conf index ca8843f..5492abf 100644 --- a/tests/docker-images/latest-version-image/conf/broker.conf +++ b/tests/docker-images/latest-version-image/conf/broker.conf @@ -22,5 +22,6 @@ autostart=false redirect_stderr=true stdout_logfile=/var/log/pulsar/broker.log directory=/pulsar +environment=PULSAR_MEM=-Xms128M command=/pulsar/bin/pulsar broker diff --git a/tests/docker-images/latest-version-image/conf/global-zk.conf b/tests/docker-images/latest-version-image/conf/global-zk.conf index f589ade..5c6edaa 100644 --- a/tests/docker-images/latest-version-image/conf/global-zk.conf +++ b/tests/docker-images/latest-version-image/conf/global-zk.conf @@ -22,5 +22,6 @@ autostart=false redirect_stderr=true stdout_logfile=/var/log/pulsar/global-zk.log directory=/pulsar -command=/pulsar/bin/pulsar global-zookeeper +environment=PULSAR_MEM=-Xms128M +command=/pulsar/bin/pulsar configuration-store diff --git a/tests/docker-images/latest-version-image/conf/local-zk.conf b/tests/docker-images/latest-version-image/conf/local-zk.conf index f5daba0..2822cb1 100644 --- a/tests/docker-images/latest-version-image/conf/local-zk.conf +++ b/tests/docker-images/latest-version-image/conf/local-zk.conf @@ -22,5 +22,6 @@ autostart=false redirect_stderr=true stdout_logfile=/var/log/pulsar/local-zk.log directory=/pulsar +environment=PULSAR_MEM=-Xms128M command=/pulsar/bin/pulsar zookeeper diff --git a/tests/docker-images/latest-version-image/conf/proxy.conf b/tests/docker-images/latest-version-image/conf/proxy.conf index 7ab7e8f..359e657 100644 --- a/tests/docker-images/latest-version-image/conf/proxy.conf +++ b/tests/docker-images/latest-version-image/conf/proxy.conf @@ -21,6 +21,8 @@ autostart=false redirect_stderr=true stdout_logfile=/var/log/pulsar/proxy.log +stderr_logfile=/var/log/pulsar/proxy-stderr.log directory=/pulsar +environment=PULSAR_MEM=-Xms128M command=/pulsar/bin/pulsar proxy diff --git a/tests/docker-images/latest-version-image/scripts/init-cluster.sh b/tests/docker-images/latest-version-image/scripts/init-cluster.sh index e4403b3..d70e257 100755 --- a/tests/docker-images/latest-version-image/scripts/init-cluster.sh +++ b/tests/docker-images/latest-version-image/scripts/init-cluster.sh @@ -27,7 +27,7 @@ if [ $? != 0 ]; then echo Initializing cluster bin/apply-config-from-env.py conf/bookkeeper.conf && bin/pulsar initialize-cluster-metadata --cluster $cluster --zookeeper $zkServers \ - --global-zookeeper $globalZkServers --web-service-url http://$pulsarNode:8080/ \ + --configuration-store $configurationStore --web-service-url http://$pulsarNode:8080/ \ --broker-service-url http://$pulsarNode:6650/ && bin/watch-znode.py -z $zkServers -p $ZNODE -c echo Initialized diff --git a/tests/integration-tests-base/pom.xml b/tests/integration-tests-base/pom.xml index 6017744..37b8f70 100644 --- a/tests/integration-tests-base/pom.xml +++ b/tests/integration-tests-base/pom.xml @@ -31,7 +31,6 @@ <version>2.0.0-incubating-SNAPSHOT</version> </parent> - <groupId>org.apache.pulsar.tests</groupId> <artifactId>integration-tests-base</artifactId> <packaging>pom</packaging> diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml index a77d34d..d2c34cc 100644 --- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml +++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml @@ -37,17 +37,17 @@ zookeeper*: strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction networkMode: pulsarnet* -global-zookeeper*: +configuration-store*: image: apachepulsar/pulsar-test-latest-version:latest await: strategy: org.apache.pulsar.tests.NoopAwaitStrategy - env: [ZOOKEEPER_SERVERS=zookeeper] + env: [ZOOKEEPER_SERVERS=configuration-store] labels: cluster: test - service: global-zookeeper + service: configuration-store entryPoint: [bin/run-global-zk.sh] aliases: - - global-zookeeper + - configuration-store beforeStop: - customBeforeStopAction: strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction @@ -60,7 +60,7 @@ init*: env: - cluster=test - zkServers=zookeeper - - globalZkServers=global-zookeeper:2184 + - configurationStore=configuration-store:2184 - pulsarNode=pulsar-broker1 labels: cluster: test @@ -127,7 +127,7 @@ pulsar-broker1*: - pulsar-broker1 env: - zookeeperServers=zookeeper - - globalZookeeperServers=global-zookeeper:2184 + - configurationStoreServers=configuration-store:2184 - clusterName=test - NO_AUTOSTART=true labels: @@ -147,7 +147,7 @@ pulsar-broker2*: - pulsar-broker2 env: - zookeeperServers=zookeeper - - globalZookeeperServers=global-zookeeper:2184 + - configurationStoreServers=configuration-store:2184 - clusterName=test - NO_AUTOSTART=true labels: @@ -167,7 +167,7 @@ pulsar-proxy*: - pulsar-broker2 env: - zookeeperServers=zookeeper - - globalZookeeperServers=global-zookeeper:2184 + - configurationStoreServers=configuration-store:2184 - clusterName=test - NO_AUTOSTART=true labels: diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml index dd12abd..3695f2a 100644 --- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml +++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml @@ -37,17 +37,17 @@ zookeeper*: strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction networkMode: pulsarnet* -global-zookeeper*: +configuration-store*: image: apachepulsar/pulsar-test-latest-version:latest await: strategy: org.apache.pulsar.tests.NoopAwaitStrategy - env: [ZOOKEEPER_SERVERS=zookeeper] + env: [ZOOKEEPER_SERVERS=configuration-store] labels: cluster: test - service: global-zookeeper + service: configuration-store entryPoint: [bin/run-global-zk.sh] aliases: - - global-zookeeper + - configuration-store beforeStop: - customBeforeStopAction: strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction @@ -60,7 +60,7 @@ init*: env: - cluster=test - zkServers=zookeeper - - globalZkServers=global-zookeeper:2184 + - configurationStore=configuration-store:2184 - pulsarNode=pulsar-broker1 labels: cluster: test @@ -127,7 +127,7 @@ pulsar-broker1*: - pulsar-broker1 env: - zookeeperServers=zookeeper - - globalZookeeperServers=global-zookeeper:2184 + - configurationStoreServers=configuration-store:2184 - clusterName=test labels: cluster: test @@ -146,7 +146,7 @@ pulsar-broker2*: - pulsar-broker2 env: - zookeeperServers=zookeeper - - globalZookeeperServers=global-zookeeper:2184 + - configurationStoreServers=configuration-store:2184 - clusterName=test labels: cluster: test @@ -165,7 +165,7 @@ pulsar-proxy*: - pulsar-broker2 env: - zookeeperServers=zookeeper - - globalZookeeperServers=global-zookeeper:2184 + - configurationStoreServers=configuration-store:2184 - clusterName=test labels: cluster: test diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java index fb69dc7..b815b51 100644 --- a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java @@ -183,6 +183,7 @@ public class DockerUtils { } int retCode = resp.getExitCode(); if (retCode != 0) { + LOG.error("DOCKER.exec({}:{}): failed with {} : {}", containerId, cmdString, retCode, output); throw new RuntimeException( String.format("cmd(%s) failed on %s with exitcode %d", cmdString, containerId, retCode)); diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java index ff82b8d..984dadc 100644 --- a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java @@ -41,6 +41,9 @@ public class PulsarClusterUtils { private static final Logger LOG = LoggerFactory.getLogger(PulsarClusterUtils.class); static final short BROKER_PORT = 8080; + public static final String PULSAR_ADMIN = "/pulsar/bin/pulsar-admin"; + public static final String PULSAR = "/pulsar/bin/pulsar"; + public static String zookeeperConnectString(DockerClient docker, String cluster) { return DockerUtils.cubeIdsWithLabels(docker, ImmutableMap.of("service", "zookeeper", "cluster", cluster)) .stream().map((id) -> DockerUtils.getContainerIP(docker, id)).collect(Collectors.joining(":")); @@ -166,6 +169,7 @@ public class PulsarClusterUtils { return true; } catch (Exception e) { // couldn't connect, try again after sleep + LOG.info("Failed to connect {} @ {}", ip, BROKER_PORT, e); } try { Thread.sleep(pollMillis); diff --git a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java index bd84913..5570f05 100644 --- a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java +++ b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java @@ -51,14 +51,14 @@ public class TestCompaction extends Arquillian { @Test public void testPublishCompactAndConsumeCLI() throws Exception { PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "tenants", + PulsarClusterUtils.PULSAR_ADMIN, "tenants", "create", "compaction-test-cli", "--allowed-clusters", clusterName, "--admin-roles", "admin"); PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "namespaces", + PulsarClusterUtils.PULSAR_ADMIN, "namespaces", "create", "compaction-test-cli/ns1"); PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "namespaces", + PulsarClusterUtils.PULSAR_ADMIN, "namespaces", "set-clusters", "--clusters", "test", "compaction-test-cli/ns1"); String brokerIp = DockerUtils.getContainerIP( @@ -86,7 +86,7 @@ public class TestCompaction extends Arquillian { } PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar", "compact-topic", + PulsarClusterUtils.PULSAR, "compact-topic", "-t", topic); try (Consumer<byte[]> consumer = client.newConsumer().topic(topic) @@ -101,14 +101,14 @@ public class TestCompaction extends Arquillian { @Test public void testPublishCompactAndConsumeRest() throws Exception { PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "tenants", + PulsarClusterUtils.PULSAR_ADMIN, "tenants", "create", "compaction-test-rest", "--allowed-clusters", clusterName, "--admin-roles", "admin"); PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "namespaces", + PulsarClusterUtils.PULSAR_ADMIN, "namespaces", "create", "compaction-test-rest/ns1"); PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "namespaces", + PulsarClusterUtils.PULSAR_ADMIN, "namespaces", "set-clusters", "--clusters", "test", "compaction-test-rest/ns1"); String brokerIp = DockerUtils.getContainerIP( @@ -135,7 +135,7 @@ public class TestCompaction extends Arquillian { Assert.assertEquals(m.getData(), "content1".getBytes()); } PulsarClusterUtils.runOnAnyBroker(docker, clusterName, - "/pulsar/bin/pulsar-admin", "persistent", "compact", topic); + PulsarClusterUtils.PULSAR_ADMIN, "persistent", "compact", topic); PulsarClusterUtils.runOnAnyBroker(docker, clusterName, "/pulsar/bin/pulsar-admin", "persistent", "compaction-status", diff --git a/tests/integration/smoke/src/test/resources/arquillian.xml b/tests/integration/smoke/src/test/resources/arquillian.xml index ced9de1..73a5908 100644 --- a/tests/integration/smoke/src/test/resources/arquillian.xml +++ b/tests/integration/smoke/src/test/resources/arquillian.xml @@ -25,6 +25,7 @@ http://jboss.org/schema/arquillian/arquillian_1_0.xsd"> <extension qualifier="docker"> + <property name="serverUri">unix:///var/run/docker.sock</property> <property name="definitionFormat">CUBE</property> <property name="dockerContainersResource">cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml</property> </extension> -- To stop receiving notification emails like this one, please contact mme...@apache.org.