This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eb5725a2a2d [Improve][broker] Support clear old bookie data for 
BKCluster (#16744)
eb5725a2a2d is described below

commit eb5725a2a2de92b42f0efcfeb38c506ba7882401
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Jul 25 16:41:20 2022 +0800

    [Improve][broker] Support clear old bookie data for BKCluster (#16744)
---
 .../java/org/apache/pulsar/PulsarStandalone.java   |  1 +
 .../pulsar/broker/EmbeddedPulsarCluster.java       |  6 +++++-
 .../apache/pulsar/broker/EndToEndMetadataTest.java | 24 ++++++++++++++++++++++
 .../pulsar/metadata/bookkeeper/BKCluster.java      | 12 +++++++++++
 .../pulsar/metadata/BaseMetadataStoreTest.java     |  4 ++--
 5 files changed, 44 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index b8c175c0a4d..8d19971880f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -443,6 +443,7 @@ public class PulsarStandalone implements AutoCloseable {
                 .bkPort(bkPort)
                 .numBookies(numOfBk)
                 .dataDir(bkDir)
+                .clearOldData(wipeData)
                 .build();
         config.setBookkeeperNumberOfChannelsPerBookie(1);
         config.setMetadataStoreUrl(metadataStoreUrl);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
index dc0a029ec8b..4577ee74ae3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+import org.junit.platform.commons.util.StringUtils;
 
 
 public class EmbeddedPulsarCluster implements AutoCloseable {
@@ -52,13 +53,16 @@ public class EmbeddedPulsarCluster implements AutoCloseable 
{
     private final PulsarAdmin admin;
 
     @Builder
-    private EmbeddedPulsarCluster(int numBrokers, int numBookies, String 
metadataStoreUrl) throws Exception {
+    private EmbeddedPulsarCluster(int numBrokers, int numBookies, String 
metadataStoreUrl,
+                                  String dataDir, boolean clearOldData) throws 
Exception {
         this.numBrokers = numBrokers;
         this.numBookies = numBookies;
         this.metadataStoreUrl = metadataStoreUrl;
         this.bkCluster = BKCluster.builder()
                 .metadataServiceUri(metadataStoreUrl)
                 .numBookies(numBookies)
+                .dataDir(StringUtils.isNotBlank(dataDir) ? dataDir : null)
+                .clearOldData(clearOldData)
                 .build();
 
         for (int i = 0; i < numBrokers; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
index 11097b53f28..3ea3aa3d9fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
@@ -19,27 +19,51 @@
 package org.apache.pulsar.broker;
 
 import static org.testng.Assert.assertEquals;
+import java.io.File;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Slf4j
 public class EndToEndMetadataTest extends BaseMetadataStoreTest {
 
+    private File tempDir;
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        tempDir = IOUtils.createTempDir("bookies", "test");
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.cleanup();
+        FileUtils.deleteDirectory(tempDir);
+    }
+
     @Test(dataProvider = "impl")
     public void testPublishConsume(String provider, Supplier<String> 
urlSupplier) throws Exception {
+
         @Cleanup
         EmbeddedPulsarCluster epc = EmbeddedPulsarCluster.builder()
                 .numBrokers(1)
                 .numBookies(1)
                 .metadataStoreUrl(urlSupplier.get())
+                .dataDir(tempDir.getAbsolutePath())
+                .clearOldData(true)
                 .build();
 
         @Cleanup
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 96cb3cf2daa..e902b34e881 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata.bookkeeper;
 
+import static org.apache.commons.io.FileUtils.cleanDirectory;
 import java.io.File;
 import java.io.IOException;
 import java.net.NetworkInterface;
@@ -75,6 +76,8 @@ public class BKCluster implements AutoCloseable {
         private String dataDir;
         private int bkPort = 0;
 
+        private boolean clearOldData;
+
         public BKClusterConf metadataServiceUri(String metadataServiceUri) {
             this.metadataServiceUri = metadataServiceUri;
             return this;
@@ -95,6 +98,11 @@ public class BKCluster implements AutoCloseable {
             return this;
         }
 
+        public BKClusterConf clearOldData(boolean clearOldData) {
+            this.clearOldData = clearOldData;
+            return this;
+        }
+
         public BKCluster build() throws Exception {
             return new BKCluster(this);
         }
@@ -198,6 +206,10 @@ public class BKCluster implements AutoCloseable {
             dataDir = createTempDir("bookie", "test");
         }
 
+        if (clusterConf.clearOldData) {
+            cleanDirectory(dataDir);
+        }
+
         int port;
         if (baseConf.isEnableLocalTransport() || 
!baseConf.getAllowEphemeralPorts() || clusterConf.bkPort == 0) {
             port = PortManager.nextFreePort();
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 4dc3fb3589c..6a15e1e8cc2 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -41,14 +41,14 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
 
     @BeforeClass(alwaysRun = true)
     @Override
-    public final void setup() throws Exception {
+    public void setup() throws Exception {
         incrementSetupNumber();
         zks = new TestZKServer();
     }
 
     @AfterClass(alwaysRun = true)
     @Override
-    public final void cleanup() throws Exception {
+    public void cleanup() throws Exception {
         markCurrentSetupNumberCleaned();
         if (zks != null) {
             zks.close();

Reply via email to