This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new eabc2cd9593 [Improve][broker] Support clear old bookie data for
BKCluster (#16744)
eabc2cd9593 is described below
commit eabc2cd9593d9307d7e622fd99a5e4aa7a69e245
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Jul 25 16:41:20 2022 +0800
[Improve][broker] Support clear old bookie data for BKCluster (#16744)
(cherry picked from commit eb5725a2a2de92b42f0efcfeb38c506ba7882401)
---
.../pulsar/broker/EmbeddedPulsarCluster.java | 5 +++--
.../apache/pulsar/broker/EndToEndMetadataTest.java | 23 ++++++++++++++++++++++
.../pulsar/metadata/bookkeeper/BKCluster.java | 11 ++++++++---
.../pulsar/metadata/BaseMetadataStoreTest.java | 4 ++--
.../pulsar/metadata/bookkeeper/EndToEndTest.java | 4 ++--
5 files changed, 38 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
index 95b0477ee11..b9a017504e0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
@@ -52,11 +52,12 @@ public class EmbeddedPulsarCluster implements AutoCloseable
{
private final PulsarAdmin admin;
@Builder
- private EmbeddedPulsarCluster(int numBrokers, int numBookies, String
metadataStoreUrl) throws Exception {
+ private EmbeddedPulsarCluster(int numBrokers, int numBookies, String
metadataStoreUrl,
+ boolean clearOldData) throws Exception {
this.numBrokers = numBrokers;
this.numBookies = numBookies;
this.metadataStoreUrl = metadataStoreUrl;
- this.bkCluster = new BKCluster(metadataStoreUrl, numBookies);
+ this.bkCluster = new BKCluster(metadataStoreUrl, numBookies,
clearOldData);
for (int i = 0; i < numBrokers; i++) {
PulsarService s = new PulsarService(getConf());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
index 11097b53f28..0eb573a6f50 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EndToEndMetadataTest.java
@@ -19,27 +19,50 @@
package org.apache.pulsar.broker;
import static org.testng.Assert.assertEquals;
+import java.io.File;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Slf4j
public class EndToEndMetadataTest extends BaseMetadataStoreTest {
+ private File tempDir;
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ tempDir = IOUtils.createTempDir("bookies", "test");
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.cleanup();
+ FileUtils.deleteDirectory(tempDir);
+ }
+
@Test(dataProvider = "impl")
public void testPublishConsume(String provider, Supplier<String>
urlSupplier) throws Exception {
+
@Cleanup
EmbeddedPulsarCluster epc = EmbeddedPulsarCluster.builder()
.numBrokers(1)
.numBookies(1)
.metadataStoreUrl(urlSupplier.get())
+ .clearOldData(true)
.build();
@Cleanup
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 6cc96dbee77..3aa49a24979 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.bookkeeper;
+import static org.apache.commons.io.FileUtils.cleanDirectory;
import java.io.File;
import java.io.IOException;
import java.net.NetworkInterface;
@@ -67,9 +68,10 @@ public class BKCluster implements AutoCloseable {
protected final ServerConfiguration baseConf =
newBaseServerConfiguration();
protected final ClientConfiguration baseClientConf =
newBaseClientConfiguration();
+ private final boolean clearOldData;
-
- public BKCluster(String metadataServiceUri, int numBookies) throws
Exception {
+ public BKCluster(String metadataServiceUri, int numBookies, boolean
clearOldData) throws Exception {
+ this.clearOldData = clearOldData;
this.metadataServiceUri = metadataServiceUri;
this.store = MetadataStoreExtended.create(metadataServiceUri,
MetadataStoreConfig.builder().build());
baseConf.setJournalRemovePagesFromCache(false);
@@ -87,7 +89,6 @@ public class BKCluster implements AutoCloseable {
@Override
public void close() throws Exception {
- boolean failed = false;
// stop bookkeeper service
try {
stopBKCluster();
@@ -161,6 +162,10 @@ public class BKCluster implements AutoCloseable {
private ServerConfiguration newServerConfiguration() throws Exception {
File f = createTempDir("bookie", "test");
+ if (clearOldData) {
+ cleanDirectory(f);
+ }
+
int port;
if (baseConf.isEnableLocalTransport() ||
!baseConf.getAllowEphemeralPorts()) {
port = PortManager.nextFreePort();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 4dc3fb3589c..6a15e1e8cc2 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -41,14 +41,14 @@ public abstract class BaseMetadataStoreTest extends
TestRetrySupport {
@BeforeClass(alwaysRun = true)
@Override
- public final void setup() throws Exception {
+ public void setup() throws Exception {
incrementSetupNumber();
zks = new TestZKServer();
}
@AfterClass(alwaysRun = true)
@Override
- public final void cleanup() throws Exception {
+ public void cleanup() throws Exception {
markCurrentSetupNumberCleaned();
if (zks != null) {
zks.close();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/EndToEndTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/EndToEndTest.java
index 442def9b42d..36a100195c2 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/EndToEndTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/EndToEndTest.java
@@ -41,7 +41,7 @@ public class EndToEndTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testBasic(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
- BKCluster bktc = new BKCluster(urlSupplier.get(), 1);
+ BKCluster bktc = new BKCluster(urlSupplier.get(), 1, true);
@Cleanup
BookKeeper bkc = bktc.newClient();
@@ -85,7 +85,7 @@ public class EndToEndTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testWithLedgerRecovery(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- BKCluster bktc = new BKCluster(urlSupplier.get(), 3);
+ BKCluster bktc = new BKCluster(urlSupplier.get(), 3, true);
@Cleanup
BookKeeper bkc = bktc.newClient();