This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 208ca5c6fbe79b73c5f4b5d744a8d40694a6175f
Author: Tboy <[email protected]>
AuthorDate: Wed Jul 7 07:45:03 2021 +0800

    Fix init WebSocketService with ClusterData (#11234)
    
    1. Fix init WebSocketService with ClusterData.
    2. Refactor ProxyServiceStarter to be easier for test.
    3. Add test case for ProxyServiceStarter.
    
    (cherry picked from commit 93e145bea96c88a24d33cf65e657e86ac4e4c491)
---
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 117 ++++++----
 .../proxy/server/ProxyServiceStarterTest.java      | 160 ++++++++++++++
 pulsar-proxy/src/test/resources/proxy.conf         | 236 +++++++++++++++++++++
 3 files changed, 470 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 3e79fe2..c34f8cc 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -21,14 +21,17 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import 
org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
@@ -81,6 +84,8 @@ public class ProxyServiceStarter {
     @Parameter(names = { "-h", "--help" }, description = "Show this help 
message")
     private boolean help = false;
 
+    private ProxyConfiguration config;
+
     public ProxyServiceStarter(String[] args) throws Exception {
         try {
 
@@ -107,7 +112,7 @@ public class ProxyServiceStarter {
             }
 
             // load config file
-            final ProxyConfiguration config = 
PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
+            config = PulsarConfigurationLoader.create(configFile, 
ProxyConfiguration.class);
 
             if (!isBlank(zookeeperServers)) {
                 // Use zookeeperServers from command line
@@ -135,55 +140,58 @@ public class ProxyServiceStarter {
                 checkArgument(!isEmpty(config.getZookeeperServers()), 
"zookeeperServers must be provided");
             }
 
-            AuthenticationService authenticationService = new 
AuthenticationService(
-                    PulsarConfigurationLoader.convertFrom(config));
-            // create proxy service
-            ProxyService proxyService = new ProxyService(config, 
authenticationService);
-            // create a web-service
-            final WebServer server = new WebServer(config, 
authenticationService);
-
-            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-                try {
-                    proxyService.close();
-                    server.stop();
-                } catch (Exception e) {
-                    log.warn("server couldn't stop gracefully {}", 
e.getMessage(), e);
-                }
-            }));
+        } catch (Exception e) {
+            log.error("Failed to start pulsar proxy service. error msg " + 
e.getMessage(), e);
+            throw new PulsarServerException(e);
+        }
+    }
 
-            proxyService.start();
+    public static void main(String[] args) throws Exception {
+        ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
+        serviceStarter.start();
+    }
 
-            // Setup metrics
-            DefaultExports.initialize();
+    public void start() throws Exception {
+        AuthenticationService authenticationService = new 
AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(config));
+        // create proxy service
+        ProxyService proxyService = new ProxyService(config, 
authenticationService);
+        // create a web-service
+        final WebServer server = new WebServer(config, authenticationService);
 
-            // Report direct memory from Netty counters
-            Gauge.build("jvm_memory_direct_bytes_used", 
"-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return getJvmDirectMemoryUsed();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                proxyService.close();
+                server.stop();
+            } catch (Exception e) {
+                log.warn("server couldn't stop gracefully {}", e.getMessage(), 
e);
+            }
+        }));
 
-            Gauge.build("jvm_memory_direct_bytes_max", 
"-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return PlatformDependent.maxDirectMemory();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+        proxyService.start();
 
-            addWebServerHandlers(server, config, proxyService, 
proxyService.getDiscoveryProvider());
+        // Setup metrics
+        DefaultExports.initialize();
 
-            // start web-service
-            server.start();
+        // Report direct memory from Netty counters
+        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new 
Child() {
+            @Override
+            public double get() {
+                return getJvmDirectMemoryUsed();
+            }
+        }).register(CollectorRegistry.defaultRegistry);
 
-        } catch (Exception e) {
-            log.error("Failed to start pulsar proxy service. error msg " + 
e.getMessage(), e);
-            throw new PulsarServerException(e);
-        }
-    }
+        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new 
Child() {
+            @Override
+            public double get() {
+                return PlatformDependent.maxDirectMemory();
+            }
+        }).register(CollectorRegistry.defaultRegistry);
 
-    public static void main(String[] args) throws Exception {
-        new ProxyServiceStarter(args);
+        addWebServerHandlers(server, config, proxyService, 
proxyService.getDiscoveryProvider());
+
+        // start web-service
+        server.start();
     }
 
     public static void addWebServerHandlers(WebServer server,
@@ -224,7 +232,7 @@ public class ProxyServiceStarter {
         if (config.isWebSocketServiceEnabled()) {
             // add WebSocket servlet
             // Use local broker address to avoid different IP address when 
using a VIP for service discovery
-            WebSocketService webSocketService = new WebSocketService(null, 
PulsarConfigurationLoader.convertFrom(config));
+            WebSocketService webSocketService = new 
WebSocketService(createClusterData(config), 
PulsarConfigurationLoader.convertFrom(config));
             webSocketService.start();
             final WebSocketServlet producerWebSocketServlet = new 
WebSocketProducerServlet(webSocketService);
             server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
@@ -246,6 +254,29 @@ public class ProxyServiceStarter {
         }
     }
 
+    private static ClusterData createClusterData(ProxyConfiguration config) {
+        if (isNotBlank(config.getBrokerServiceURL()) || 
isNotBlank(config.getBrokerServiceURLTLS())) {
+            return ClusterData.builder()
+                    .serviceUrl(config.getBrokerWebServiceURL())
+                    .serviceUrlTls(config.getBrokerWebServiceURLTLS())
+                    .brokerServiceUrl(config.getBrokerServiceURL())
+                    .brokerServiceUrlTls(config.getBrokerServiceURLTLS())
+                    .build();
+        } else if (isNotBlank(config.getBrokerWebServiceURL()) || 
isNotBlank(config.getBrokerWebServiceURLTLS())) {
+            return ClusterData.builder()
+                    .serviceUrl(config.getBrokerWebServiceURL())
+                    .serviceUrlTls(config.getBrokerWebServiceURLTLS())
+                    .build();
+        } else {
+            return null;
+        }
+    }
+
+    @VisibleForTesting
+    public ProxyConfiguration getConfig() {
+        return config;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProxyServiceStarter.class);
 
 }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
new file mode 100644
index 0000000..63d7e39
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Future;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+        String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"};
+        ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
+        
serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+        serviceStarter.getConfig().setServicePort(Optional.of(11000));
+        serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+        serviceStarter.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void testEnableWebSocketServer() throws Exception {
+        HttpClient httpClient = new HttpClient();
+        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
+        webSocketClient.start();
+        MyWebSocket myWebSocket = new MyWebSocket();
+        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, 
URI.create(webSocketUri));
+        
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
+        assertTrue(myWebSocket.getResponse().contains("ping"));
+    }
+
+    @Test
+    public void testProducer() throws Exception {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:11000")
+                .build();
+
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer()
+                .topic("persistent://sample/test/local/websocket-topic")
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+    }
+
+    @Test
+    public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
+        HttpClient producerClient = new HttpClient();
+        WebSocketClient producerWebSocketClient = new 
WebSocketClient(producerClient);
+        producerWebSocketClient.start();
+        MyWebSocket producerSocket = new MyWebSocket();
+        String produceUri = 
"ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        Future<Session> producerSession = 
producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
+
+        ProducerMessage produceRequest = new ProducerMessage();
+        produceRequest.setContext("context");
+        produceRequest.setPayload(Base64.getEncoder().encodeToString("my 
payload".getBytes()));
+
+        HttpClient consumerClient = new HttpClient();
+        WebSocketClient consumerWebSocketClient = new 
WebSocketClient(consumerClient);
+        consumerWebSocketClient.start();
+        MyWebSocket consumerSocket = new MyWebSocket();
+        String consumeUri = 
"ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        Future<Session> consumerSession = 
consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
+        
consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
+        
producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
+        assertTrue(consumerSocket.getResponse().contains("ping"));
+        ProducerMessage message = 
ObjectMapperFactory.getThreadLocal().readValue(consumerSocket.getResponse(), 
ProducerMessage.class);
+        assertEquals(new 
String(Base64.getDecoder().decode(message.getPayload())), "my payload");
+    }
+
+    @WebSocket
+    public static class MyWebSocket extends WebSocketAdapter implements 
WebSocketPingPongListener {
+
+        ArrayBlockingQueue<String> incomingMessages = new 
ArrayBlockingQueue<>(10);
+
+        @Override
+        public void onWebSocketText(String message) {
+            incomingMessages.add(message);
+        }
+
+        @Override
+        public void onWebSocketClose(int i, String s) {
+        }
+
+        @Override
+        public void onWebSocketConnect(Session session) {
+        }
+
+        @Override
+        public void onWebSocketError(Throwable throwable) {
+        }
+
+        @Override
+        public void onWebSocketPing(ByteBuffer payload) {
+        }
+
+        @Override
+        public void onWebSocketPong(ByteBuffer payload) {
+            incomingMessages.add(BufferUtil.toDetailString(payload));
+        }
+
+        public String getResponse() throws InterruptedException {
+            return incomingMessages.take();
+        }
+    }
+
+}
diff --git a/pulsar-proxy/src/test/resources/proxy.conf 
b/pulsar-proxy/src/test/resources/proxy.conf
new file mode 100644
index 0000000..b5ed33f
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/proxy.conf
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+### --- Broker Discovery --- ###
+
+# The ZooKeeper quorum connection string (as a comma-separated list)
+zookeeperServers=
+
+# Configuration store connection string (as a comma-separated list)
+configurationStoreServers=
+
+# if Service Discovery is Disabled this url should point to the discovery 
service provider.
+brokerServiceURL=pulsar://0.0.0.0:0
+brokerServiceURLTLS=
+
+# These settings are unnecessary if `zookeeperServers` is specified
+brokerWebServiceURL=http://0.0.0.0:0
+brokerWebServiceURLTLS=
+
+# If function workers are setup in a separate cluster, configure the following 
2 settings
+# to point to the function workers cluster
+functionWorkerWebServiceURL=
+functionWorkerWebServiceURLTLS=
+
+# ZooKeeper session timeout (in milliseconds)
+zookeeperSessionTimeoutMs=30000
+
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
+### --- Server --- ###
+
+# Hostname or IP address the service binds on, default is 0.0.0.0.
+bindAddress=0.0.0.0
+
+# Hostname or IP address the service advertises to the outside world.
+# If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
+advertisedAddress=
+
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
+# The port to use for server binary Protobuf requests
+servicePort=6650
+
+# The port to use for server binary Protobuf TLS requests
+servicePortTls=
+
+# Port that the discovery service listens on
+webServicePort=8080
+
+# Port to use to serve HTTPS requests
+webServicePortTls=
+
+# Path for the file used to determine the rotation status for the proxy 
instance when responding
+# to service discovery health checks
+statusFilePath=
+
+# Proxy log level, default is 0.
+# 0: Do not log any tcp channel info
+# 1: Parse and log any tcp channel info and command info without message body
+# 2: Parse and log channel info, command info and message body
+proxyLogLevel=0
+
+### ---Authorization --- ###
+
+# Role names that are treated as "super-users," meaning that they will be able 
to perform all admin
+# operations and publish/consume to/from all topics (as a comma-separated list)
+superUserRoles=
+
+# Whether authorization is enforced by the Pulsar proxy
+authorizationEnabled=false
+
+# Authorization provider as a fully qualified class name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
+# Whether client authorization credentials are forwarded to the broker for 
re-authorization.
+# Authentication must be enabled via authenticationEnabled=true for this to 
take effect.
+forwardAuthorizationCredentials=false
+
+### --- Authentication --- ###
+
+# Whether authentication is enabled for the Pulsar proxy
+authenticationEnabled=false
+
+# Authentication provider name list (a comma-separated list of class names)
+authenticationProviders=
+
+# When this parameter is not empty, unauthenticated users perform as 
anonymousUserRole
+anonymousUserRole=
+
+### --- Client Authentication --- ###
+
+# The three brokerClient* authentication settings below are for the proxy 
itself and determine how it
+# authenticates with Pulsar brokers
+
+# The authentication plugin used by the Pulsar proxy to authenticate with 
Pulsar brokers
+brokerClientAuthenticationPlugin=
+
+# The authentication parameters used by the Pulsar proxy to authenticate with 
Pulsar brokers
+brokerClientAuthenticationParameters=
+
+# The path to trusted certificates used by the Pulsar proxy to authenticate 
with Pulsar brokers
+brokerClientTrustCertsFilePath=
+
+# Whether TLS is enabled when communicating with Pulsar brokers
+tlsEnabledWithBroker=false
+
+# Tls cert refresh duration in seconds (set 0 to check on every new connection)
+tlsCertRefreshCheckDurationSec=300
+
+##### --- Rate Limiting --- #####
+
+# Max concurrent inbound connections. The proxy will reject requests beyond 
that.
+maxConcurrentInboundConnections=10000
+
+# Max concurrent outbound connections. The proxy will error out requests 
beyond that.
+maxConcurrentLookupRequests=50000
+
+##### --- TLS --- #####
+
+# Deprecated - use servicePortTls and webServicePortTls instead
+tlsEnabledInProxy=false
+
+# Path for the TLS certificate file
+tlsCertificateFilePath=
+
+# Path for the TLS private key file
+tlsKeyFilePath=
+
+# Path for the trusted TLS certificate file.
+# This cert is used to verify that any certs presented by connecting clients
+# are signed by a certificate authority. If this verification
+# fails, then the certs are untrusted and the connections are dropped.
+tlsTrustCertsFilePath=
+
+# Accept untrusted TLS certificate from client.
+# If true, a client with a cert which cannot be verified with the
+# 'tlsTrustCertsFilePath' cert will be allowed to connect to the server,
+# though the cert will not be used for client authentication.
+tlsAllowInsecureConnection=false
+
+# Whether the hostname is validated when the proxy creates a TLS connection 
with brokers
+tlsHostnameVerificationEnabled=false
+
+# Specify the tls protocols the broker will use to negotiate during TLS 
handshake
+# (a comma-separated list of protocol names).
+# Examples:- [TLSv1.3, TLSv1.2]
+tlsProtocols=
+
+# Specify the tls cipher the broker will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers).
+# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+tlsCiphers=
+
+# Whether client certificates are required for TLS. Connections are rejected 
if the client
+# certificate isn't trusted.
+tlsRequireTrustedClientCertOnConnect=false
+
+##### --- HTTP --- #####
+
+# Http directs to redirect to non-pulsar services.
+httpReverseProxyConfigs=
+
+# Http output buffer size. The amount of data that will be buffered for http 
requests
+# before it is flushed to the channel. A larger buffer size may result in 
higher http throughput
+# though it may take longer for the client to see data.
+# If using HTTP streaming via the reverse proxy, this should be set to the 
minimum value, 1,
+# so that clients see the data as soon as possible.
+httpOutputBufferSize=32768
+
+# Number of threads to use for HTTP requests processing. Default is
+# 2 * Runtime.getRuntime().availableProcessors()
+httpNumThreads=
+
+# Enable the enforcement of limits on the incoming HTTP requests
+httpRequestsLimitEnabled=false
+
+# Max HTTP requests per seconds allowed. The excess of requests will be 
rejected with HTTP code 429 (Too many requests)
+httpRequestsMaxPerSecond=100.0
+
+
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:;base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key    ( Note: key file must be DER-encoded 
)
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:;base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key    ( Note: key file must be DER-encoded 
)
+tokenPublicKey=
+
+# The token "claim" that will be interpreted as the authentication "role" or 
"principal" by AuthenticationProviderToken (defaults to "sub" if blank)
+tokenAuthClaim=
+
+# The token audience "claim" name, e.g. "aud", that will be used to get the 
audience from token.
+# If not set, audience will not be verified.
+tokenAudienceClaim=
+
+# The token audience that stands for this broker. The field `tokenAudienceClaim` of 
a valid token needs to contain this.
+tokenAudience=
+
+### --- WebSocket config variables --- ###
+
+# Enable or disable the WebSocket servlet.
+webSocketServiceEnabled=false
+
+# Name of the cluster to which this broker belongs
+clusterName=
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=

Reply via email to