This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bdf3b4320f1 [fix][broker] read local cookie when start pulsar
standalone (#18260)
bdf3b4320f1 is described below
commit bdf3b4320f1517db5002e1407a56745590e83722
Author: labuladong <[email protected]>
AuthorDate: Tue Nov 1 12:45:24 2022 +0800
[fix][broker] read local cookie when start pulsar standalone (#18260)
---
.../java/org/apache/pulsar/PulsarStandalone.java | 3 +-
.../org/apache/pulsar/PulsarStandaloneTest.java | 36 ++++++++
...pulsar_broker_test_standalone_with_rocksdb.conf | 97 ++++++++++++++++++++++
.../pulsar/metadata/bookkeeper/BKCluster.java | 19 +++++
4 files changed, 154 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 3408d7ccfe4..0ff33180570 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -440,7 +440,8 @@ public class PulsarStandalone implements AutoCloseable {
}
}
- private void startBookieWithMetadataStore() throws Exception {
+ @VisibleForTesting
+ void startBookieWithMetadataStore() throws Exception {
if (StringUtils.isBlank(metadataStoreUrl)){
log.info("Starting BK with RocksDb metadata store");
metadataStoreUrl = "rocksdb://" +
Paths.get(metadataDir).toAbsolutePath();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
index 53863225651..357a8e1bd4b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar;
+import static org.apache.commons.io.FileUtils.cleanDirectory;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
@@ -26,6 +27,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.File;
+import java.util.List;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.IOUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -35,6 +40,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
+import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -91,4 +97,34 @@ public class PulsarStandaloneTest {
verify(admin.namespaces(),
times(1)).createNamespace(eq(ns.toString()));
}
+ @Test(groups = "broker")
+ public void testStandaloneWithRocksDB() throws Exception {
+ String[] args = new String[]{"--config",
+
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
+ final int bookieNum = 3;
+ final File tempDir = IOUtils.createTempDir("standalone", "test");
+
+ PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
+ standalone.setBkDir(tempDir.getAbsolutePath());
+ standalone.setNumOfBk(bookieNum);
+
+ standalone.startBookieWithMetadataStore();
+ List<ServerConfiguration> firstBsConfs =
standalone.bkCluster.getBsConfs();
+ Assert.assertEquals(firstBsConfs.size(), bookieNum);
+ standalone.close();
+
+ // start twice, read cookie from local folder
+ standalone.startBookieWithMetadataStore();
+ List<ServerConfiguration> secondBsConfs =
standalone.bkCluster.getBsConfs();
+ Assert.assertEquals(secondBsConfs.size(), bookieNum);
+
+ for (int i = 0; i < bookieNum; i++) {
+ ServerConfiguration conf1 = firstBsConfs.get(i);
+ ServerConfiguration conf2 = secondBsConfs.get(i);
+ Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
+ }
+ standalone.close();
+ cleanDirectory(tempDir);
+ }
+
}
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
new file mode 100644
index 00000000000..d8b26bbbfa9
--- /dev/null
+++
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+applicationName="pulsar_broker"
+metadataStoreUrl=
+configurationMetadataStoreUrl=
+brokerServicePort=6650
+brokerServicePortTls=6651
+webServicePort=8080
+allowLoopback=true
+webServicePortTls=4443
+bindAddress=0.0.0.0
+advertisedAddress=
+advertisedListeners=
+internalListenerName=internal
+clusterName="test_cluster"
+brokerShutdownTimeoutMs=3000
+backlogQuotaCheckEnabled=true
+backlogQuotaCheckIntervalInSeconds=60
+backlogQuotaDefaultLimitGB=50
+brokerDeleteInactiveTopicsEnabled=true
+brokerDeleteInactiveTopicsFrequencySeconds=60
+allowAutoTopicCreation=true
+allowAutoTopicCreationType=non-partitioned
+defaultNumPartitions=1
+messageExpiryCheckIntervalInMinutes=5
+clientLibraryVersionCheckEnabled=false
+clientLibraryVersionCheckAllowUnversioned=true
+statusFilePath=/tmp/status.html
+tlsEnabled=false
+tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
+tlsKeyFilePath=/home/local/conf/pulsar/server.key
+tlsTrustCertsFilePath=
+tlsAllowInsecureConnection=false
+authenticationEnabled=false
+authorizationEnabled=false
+superUserRoles="test_user"
+brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
+brokerClientAuthenticationParameters=
+bookkeeperClientAuthenticationPlugin="test_auth_plugin"
+bookkeeperClientAuthenticationAppId="test_auth_id"
+bookkeeperClientTimeoutInSeconds=30
+bookkeeperClientSpeculativeReadTimeoutInMillis=0
+bookkeeperClientHealthCheckEnabled=true
+bookkeeperClientHealthCheckIntervalSeconds=60
+bookkeeperClientHealthCheckErrorThresholdPerInterval=5
+bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800
+bookkeeperClientRackawarePolicyEnabled=true
+bookkeeperClientRegionawarePolicyEnabled=false
+bookkeeperClientMinNumRacksPerWriteQuorum=2
+bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
+bookkeeperClientReorderReadSequenceEnabled=false
+bookkeeperClientIsolationGroups="test_group"
+managedLedgerDefaultEnsembleSize=3
+managedLedgerDefaultWriteQuorum=2
+managedLedgerDefaultAckQuorum=2
+managedLedgerCacheSizeMB=1024
+managedLedgerCacheEvictionWatermark=10
+managedLedgerDefaultMarkDeleteRateLimit=0.1
+managedLedgerMaxEntriesPerLedger=50000
+managedLedgerMinLedgerRolloverTimeMinutes=10
+managedLedgerMaxLedgerRolloverTimeMinutes=240
+managedLedgerCursorMaxEntriesPerLedger=50000
+managedLedgerCursorRolloverTimeInSeconds = 14400
+managedLedgerDataReadPriority = bookkeeper-first
+loadBalancerEnabled = false
+loadBalancerReportUpdateThresholdPercentage=10
+loadBalancerReportUpdateMaxIntervalMinutes=15
+loadBalancerHostUsageCheckIntervalMinutes=1
+loadBalancerSheddingIntervalMinutes=30
+loadBalancerSheddingGracePeriodMinutes=30
+loadBalancerBrokerUnderloadedThresholdPercentage=50
+loadBalancerBrokerOverloadedThresholdPercentage=85
+replicationMetricsEnabled=true
+replicationConnectionsPerBroker=16
+replicationProducerQueueSize=1000
+replicatorPrefix=pulsar.repl
+brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
+supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
+defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
+maxMessagePublishBufferSizeInMB=-1
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 8a7ea62f7cb..c2f3f72ec21 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -30,9 +30,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieImpl;
+import org.apache.bookkeeper.bookie.Cookie;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.component.ComponentStarter;
@@ -65,6 +68,7 @@ public class BKCluster implements AutoCloseable {
// BookKeeper related variables
private final List<File> tmpDirs = new ArrayList<>();
private final List<LifecycleComponentStack> bookieComponents = new
ArrayList<>();
+ @Getter
private final List<ServerConfiguration> bsConfs = new ArrayList<>();
protected final ServerConfiguration baseConf;
@@ -231,9 +235,24 @@ public class BKCluster implements AutoCloseable {
// and 2nd bookie's cookie validation fails
port = clusterConf.bkPort;
}
+ File[] cookieDir = dataDir.listFiles((file) ->
file.getName().equals("current"));
+ if (cookieDir != null && cookieDir.length > 0) {
+ String existBookieAddr =
parseBookieAddressFromCookie(cookieDir[0]);
+ if (existBookieAddr != null) {
+ baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]);
+ port = Integer.parseInt(existBookieAddr.split(":")[1]);
+ }
+ }
return newServerConfiguration(port, dataDir, new File[]{dataDir});
}
+ private String parseBookieAddressFromCookie(File dir) throws IOException {
+ Cookie cookie = Cookie.readFromDirectory(dir);
+ Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*",
Pattern.DOTALL);
+ Matcher m = pattern.matcher(cookie.toString());
+ return m.find() ? m.group(1) : null;
+ }
+
private ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
}