This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb61a0f Make broker module gradually conform the checkstyle (#8592)
fb61a0f is described below
commit fb61a0f6e07c7bdf7b7a96885bd6f87deb2709ac
Author: Renkai <[email protected]>
AuthorDate: Tue Dec 1 12:29:26 2020 +0800
Make broker module gradually conform the checkstyle (#8592)
---
.../resources/pulsar/checkstyle-pulsar-broker.xml | 151 ---------------------
.../src/main/resources/pulsar/suppressions.xml | 6 +
pulsar-broker/pom.xml | 2 +-
.../org/apache/pulsar/PulsarBrokerStarter.java | 44 +++---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 60 ++++----
.../pulsar/PulsarClusterMetadataTeardown.java | 5 +-
.../java/org/apache/pulsar/PulsarStandalone.java | 19 ++-
.../org/apache/pulsar/PulsarStandaloneStarter.java | 5 +-
.../PulsarTransactionCoordinatorMetadataSetup.java | 2 +-
.../pulsar/ZookeeperSessionExpiredHandlers.java | 5 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 2 +-
.../apache/pulsar/broker/admin/v1/Namespaces.java | 2 +-
.../org/apache/pulsar/client/api/RawReader.java | 8 +-
.../pulsar/client/impl/RawBatchConverter.java | 17 ++-
.../apache/pulsar/client/impl/RawReaderImpl.java | 10 +-
.../common/naming/NamespaceBundleFactory.java | 13 +-
.../naming/NamespaceBundleSplitAlgorithm.java | 18 +--
.../pulsar/common/naming/NamespaceBundles.java | 2 +-
.../RangeEquallyDivideBundleSplitAlgorithm.java | 4 +-
.../pulsar/utils/auth/tokens/TokensCliUtils.java | 81 ++++++-----
.../apache/pulsar/broker/admin/AdminApiTest.java | 6 +-
.../broker/namespace/NamespaceServiceTest.java | 10 +-
22 files changed, 188 insertions(+), 284 deletions(-)
diff --git a/buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml b/buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml
deleted file mode 100644
index 77f901a..0000000
--- a/buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml
+++ /dev/null
@@ -1,151 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<!DOCTYPE module PUBLIC
- "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
- "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
-
-<!-- This is a checkstyle configuration file. For descriptions of
-what the following rules do, please see the checkstyle configuration
-page at http://checkstyle.sourceforge.net/config.html -->
-
-<module name="Checker">
- <!-- TODO: gradually make this file same as checkstyle.xml then remove
it -->
-
- <module name="SuppressWarningsFilter"/>
-
- <module name="FileTabCharacter">
- <!-- Checks that there are no tab characters in the file. -->
- </module>
-
- <module name="RegexpSingleline">
- <!-- Checks that TODOs don't have stuff in parenthesis, e.g.,
username. -->
- <property name="format" value="((//.*)|(\*.*))TODO\("/>
- <property name="message" value="TODO comments must not include
usernames."/>
- <property name="severity" value="error"/>
- </module>
-
- <module name="RegexpSingleline">
- <property name="format" value="\s+$"/>
- <property name="message" value="Trailing whitespace"/>
- <property name="severity" value="error"/>
- </module>
-
- <module name="RegexpSingleline">
- <property name="format" value="Throwables.propagate\("/>
- <property name="message" value="Throwables.propagate is deprecated"/>
- <property name="severity" value="error"/>
- </module>
-
- <!-- Prevent *Tests.java as tools may not pick them up -->
- <module name="RegexpOnFilename">
- <property name="fileNamePattern" value=".*Tests\.java$"/>
- </module>
-
- <module name="SuppressionFilter">
- <property name="file" value="${checkstyle.suppressions.file}"
default="suppressions.xml"/>
- </module>
-
- <!-- Check that every module has a package-info.java -->
- <module name="JavadocPackage"/>
-
- <!-- All Java AST specific tests live under TreeWalker module. -->
- <module name="TreeWalker">
-
- <!-- Allow use of comment to suppress javadocstyle -->
- <module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="CHECKSTYLE.OFF\:
([\w\|]+)"/>
- <property name="onCommentFormat" value="CHECKSTYLE.ON\:
([\w\|]+)"/>
- <property name="checkFormat" value="$1"/>
- </module>
-
- <module name="SuppressWarningsHolder"/>
-
- <module name="TodoComment">
- <!-- Checks that disallowed strings are not used in comments. -->
- <property name="format" value="(FIXME)|(XXX)|(@author)"/>
- </module>
-
- <!--
-
- IMPORT CHECKS
-
- -->
-
- <module name="RedundantImport">
- <!-- Checks for redundant import statements. -->
- <property name="severity" value="error"/>
- <message key="import.redundancy"
- value="Redundant import {0}."/>
- </module>
-
- <module name="ImportOrder">
- <property name="severity" value="error"/>
- <!-- This ensures that static imports go first. -->
- <property name="option" value="top"/>
- <property name="sortStaticImportsAlphabetically" value="true"/>
- <property name="tokens" value="STATIC_IMPORT, IMPORT"/>
- <message key="import.ordering"
- value="Import {0} appears after other imports that it
should precede"/>
- </module>
-
- <module name="AvoidStarImport">
- <property name="severity" value="error"/>
- </module>
-
- <module name="IllegalImport">
- <property name="illegalPkgs"
- value="autovalue.shaded, avro.shaded, bk-shade,
com.google.api.client.repackaged, com.google.appengine.repackaged,
io.netty.util.internal"/>
- </module>
-
- <module name="RedundantModifier">
- <!-- Checks for redundant modifiers on various symbol definitions.
- See:
http://checkstyle.sourceforge.net/config_modifier.html#RedundantModifier
- -->
- <property name="tokens"
- value="METHOD_DEF, VARIABLE_DEF, ANNOTATION_FIELD_DEF,
INTERFACE_DEF, CLASS_DEF, ENUM_DEF"/>
- </module>
-
- <!--
- IllegalImport cannot blacklist classes, and c.g.api.client.util is
used for some shaded
- code and some useful code. So we need to fall back to Regexp.
- -->
- <module name="RegexpSinglelineJava">
- <property name="format"
-
value="com\.google\.api\.client\.util\.(ByteStreams|Charsets|Collections2|Joiner|Lists|Maps|Objects|Preconditions|Sets|Strings|Throwables)"/>
- </module>
-
- <!--
- Require static importing from Preconditions.
- -->
- <module name="RegexpSinglelineJava">
- <property name="format" value="^import
com.google.common.base.Preconditions;$"/>
- <property name="message" value="Static import functions from Guava
Preconditions"/>
- </module>
-
- <module name="UnusedImports">
- <property name="severity" value="error"/>
- <property name="processJavadoc" value="true"/>
- <message key="import.unused"
- value="Unused import: {0}."/>
- </module>
- </module>
-</module>
diff --git a/buildtools/src/main/resources/pulsar/suppressions.xml b/buildtools/src/main/resources/pulsar/suppressions.xml
index 1b22378..12eae26 100644
--- a/buildtools/src/main/resources/pulsar/suppressions.xml
+++ b/buildtools/src/main/resources/pulsar/suppressions.xml
@@ -39,6 +39,7 @@
<!-- suppress all checks in the copied code -->
<suppress checks=".*"
files=".+[\\/]com[\\/]scurrilous[\\/]circe[\\/].+\.java"/>
+ <!-- TODO: gradually delete below lines to make the whole project conform
the checkstyle rule -->
<suppress checks=".*" files="MLDataFormats.java"/>
<suppress checks=".*" files="BitSetRecyclable.java"/>
<suppress checks=".*" files="Schema.java"/>
@@ -47,4 +48,9 @@
<suppress checks="ConstantName" files="MessageId.java"/>
<suppress checks="MethodName" files="TopicsImpl.java"/>
<suppress checks="MemberName" files="TopicsImpl.java"/>
+ <suppress checks="ImportOrder"
files="src/main/java/org/apache/pulsar/common/.*.java"/>
+ <suppress checks="ImportOrder"
files="src/main/java/org/apache/pulsar/client/.*.java"/>
+ <suppress checks="ImportOrder"
files="src/main/java/org/apache/pulsar/transaction/.*.java"/>
+ <suppress checks=".*"
files="src/main/java/org/apache/pulsar/broker/.*.java"/>
+ <suppress checks=".*"
files="src/main/java/org/apache/pulsar/compaction/.*.java"/>
</suppressions>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 6f6f42b..fe4c7eb 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -350,7 +350,7 @@
<id>check-style</id>
<phase>verify</phase>
<configuration>
-
<configLocation>../buildtools/src/main/resources/pulsar/checkstyle-pulsar-broker.xml</configLocation>
+
<configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
<suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<excludes>**/proto/*</excludes>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index c2f2b1c..c5403bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -71,22 +71,26 @@ public class PulsarBrokerStarter {
@VisibleForTesting
private static class StarterArguments {
@Parameter(names = {"-c", "--broker-conf"}, description =
"Configuration file for Broker")
- private String brokerConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() + "/conf/broker.conf";
+ private String brokerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() +
"/conf/broker.conf";
@Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie
together with Broker")
private boolean runBookie = false;
- @Parameter(names = {"-ra", "--run-bookie-autorecovery"}, description =
"Run Bookie Autorecovery together with broker")
+ @Parameter(names = {"-ra", "--run-bookie-autorecovery"},
+ description = "Run Bookie Autorecovery together with broker")
private boolean runBookieAutoRecovery = false;
@Parameter(names = {"-bc", "--bookie-conf"}, description =
"Configuration file for Bookie")
- private String bookieConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
+ private String bookieConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() +
"/conf/bookkeeper.conf";
@Parameter(names = {"-rfw", "--run-functions-worker"}, description =
"Run functions worker with Broker")
private boolean runFunctionsWorker = false;
@Parameter(names = {"-fwc", "--functions-worker-conf"}, description =
"Configuration file for Functions Worker")
- private String fnWorkerConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() +
"/conf/functions_worker.yml";
+ private String fnWorkerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() +
"/conf/functions_worker.yml";
@Parameter(names = {"-h", "--help"}, description = "Show this help
message")
private boolean help = false;
@@ -151,13 +155,17 @@ public class PulsarBrokerStarter {
throw new IllegalArgumentException("Max message size need
smaller than jvm directMemory");
}
- if
(!NamespaceBundleSplitAlgorithm.availableAlgorithms.containsAll(brokerConfig.getSupportedNamespaceBundleSplitAlgorithms()))
{
- throw new IllegalArgumentException("The given supported
namespace bundle split algorithm has unavailable algorithm. " +
- "Available algorithms are " +
NamespaceBundleSplitAlgorithm.availableAlgorithms);
+ if
(!NamespaceBundleSplitAlgorithm.AVAILABLE_ALGORITHMS.containsAll(
+
brokerConfig.getSupportedNamespaceBundleSplitAlgorithms())) {
+ throw new IllegalArgumentException(
+ "The given supported namespace bundle split algorithm
has unavailable algorithm. "
+ + "Available algorithms are " +
NamespaceBundleSplitAlgorithm.AVAILABLE_ALGORITHMS);
}
- if
(!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(brokerConfig.getDefaultNamespaceBundleSplitAlgorithm()))
{
- throw new IllegalArgumentException("Supported namespace bundle
split algorithms must contains the default namespace bundle split algorithm");
+ if
(!brokerConfig.getSupportedNamespaceBundleSplitAlgorithms().contains(
+ brokerConfig.getDefaultNamespaceBundleSplitAlgorithm())) {
+ throw new IllegalArgumentException("Supported namespace bundle
split algorithms "
+ + "must contains the default namespace bundle split
algorithm");
}
// init functions worker
@@ -193,7 +201,8 @@ public class PulsarBrokerStarter {
// client in worker will use this config to authenticate with
broker
workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin());
-
workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters());
+ workerConfig.setBrokerClientAuthenticationParameters(
+
brokerConfig.getBrokerClientAuthenticationParameters());
// inherit super users
workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
@@ -214,13 +223,13 @@ public class PulsarBrokerStarter {
// if no argument to run bookie in cmd line, read from pulsar
config
if (!argsContains(args, "-rb") && !argsContains(args,
"--run-bookie")) {
- checkState(starterArguments.runBookie == false,
- "runBookie should be false if has no argument specified");
+ checkState(!starterArguments.runBookie,
+ "runBookie should be false if has no argument
specified");
starterArguments.runBookie =
brokerConfig.isEnableRunBookieTogether();
}
if (!argsContains(args, "-ra") && !argsContains(args,
"--run-bookie-autorecovery")) {
- checkState(starterArguments.runBookieAutoRecovery == false,
- "runBookieAutoRecovery should be false if has no argument
specified");
+ checkState(!starterArguments.runBookieAutoRecovery,
+ "runBookieAutoRecovery should be false if has no
argument specified");
starterArguments.runBookieAutoRecovery =
brokerConfig.isEnableRunBookieAutoRecoveryTogether();
}
@@ -246,7 +255,8 @@ public class PulsarBrokerStarter {
if (starterArguments.runBookie) {
checkNotNull(bookieConfig, "No ServerConfiguration for
Bookie");
checkNotNull(bookieStatsProvider, "No Stats Provider for
Bookie");
- bookieServer = new BookieServer(bookieConfig,
bookieStatsProvider.getStatsLogger(""), BookieServiceInfo.NO_INFO);
+ bookieServer = new BookieServer(
+ bookieConfig, bookieStatsProvider.getStatsLogger(""),
BookieServiceInfo.NO_INFO);
} else {
bookieServer = null;
}
@@ -323,7 +333,9 @@ public class PulsarBrokerStarter {
public static void main(String[] args) throws Exception {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
hh:mm:ss,SSS");
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
- System.out.println(String.format("%s [%s] error Uncaught exception
in thread %s: %s", dateFormat.format(new Date()),
thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+ System.out.println(String.format("%s [%s] error Uncaught exception
in thread %s: %s",
+ dateFormat.format(new Date()),
thread.getContextClassLoader(),
+ thread.getName(), exception.getMessage()));
});
BrokerStarter starter = new BrokerStarter(args);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 5bb6a99..b533fa3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -56,7 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Setup the metadata for a new Pulsar cluster
+ * Setup the metadata for a new Pulsar cluster.
*/
public class PulsarClusterMetadataSetup {
@@ -68,16 +68,18 @@ public class PulsarClusterMetadataSetup {
"--web-service-url" }, description = "Web-service URL for new
cluster", required = true)
private String clusterWebServiceUrl;
- @Parameter(names = { "-tw",
- "--web-service-url-tls" }, description = "Web-service URL for
new cluster with TLS encryption", required = false)
+ @Parameter(names = {"-tw",
+ "--web-service-url-tls"},
+ description = "Web-service URL for new cluster with TLS
encryption", required = false)
private String clusterWebServiceUrlTls;
@Parameter(names = { "-ub",
"--broker-service-url" }, description = "Broker-service URL
for new cluster", required = false)
private String clusterBrokerServiceUrl;
- @Parameter(names = { "-tb",
- "--broker-service-url-tls" }, description = "Broker-service
URL for new cluster with TLS encryption", required = false)
+ @Parameter(names = {"-tb",
+ "--broker-service-url-tls"},
+ description = "Broker-service URL for new cluster with TLS
encryption", required = false)
private String clusterBrokerServiceUrlTls;
@Parameter(names = { "-zk",
@@ -89,8 +91,9 @@ public class PulsarClusterMetadataSetup {
}, description = "Local zookeeper session timeout ms")
private int zkSessionTimeoutMillis = 30000;
- @Parameter(names = { "-gzk",
- "--global-zookeeper" }, description = "Global ZooKeeper quorum
connection string", required = false, hidden = true)
+ @Parameter(names = {"-gzk",
+ "--global-zookeeper"},
+ description = "Global ZooKeeper quorum connection string",
required = false, hidden = true)
private String globalZookeeper;
@Parameter(names = { "-cs",
@@ -108,7 +111,8 @@ public class PulsarClusterMetadataSetup {
private int numTransactionCoordinators = 16;
@Parameter(names = {
- "--existing-bk-metadata-service-uri" }, description = "The
metadata service URI of the existing BookKeeper cluster that you want to use")
+ "--existing-bk-metadata-service-uri"},
+ description = "The metadata service URI of the existing
BookKeeper cluster that you want to use")
private String existingBkMetadataServiceUri;
@Parameter(names = { "-h", "--help" }, description = "Show this help
message")
@@ -116,7 +120,7 @@ public class PulsarClusterMetadataSetup {
}
/**
- * a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of
node exists
+ * a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of
node exists.
*/
private static void createZkNode(ZooKeeper zkc, String path,
byte[] data, final List<ACL> acl, final
CreateMode createMode)
@@ -151,7 +155,8 @@ public class PulsarClusterMetadataSetup {
}
if (arguments.configurationStore != null && arguments.globalZookeeper
!= null) {
- System.err.println("Configuration store argument
(--configuration-store) supersedes the deprecated (--global-zookeeper)
argument");
+ System.err.println("Configuration store argument
(--configuration-store) "
+ + "supersedes the deprecated (--global-zookeeper)
argument");
jcommander.usage();
System.exit(1);
}
@@ -184,7 +189,8 @@ public class PulsarClusterMetadataSetup {
// Format BookKeeper stream storage metadata
if (arguments.numStreamStorageContainers > 0) {
- String uriStr = arguments.existingBkMetadataServiceUri == null ?
bkConf.getMetadataServiceUri() : arguments.existingBkMetadataServiceUri;
+ String uriStr = arguments.existingBkMetadataServiceUri == null
+ ? bkConf.getMetadataServiceUri() :
arguments.existingBkMetadataServiceUri;
ServiceURI bkMetadataServiceUri = ServiceURI.create(uriStr);
ClusterInitializer initializer = new
ZkClusterInitializer(arguments.zookeeper);
initializer.initializeCluster(bkMetadataServiceUri.getUri(),
arguments.numStreamStorageContainers);
@@ -201,24 +207,25 @@ public class PulsarClusterMetadataSetup {
createZkNode(localZk, "/namespace", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
createZkNode(configStoreZk, POLICIES_ROOT, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
createZkNode(configStoreZk, "/admin/clusters", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
ClusterData clusterData = new
ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls,
arguments.clusterBrokerServiceUrl,
arguments.clusterBrokerServiceUrlTls);
byte[] clusterDataJson =
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
- createZkNode(configStoreZk,"/admin/clusters/" + arguments.cluster,
clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ createZkNode(configStoreZk, "/admin/clusters/" + arguments.cluster,
clusterDataJson,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
// Create marker for "global" cluster
ClusterData globalClusterData = new ClusterData(null, null);
byte[] globalClusterDataJson =
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);
createZkNode(configStoreZk, "/admin/clusters/global",
globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ CreateMode.PERSISTENT);
// Create public tenant, whitelisted to use the this same cluster,
along with other clusters
createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT,
arguments.cluster);
@@ -234,7 +241,8 @@ public class PulsarClusterMetadataSetup {
createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE,
arguments.cluster);
// Create transaction coordinator assign partitioned topic
- createPartitionedTopic(configStoreZk,
TopicName.TRANSACTION_COORDINATOR_ASSIGN, arguments.numTransactionCoordinators);
+ createPartitionedTopic(configStoreZk,
TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ arguments.numTransactionCoordinators);
localZk.close();
configStoreZk.close();
@@ -271,7 +279,7 @@ public class PulsarClusterMetadataSetup {
static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName
namespaceName, String cluster)
throws KeeperException, InterruptedException, IOException {
- String namespacePath = POLICIES_ROOT + "/" +namespaceName.toString();
+ String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
Policies policies;
Stat stat = configStoreZk.exists(namespacePath, false);
if (stat == null) {
@@ -299,21 +307,23 @@ public class PulsarClusterMetadataSetup {
}
}
- static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName
topicName, int numPartitions) throws KeeperException, InterruptedException,
IOException {
+ static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName
topicName, int numPartitions)
+ throws KeeperException, InterruptedException, IOException {
String partitionedTopicPath =
ZkAdminPaths.partitionedTopicPath(topicName);
Stat stat = configStoreZk.exists(partitionedTopicPath, false);
PartitionedTopicMetadata metadata = new
PartitionedTopicMetadata(numPartitions);
if (stat == null) {
createZkNode(
- configStoreZk,
- partitionedTopicPath,
-
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT
+ configStoreZk,
+ partitionedTopicPath,
+
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT
);
} else {
byte[] content = configStoreZk.getData(partitionedTopicPath,
false, null);
- PartitionedTopicMetadata existsMeta =
ObjectMapperFactory.getThreadLocal().readValue(content,
PartitionedTopicMetadata.class);
+ PartitionedTopicMetadata existsMeta =
+ ObjectMapperFactory.getThreadLocal().readValue(content,
PartitionedTopicMetadata.class);
// Only update z-node if the partitions should be modified
if (existsMeta.partitions < numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 42cb245..80b34cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Teardown the metadata for a existed Pulsar cluster
+ * Teardown the metadata for a existed Pulsar cluster.
*/
public class PulsarClusterMetadataTeardown {
@@ -87,7 +87,8 @@ public class PulsarClusterMetadataTeardown {
}
if (arguments.bkMetadataServiceUri != null) {
- BookKeeper bookKeeper = new BookKeeper(new
ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));
+ BookKeeper bookKeeper =
+ new BookKeeper(new
ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));
ZooKeeper localZk = initZk(arguments.zookeeper,
arguments.zkSessionTimeoutMillis);
ManagedLedgerFactory managedLedgerFactory = new
ManagedLedgerFactoryImpl(bookKeeper, localZk);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 49877e3..9cea534 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -76,7 +76,9 @@ public class PulsarStandalone implements AutoCloseable {
this.advertisedAddress = advertisedAddress;
}
- public void setConfig(ServiceConfiguration config) { this.config = config;
}
+ public void setConfig(ServiceConfiguration config) {
+ this.config = config;
+ }
public void setFnWorkerService(WorkerService fnWorkerService) {
this.fnWorkerService = fnWorkerService;
@@ -225,7 +227,8 @@ public class PulsarStandalone implements AutoCloseable {
private boolean noFunctionsWorker = false;
@Parameter(names = {"-fwc", "--functions-worker-conf"}, description =
"Configuration file for Functions Worker")
- private String fnWorkerConfigFile =
Paths.get("").toAbsolutePath().normalize().toString() +
"/conf/functions_worker.yml";
+ private String fnWorkerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() +
"/conf/functions_worker.yml";
@Parameter(names = {"-nss", "--no-stream-storage"}, description = "Disable
stream storage")
private boolean noStreamStorage = false;
@@ -330,9 +333,12 @@ public class PulsarStandalone implements AutoCloseable {
String.format("http://%s:%d",
config.getAdvertisedAddress(), config.getWebServicePort().get()));
String brokerServiceUrl = String.format("pulsar://%s:%d",
config.getAdvertisedAddress(),
config.getBrokerServicePort().get());
- admin =
PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication(
- config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()).build();
- ClusterData clusterData = new
ClusterData(webServiceUrl.toString(), null, brokerServiceUrl, null);
+ admin = PulsarAdmin.builder().serviceHttpUrl(
+ webServiceUrl.toString()).authentication(
+ config.getBrokerClientAuthenticationPlugin(),
+ config.getBrokerClientAuthenticationParameters()).build();
+ ClusterData clusterData =
+ new ClusterData(webServiceUrl.toString(), null,
brokerServiceUrl, null);
createSampleNameSpace(clusterData, cluster);
} else {
URL webServiceUrlTls = new URL(
@@ -379,7 +385,8 @@ public class PulsarStandalone implements AutoCloseable {
}
if
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
admin.namespaces().createNamespace(defaultNamespace);
-
admin.namespaces().setNamespaceReplicationClusters(defaultNamespace,
Sets.newHashSet(config.getClusterName()));
+ admin.namespaces().setNamespaceReplicationClusters(
+ defaultNamespace,
Sets.newHashSet(config.getClusterName()));
}
} catch (PulsarAdminException e) {
log.info(e.getMessage(), e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 36b462c..27c9a65 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -53,7 +53,8 @@ public class PulsarStandaloneStarter extends PulsarStandalone
{
return;
}
- this.config = PulsarConfigurationLoader.create((new
FileInputStream(this.getConfigFile())), ServiceConfiguration.class);
+ this.config = PulsarConfigurationLoader.create(
+ (new FileInputStream(this.getConfigFile())),
ServiceConfiguration.class);
String zkServers = "127.0.0.1";
@@ -70,7 +71,7 @@ public class PulsarStandaloneStarter extends PulsarStandalone
{
// Set ZK server's host to localhost
// Priority: args > conf > default
- if (argsContains(args,"--zookeeper-port")) {
+ if (argsContains(args, "--zookeeper-port")) {
config.setZookeeperServers(zkServers + ":" + this.getZkPort());
config.setConfigurationStoreServers(zkServers + ":" +
this.getZkPort());
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
index c77d0c9..f105b9b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
@@ -26,7 +26,7 @@ import org.apache.zookeeper.ZooKeeper;
/**
* Setup the transaction coordinator metadata for a cluster, the setup will
create pulsar/system namespace and create
- * partitioned topic for transaction coordinator assign
+ * partitioned topic for transaction coordinator assign.
*/
public class PulsarTransactionCoordinatorMetadataSetup {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
index e654193..dddb9f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/ZookeeperSessionExpiredHandlers.java
@@ -24,7 +24,7 @@ import
org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler;
/**
- * Handlers for broker service to handle Zookeeper session expired
+ * Handlers for broker service to handle Zookeeper session expired.
*/
public class ZookeeperSessionExpiredHandlers {
@@ -35,7 +35,8 @@ public class ZookeeperSessionExpiredHandlers {
return new ShutDownWhenSessionExpired(shutdownService);
}
- public static ZookeeperSessionExpiredHandler
reconnectWhenZookeeperSessionExpired(PulsarService pulsarService,
ShutdownService shutdownService) {
+ public static ZookeeperSessionExpiredHandler
reconnectWhenZookeeperSessionExpired(
+ PulsarService pulsarService, ShutdownService shutdownService) {
return new ReconnectWhenSessionExpired(pulsarService, shutdownService);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ff94374..ec4e028 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1374,7 +1374,7 @@ public abstract class NamespacesBase extends
AdminResource {
algorithm =
NamespaceBundleSplitAlgorithm.of(pulsar().getConfig().getDefaultNamespaceBundleSplitAlgorithm());
}
if (algorithm == null) {
- algorithm = NamespaceBundleSplitAlgorithm.rangeEquallyDivide;
+ algorithm =
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}
return algorithm;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 743e3bf..e17399b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -567,7 +567,7 @@ public class Namespaces extends NamespacesBase {
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
validateNamespaceName(property, cluster, namespace);
- internalSplitNamespaceBundle(bundleRange, authoritative, unload,
NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
+ internalSplitNamespaceBundle(bundleRange, authoritative, unload,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index e142924..415c3dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -38,7 +38,7 @@ public interface RawReader {
}
/**
- * Get the topic for the reader
+ * Get the topic for the reader.
*
* @return topic for the reader
*/
@@ -69,13 +69,13 @@ public interface RawReader {
* with the individual acknowledgement, so later acknowledgements will
overwrite all
* properties from previous acknowledgements.
*
- * @param messageId to cumulatively acknowledge to
+ * @param messageId to cumulatively acknowledge to
* @param properties a map of properties which will be stored with the
acknowledgement
*/
- CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
Map<String,Long> properties);
+ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
Map<String, Long> properties);
/**
- * Get the last message id available immediately available for reading
+ * Get the last message id available immediately available for reading.
*/
CompletableFuture<MessageId> getLastMessageIdAsync();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 7447419..17306ff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -75,7 +75,9 @@ public class RawBatchConverter {
msg.getMessageIdData().getPartition(),
i);
if (!singleMessageMetadataBuilder.getCompactedOut()) {
- idsAndKeysAndSize.add(ImmutableTriple.of(id,
singleMessageMetadataBuilder.getPartitionKey(),
singleMessageMetadataBuilder.getPayloadSize()));
+ idsAndKeysAndSize.add(ImmutableTriple.of(
+ id, singleMessageMetadataBuilder.getPartitionKey(),
+ singleMessageMetadataBuilder.getPayloadSize()));
}
singleMessageMetadataBuilder.recycle();
singleMessagePayload.release();
@@ -108,16 +110,17 @@ public class RawBatchConverter {
int batchSize = metadata.getNumMessagesInBatch();
int messagesRetained = 0;
- SingleMessageMetadata.Builder emptyMetadataBuilder =
SingleMessageMetadata.newBuilder().setCompactedOut(true);
+ SingleMessageMetadata.Builder emptyMetadataBuilder =
+ SingleMessageMetadata.newBuilder().setCompactedOut(true);
for (int i = 0; i < batchSize; i++) {
SingleMessageMetadata.Builder singleMessageMetadataBuilder =
SingleMessageMetadata.newBuilder();
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
-
singleMessageMetadataBuilder,
-
0, batchSize);
+ singleMessageMetadataBuilder,
+ 0, batchSize);
MessageId id = new
BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
-
msg.getMessageIdData().getEntryId(),
-
msg.getMessageIdData().getPartition(),
- i);
+ msg.getMessageIdData().getEntryId(),
+ msg.getMessageIdData().getPartition(),
+ i);
if (!singleMessageMetadataBuilder.hasPartitionKey()) {
messagesRetained++;
Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3863d4d..6cdf62e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -83,7 +83,7 @@ public class RawReaderImpl implements RawReader {
}
@Override
- public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId
messageId, Map<String,Long> properties) {
+ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId
messageId, Map<String, Long> properties) {
return consumer.doAcknowledgeWithTxn(messageId, AckType.Cumulative,
properties, null);
}
@@ -140,7 +140,8 @@ public class RawReaderImpl implements RawReader {
} else {
int numMsg;
try {
- MessageMetadata msgMetadata =
Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
+ MessageMetadata msgMetadata =
+
Commands.parseMessageMetadata(messageAndCnx.msg.getHeadersAndPayload());
numMsg = msgMetadata.getNumMessagesInBatch();
msgMetadata.recycle();
} catch (Throwable t) {
@@ -201,10 +202,11 @@ public class RawReaderImpl implements RawReader {
}
@Override
- void messageReceived(MessageIdData messageId, int redeliveryCount,
List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
+ void messageReceived(MessageIdData messageId, int redeliveryCount,
+ List<Long> ackSet, ByteBuf headersAndPayload,
ClientCnx cnx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received raw message: {}/{}/{}", topic,
subscription,
- messageId.getEntryId(), messageId.getLedgerId(),
messageId.getPartition());
+ messageId.getEntryId(), messageId.getLedgerId(),
messageId.getPartition());
}
incomingRawMessages.add(
new RawMessageAndCnx(new RawMessageImpl(messageId,
headersAndPayload), cnx));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 26caee0..f1a127e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -132,7 +132,7 @@ public class NamespaceBundleFactory implements
ZooKeeperCacheListener<LocalPolic
}
/**
- * checks if the local broker is the owner of the namespace bundle
+ * checks if the local broker is the owner of the namespace bundle.
*
* @param nsBundle
* @return
@@ -223,11 +223,13 @@ public class NamespaceBundleFactory implements
ZooKeeperCacheListener<LocalPolic
* @return List of split {@link NamespaceBundle} and {@link
NamespaceBundles} that contains final bundles including
* split bundles for a given namespace
*/
- public Pair<NamespaceBundles, List<NamespaceBundle>>
splitBundles(NamespaceBundle targetBundle, int numBundles, Long splitBoundary) {
+ public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(
+ NamespaceBundle targetBundle, int numBundles, Long splitBoundary) {
checkArgument(canSplitBundle(targetBundle), "%s bundle can't be split
further", targetBundle);
if (splitBoundary != null) {
- checkArgument(splitBoundary > targetBundle.getLowerEndpoint() &&
splitBoundary < targetBundle.getUpperEndpoint(),
- "The given fixed key must between the key range of the %s
bundle", targetBundle);
+ checkArgument(splitBoundary > targetBundle.getLowerEndpoint()
+ && splitBoundary < targetBundle.getUpperEndpoint(),
+ "The given fixed key must between the key range of the %s
bundle", targetBundle);
numBundles = 2;
}
checkNotNull(targetBundle, "can't split null bundle");
@@ -261,7 +263,8 @@ public class NamespaceBundleFactory implements
ZooKeeperCacheListener<LocalPolic
partitions[pos] = sourceBundle.partitions[lastIndex];
if (splitPartition != -1) {
// keep version of sourceBundle
- NamespaceBundles splittedNsBundles = new NamespaceBundles(nsname,
partitions, this, sourceBundle.getVersion());
+ NamespaceBundles splittedNsBundles =
+ new NamespaceBundles(nsname, partitions, this,
sourceBundle.getVersion());
List<NamespaceBundle> splittedBundles =
splittedNsBundles.getBundles().subList(splitPartition,
(splitPartition + numBundles));
return new ImmutablePair<NamespaceBundles,
List<NamespaceBundle>>(splittedNsBundles, splittedBundles);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
index c7647f5..1dc19f3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
@@ -28,23 +28,23 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
*/
public interface NamespaceBundleSplitAlgorithm {
- String rangeEquallyDivideName = "range_equally_divide";
- String topicCountEquallyDivideName = "topic_count_equally_divide";
+ String RANGE_EQUALLY_DIVIDE_NAME = "range_equally_divide";
+ String TOPIC_COUNT_EQUALLY_DIVIDE = "topic_count_equally_divide";
- List<String> availableAlgorithms =
Lists.newArrayList(rangeEquallyDivideName, topicCountEquallyDivideName);
+ List<String> AVAILABLE_ALGORITHMS =
Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME, TOPIC_COUNT_EQUALLY_DIVIDE);
- NamespaceBundleSplitAlgorithm rangeEquallyDivide = new
RangeEquallyDivideBundleSplitAlgorithm();
- NamespaceBundleSplitAlgorithm topicCountEquallyDivide = new
TopicCountEquallyDivideBundleSplitAlgorithm();
+ NamespaceBundleSplitAlgorithm RANGE_EQUALLY_DIVIDE_ALGO = new
RangeEquallyDivideBundleSplitAlgorithm();
+ NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new
TopicCountEquallyDivideBundleSplitAlgorithm();
static NamespaceBundleSplitAlgorithm of(String algorithmName) {
if (algorithmName == null) {
return null;
}
switch (algorithmName) {
- case rangeEquallyDivideName:
- return rangeEquallyDivide;
- case topicCountEquallyDivideName:
- return topicCountEquallyDivide;
+ case RANGE_EQUALLY_DIVIDE_NAME:
+ return RANGE_EQUALLY_DIVIDE_ALGO;
+ case TOPIC_COUNT_EQUALLY_DIVIDE:
+ return TOPIC_COUNT_EQUALLY_DIVIDE_ALGO;
default:
return null;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index af8c807..75e235f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -79,7 +79,7 @@ public class NamespaceBundles {
lowerBound = upperBound;
}
} else {
- this.partitions = new long[] { 0L };
+ this.partitions = new long[]{0L};
bundles.add(fullBundle);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
index 855193c..e0b2347 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/RangeEquallyDivideBundleSplitAlgorithm.java
@@ -28,7 +28,7 @@ public class RangeEquallyDivideBundleSplitAlgorithm
implements NamespaceBundleSp
@Override
public CompletableFuture<Long> getSplitBoundary(NamespaceService service,
NamespaceBundle bundle) {
- return CompletableFuture.completedFuture(bundle.getLowerEndpoint() +
- (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2);
+ return CompletableFuture.completedFuture(bundle.getLowerEndpoint()
+ + (bundle.getUpperEndpoint() - bundle.getLowerEndpoint()) / 2);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
index 2aef18e..ee2c384 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
@@ -49,22 +49,22 @@ import org.apache.pulsar.common.util.RelativeTimeUtil;
public class TokensCliUtils {
public static class Arguments {
- @Parameter(names = { "-h", "--help" }, description = "Show this help
message")
+ @Parameter(names = {"-h", "--help"}, description = "Show this help
message")
private boolean help = false;
}
@Parameters(commandDescription = "Create a new secret key")
public static class CommandCreateSecretKey {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature
algorithm for the new secret key.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature
algorithm for the new secret key.")
SignatureAlgorithm algorithm = SignatureAlgorithm.HS256;
- @Parameter(names = { "-o",
- "--output" }, description = "Write the secret key to a file
instead of stdout")
+ @Parameter(names = {"-o",
+ "--output"}, description = "Write the secret key to a file
instead of stdout")
String outputFile;
@Parameter(names = {
- "-b", "--base64" }, description = "Encode the key in base64")
+ "-b", "--base64"}, description = "Encode the key in base64")
boolean base64 = false;
public void run() throws IOException {
@@ -85,15 +85,15 @@ public class TokensCliUtils {
@Parameters(commandDescription = "Create a new or pair of keys
public/private")
public static class CommandCreateKeyPair {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature
algorithm for the new key pair.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature
algorithm for the new key pair.")
SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
@Parameter(names = {
- "--output-private-key" }, description = "File where to write
the private key", required = true)
+ "--output-private-key"}, description = "File where to write
the private key", required = true)
String privateKeyFile;
@Parameter(names = {
- "--output-public-key" }, description = "File where to write
the public key", required = true)
+ "--output-public-key"}, description = "File where to write the
public key", required = true)
String publicKeyFile;
public void run() throws IOException {
@@ -106,24 +106,29 @@ public class TokensCliUtils {
@Parameters(commandDescription = "Create a new token")
public static class CommandCreateToken {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature
algorithm for the new key pair.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature
algorithm for the new key pair.")
SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
- @Parameter(names = { "-s",
- "--subject" }, description = "Specify the 'subject' or
'principal' associate with this token", required = true)
+ @Parameter(names = {"-s",
+ "--subject"},
+ description = "Specify the 'subject' or 'principal' associate
with this token", required = true)
private String subject;
- @Parameter(names = { "-e",
- "--expiry-time" }, description = "Relative expiry time for the
token (eg: 1h, 3d, 10y). (m=minutes) Default: no expiration")
+ @Parameter(names = {"-e",
+ "--expiry-time"},
+ description = "Relative expiry time for the token (eg: 1h, 3d,
10y)."
+ + " (m=minutes) Default: no expiration")
private String expiryTime;
- @Parameter(names = { "-sk",
- "--secret-key" }, description = "Pass the secret key for
signing the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-sk",
+ "--secret-key"},
+ description = "Pass the secret key for signing the token. This
can either be: data:, file:, etc..")
private String secretKey;
- @Parameter(names = { "-pk",
- "--private-key" }, description = "Pass the private key for
signing the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-pk",
+ "--private-key"},
+ description = "Pass the private key for signing the token.
This can either be: data:, file:, etc..")
private String privateKey;
public void run() throws Exception {
@@ -165,12 +170,12 @@ public class TokensCliUtils {
@Parameter(description = "The token string", arity = 1)
private java.util.List<String> args;
- @Parameter(names = { "-i",
- "--stdin" }, description = "Read token from standard input")
+ @Parameter(names = {"-i",
+ "--stdin"}, description = "Read token from standard input")
private Boolean stdin = false;
- @Parameter(names = { "-f",
- "--token-file" }, description = "Read token from a file")
+ @Parameter(names = {"-f",
+ "--token-file"}, description = "Read token from a file")
private String tokenFile;
public void run() throws Exception {
@@ -187,7 +192,8 @@ public class TokensCliUtils {
token = System.getenv("TOKEN");
} else {
System.err.println(
- "Token needs to be either passed as an argument or
through `--stdin`, `--token-file` or by the `TOKEN` environment variable");
+ "Token needs to be either passed as an argument or
through `--stdin`,"
+ + " `--token-file` or by the `TOKEN`
environment variable");
System.exit(1);
return;
}
@@ -202,27 +208,29 @@ public class TokensCliUtils {
@Parameters(commandDescription = "Validate a token against a key")
public static class CommandValidateToken {
- @Parameter(names = { "-a",
- "--signature-algorithm" }, description = "The signature
algorithm for the key pair if using public key.")
+ @Parameter(names = {"-a",
+ "--signature-algorithm"}, description = "The signature
algorithm for the key pair if using public key.")
SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
@Parameter(description = "The token string", arity = 1)
private java.util.List<String> args;
- @Parameter(names = { "-i",
- "--stdin" }, description = "Read token from standard input")
+ @Parameter(names = {"-i",
+ "--stdin"}, description = "Read token from standard input")
private Boolean stdin = false;
- @Parameter(names = { "-f",
- "--token-file" }, description = "Read token from a file")
+ @Parameter(names = {"-f",
+ "--token-file"}, description = "Read token from a file")
private String tokenFile;
- @Parameter(names = { "-sk",
- "--secret-key" }, description = "Pass the secret key for
validating the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-sk",
+ "--secret-key"},
+ description = "Pass the secret key for validating the token.
This can either be: data:, file:, etc..")
private String secretKey;
- @Parameter(names = { "-pk",
- "--public-key" }, description = "Pass the public key for
validating the token. This can either be: data:, file:, etc..")
+ @Parameter(names = {"-pk",
+ "--public-key"},
+ description = "Pass the public key for validating the token.
This can either be: data:, file:, etc..")
private String publicKey;
public void run() throws Exception {
@@ -249,7 +257,8 @@ public class TokensCliUtils {
token = System.getenv("TOKEN");
} else {
System.err.println(
- "Token needs to be either passed as an argument or
through `--stdin`, `--token-file` or by the `TOKEN` environment variable");
+ "Token needs to be either passed as an argument or
through `--stdin`,"
+ + " `--token-file` or by the `TOKEN`
environment variable");
System.exit(1);
return;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ad9939c..302f327 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1132,7 +1132,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
try {
admin.namespaces().splitNamespaceBundle(namespace,
"0x00000000_0xffffffff", true,
- NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName);
+ NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception");
}
@@ -1161,7 +1161,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
@Test
public void
testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws
Exception {
-
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.topicCountEquallyDivideName);
+
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = Lists.newArrayList(
@@ -1195,7 +1195,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
assertNotEquals(bundles.getBundles().get(i).toString(),
splitRange[i]);
}
producers.forEach(Producer::closeAsync);
-
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
+
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index b819601..0fb9b4c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -114,7 +114,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle originalBundle = bundles.findBundle(topicName);
// Split bundle and take ownership of split bundles
- CompletableFuture<Void> result =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result.get();
@@ -194,7 +194,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
assertNotNull(list);
// Split bundle and take ownership of split bundles
- CompletableFuture<Void> result =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result.get();
} catch (Exception e) {
@@ -409,7 +409,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle originalBundle = bundles.findBundle(topicName);
// Split bundle and take ownership of split bundles
- CompletableFuture<Void> result =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result.get();
@@ -474,7 +474,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundles bundles =
namespaceService.getNamespaceBundleFactory().getBundles(nsname);
NamespaceBundle originalBundle = bundles.findBundle(topicName);
- CompletableFuture<Void> result1 =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result1 =
namespaceService.splitAndOwnBundle(originalBundle, false,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result1.get();
} catch (Exception e) {
@@ -493,7 +493,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
}
});
- CompletableFuture<Void> result2 =
namespaceService.splitAndOwnBundle(splittedBundle, true,
NamespaceBundleSplitAlgorithm.rangeEquallyDivide);
+ CompletableFuture<Void> result2 =
namespaceService.splitAndOwnBundle(splittedBundle, true,
NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
try {
result2.get();
} catch (Exception e) {