This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eb5725a2a2d [Improve][broker] Support clear old bookie data for BKCluster (#16744)
eb5725a2a2d is described below
commit eb5725a2a2de92b42f0efcfeb38c506ba7882401
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Jul 25 16:41:20 2022 +0800
[Improve][broker] Support clear old bookie data for BKCluster (#16744)
---
.../java/org/apache/pulsar/PulsarStandalone.java | 1 +
.../pulsar/broker/EmbeddedPulsarCluster.java | 6 +++++-
.../apache/pulsar/broker/EndToEndMetadataTest.java | 24 ++++++++++++++++++++++
.../pulsar/metadata/bookkeeper/BKCluster.java | 12 +++++++++++
.../pulsar/metadata/BaseMetadataStoreTest.java | 4 ++--
5 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index b8c175c0a4d..8d19971880f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -443,6 +443,7 @@ public class PulsarStandalone implements AutoCloseable {
.bkPort(bkPort)
.numBookies(numOfBk)
.dataDir(bkDir)
+ .clearOldData(wipeData)
.build();
config.setBookkeeperNumberOfChannelsPerBookie(1);
config.setMetadataStoreUrl(metadataStoreUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
index dc0a029ec8b..4577ee74ae3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+import org.junit.platform.commons.util.StringUtils;
public class EmbeddedPulsarCluster implements AutoCloseable {
@@ -52,13 +53,16 @@ public class EmbeddedPulsarCluster implements AutoCloseable {
private final PulsarAdmin admin;
@Builder
- private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataStoreUrl) throws Exception {
+ private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataStoreUrl,
+ String dataDir, boolean clearOldData) throws Exception {
this.numBrokers = numBrokers;
this.numBookies = numBookies;
this.metadataStoreUrl = metadataStoreUrl;
this.bkCluster = BKCluster.builder()
.metadataServiceUri(metadataStoreUrl)
.numBookies(numBookies)
+ .dataDir(StringUtils.isNotBlank(dataDir) ? dataDir : null)
+ .clearOldData(clearOldData)
.build();
for (int i = 0; i < numBrokers; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
index 11097b53f28..3ea3aa3d9fb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
@@ -19,27 +19,51 @@
package org.apache.pulsar.broker;
import static org.testng.Assert.assertEquals;
+import java.io.File;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
public class EndToEndMetadataTest extends BaseMetadataStoreTest {
+ private File tempDir;
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ tempDir = IOUtils.createTempDir("bookies", "test");
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.cleanup();
+ FileUtils.deleteDirectory(tempDir);
+ }
+
@Test(dataProvider = "impl")
public void testPublishConsume(String provider, Supplier<String> urlSupplier) throws Exception {
+
@Cleanup
EmbeddedPulsarCluster epc = EmbeddedPulsarCluster.builder()
.numBrokers(1)
.numBookies(1)
.metadataStoreUrl(urlSupplier.get())
+ .dataDir(tempDir.getAbsolutePath())
+ .clearOldData(true)
.build();
@Cleanup
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 96cb3cf2daa..e902b34e881 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.bookkeeper;
+import static org.apache.commons.io.FileUtils.cleanDirectory;
import java.io.File;
import java.io.IOException;
import java.net.NetworkInterface;
@@ -75,6 +76,8 @@ public class BKCluster implements AutoCloseable {
private String dataDir;
private int bkPort = 0;
+ private boolean clearOldData;
+
public BKClusterConf metadataServiceUri(String metadataServiceUri) {
this.metadataServiceUri = metadataServiceUri;
return this;
@@ -95,6 +98,11 @@ public class BKCluster implements AutoCloseable {
return this;
}
+ public BKClusterConf clearOldData(boolean clearOldData) {
+ this.clearOldData = clearOldData;
+ return this;
+ }
+
public BKCluster build() throws Exception {
return new BKCluster(this);
}
@@ -198,6 +206,10 @@ public class BKCluster implements AutoCloseable {
dataDir = createTempDir("bookie", "test");
}
+ if (clusterConf.clearOldData) {
+ cleanDirectory(dataDir);
+ }
+
int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) {
port = PortManager.nextFreePort();
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 4dc3fb3589c..6a15e1e8cc2 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -41,14 +41,14 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport {
@BeforeClass(alwaysRun = true)
@Override
- public final void setup() throws Exception {
+ public void setup() throws Exception {
incrementSetupNumber();
zks = new TestZKServer();
}
@AfterClass(alwaysRun = true)
@Override
- public final void cleanup() throws Exception {
+ public void cleanup() throws Exception {
markCurrentSetupNumberCleaned();
if (zks != null) {
zks.close();