This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f4e4fd5c965 [improve][broker] Add config `fsyncEnable` for
`RocksdbMetadataStore` (#18801)
f4e4fd5c965 is described below
commit f4e4fd5c96587b8afadd134dd595549f4b283d6f
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Dec 14 11:06:48 2022 +0800
[improve][broker] Add config `fsyncEnable` for `RocksdbMetadataStore`
(#18801)
---
conf/standalone.conf | 1 -
.../apache/pulsar/broker/ServiceConfiguration.java | 1 -
.../pulsar/metadata/api/MetadataStoreConfig.java | 6 ++++
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 17 ++++-----
.../org/apache/pulsar/metadata/CounterTest.java | 5 ++-
.../apache/pulsar/metadata/LeaderElectionTest.java | 8 ++---
.../apache/pulsar/metadata/LockManagerTest.java | 12 +++----
.../apache/pulsar/metadata/MetadataCacheTest.java | 3 +-
.../apache/pulsar/metadata/MetadataStoreTest.java | 42 ++++++++++++++--------
.../LedgerUnderreplicationManagerTest.java | 3 +-
.../bookkeeper/PulsarLedgerIdGeneratorTest.java | 4 +--
.../bookkeeper/PulsarRegistrationClientTest.java | 19 +++++-----
.../bookkeeper/PulsarRegistrationManagerTest.java | 3 +-
13 files changed, 70 insertions(+), 54 deletions(-)
diff --git a/conf/standalone.conf b/conf/standalone.conf
index c8ce7f8a232..ed883406883 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -29,7 +29,6 @@ metadataStoreUrl=
# Configuration file path for metadata store. It's supported by
RocksdbMetadataStore and EtcdMetadataStore for now
metadataStoreConfigPath=
-
# The metadata store URL for the configuration data. If empty, we fall back to
use metadataStoreUrl
configurationMetadataStoreUrl=
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 228561d8c30..a39ebff49a0 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -512,7 +512,6 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private String metadataStoreConfigPath = null;
-
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index b742d03f9db..5ddfe33c391 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -81,6 +81,12 @@ public class MetadataStoreConfig {
@Builder.Default
private final String metadataStoreName = "";
+ /**
+ * Whether we should enable fsync for local metadata store, It's supported
by RocksdbMetadataStore for now.
+ */
+ @Builder.Default
+ private final boolean fsyncEnable = true;
+
/**
* Pluggable MetadataEventSynchronizer to sync metadata events across the
* separate clusters.
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index d06d35db7ac..42f807e1c3b 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -87,8 +87,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
private final ReentrantReadWriteLock dbStateLock;
private volatile State state;
- private final WriteOptions optionSync;
- private final WriteOptions optionDontSync;
+ private final WriteOptions writeOptions;
private final ReadOptions optionCache;
private final ReadOptions optionDontCache;
private MetadataEventSynchronizer synchronizer;
@@ -235,8 +234,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
db = openDB(dataPath.toString(),
metadataStoreConfig.getConfigFilePath());
- this.optionSync = new WriteOptions().setSync(true);
- this.optionDontSync = new WriteOptions().setSync(false);
+ this.writeOptions = new
WriteOptions().setSync(metadataStoreConfig.isFsyncEnable());
this.optionCache = new ReadOptions().setFillCache(true);
this.optionDontCache = new ReadOptions().setFillCache(false);
@@ -261,7 +259,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
} else {
instanceId = 0;
}
- db.put(optionSync, INSTANCE_ID_KEY, toBytes(instanceId));
+ db.put(writeOptions, INSTANCE_ID_KEY, toBytes(instanceId));
return instanceId;
}
@@ -271,7 +269,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
if (value != null) {
generator.set(toLong(value));
} else {
- db.put(optionSync, INSTANCE_ID_KEY, toBytes(generator.get()));
+ db.put(writeOptions, INSTANCE_ID_KEY, toBytes(generator.get()));
}
return generator;
}
@@ -369,8 +367,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
state = State.CLOSED;
log.info("close.instanceId={}", instanceId);
db.close();
- optionSync.close();
- optionDontSync.close();
+ writeOptions.close();
optionCache.close();
optionDontCache.close();
super.close();
@@ -496,7 +493,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
- try (Transaction transaction = db.beginTransaction(optionSync)) {
+ try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData =
transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
@@ -535,7 +532,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
if (state == State.CLOSED) {
throw new MetadataStoreException.AlreadyClosedException("");
}
- try (Transaction transaction = db.beginTransaction(optionSync)) {
+ try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData =
transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
index e642bb4fea7..ead80a02873 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
-
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,7 +44,7 @@ public class CounterTest extends BaseMetadataStoreTest {
public void basicTest(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
CoordinationService cs1 = new CoordinationServiceImpl(store);
@@ -102,7 +101,7 @@ public class CounterTest extends BaseMetadataStoreTest {
public void testGetNextCounterRetry(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataStoreExtended spy = spy(store);
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
index c187d20ed2d..632c0c0fedf 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java
@@ -43,7 +43,7 @@ public class LeaderElectionTest extends BaseMetadataStoreTest
{
public void basicTest(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
CoordinationService coordinationService = new
CoordinationServiceImpl(store);
@@ -133,7 +133,7 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
public void leaderNodeIsDeletedExternally(String provider,
Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
CoordinationService coordinationService = new
CoordinationServiceImpl(store);
@@ -161,7 +161,7 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
public void closeAll(String provider, Supplier<String> urlSupplier) throws
Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
CoordinationService cs = new CoordinationServiceImpl(store);
@@ -191,7 +191,7 @@ public class LeaderElectionTest extends
BaseMetadataStoreTest {
public void revalidateLeaderWithinSameSession(String provider,
Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String path = newKey();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index be0b0448e9f..250e9f02dd5 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -53,7 +53,7 @@ public class LockManagerTest extends BaseMetadataStoreTest {
public void acquireLocks(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
CoordinationService coordinationService = new
CoordinationServiceImpl(store);
@@ -104,7 +104,7 @@ public class LockManagerTest extends BaseMetadataStoreTest {
public void cleanupOnClose(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
CoordinationService coordinationService = new
CoordinationServiceImpl(store);
@@ -135,7 +135,7 @@ public class LockManagerTest extends BaseMetadataStoreTest {
public void updateValue(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
@@ -159,7 +159,7 @@ public class LockManagerTest extends BaseMetadataStoreTest {
public void updateValueWhenVersionIsOutOfSync(String provider,
Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
@@ -187,7 +187,7 @@ public class LockManagerTest extends BaseMetadataStoreTest {
public void updateValueWhenKeyDisappears(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
@@ -213,7 +213,7 @@ public class LockManagerTest extends BaseMetadataStoreTest {
public void revalidateLockWithinSameSession(String provider,
Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
- MetadataStoreConfig.builder().build());
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
CoordinationService cs2 = new CoordinationServiceImpl(store);
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index b3a670dc43d..e7ccd355712 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -245,7 +245,8 @@ public class MetadataCacheTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void insertionDeletion(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
String key1 = newKey();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index ece23583c4d..b8a383da58d 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -60,7 +60,8 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void emptyStoreTest(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
assertFalse(store.exists("/non-existing-key").join());
assertFalse(store.exists("/non-existing-key/child").join());
@@ -89,7 +90,8 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void concurrentPutTest(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String data = "data";
String path = "/non-existing-key";
@@ -109,7 +111,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void insertionTestWithExpectedVersion(String provider,
Supplier<String> urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String key1 = newKey();
@@ -167,7 +170,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void getChildrenTest(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String key = newKey();
int n = 10;
@@ -218,7 +222,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void deletionTest(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String key = newKey();
int n = 10;
@@ -252,7 +257,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void emptyKeyTest(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
try {
store.delete("", Optional.empty()).join();
@@ -293,7 +299,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void notificationListeners(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
BlockingQueue<Notification> notifications = new
LinkedBlockingDeque<>();
store.registerListener(n -> {
@@ -359,7 +366,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testDeleteRecursive(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String prefix = newKey();
@@ -384,7 +392,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testDeleteUnusedDirectories(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String prefix = newKey();
@@ -413,7 +422,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier)
throws Exception {
String metadataUrl = urlSupplier.get();
- MetadataStore store = MetadataStoreFactory.create(metadataUrl,
MetadataStoreConfig.builder().build());
+ MetadataStore store =
+ MetadataStoreFactory.create(metadataUrl,
MetadataStoreConfig.builder().fsyncEnable(false).build());
byte[] data = "testPersistent".getBytes(StandardCharsets.UTF_8);
String key = newKey() + "/a/b/c";
@@ -429,7 +439,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testConcurrentPutGetOneKey(String provider, Supplier<String>
urlSupplier) throws Exception {
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
byte[] data = new byte[]{0};
String path = newKey();
int maxValue = 100;
@@ -474,7 +485,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testConcurrentPut(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String k = newKey();
CompletableFuture<Void> f1 =
@@ -489,7 +501,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testConcurrentDelete(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String k = newKey();
store.put(k, new byte[0], Optional.of(-1L)).join();
@@ -505,7 +518,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testGetChildren(String provider, Supplier<String> urlSupplier)
throws Exception {
@Cleanup
- MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
store.put("/a/a-1", "value1".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
store.put("/a/a-2", "value1".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 0dd40c47794..0df325b3c57 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -93,7 +93,8 @@ public class LedgerUnderreplicationManagerTest extends
BaseMetadataStoreTest {
private void methodSetup(Supplier<String> urlSupplier) throws Exception {
this.executor = Executors.newSingleThreadExecutor();
String ledgersRoot = "/ledgers-" + UUID.randomUUID();
- this.store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ this.store = MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
this.layoutManager = new PulsarLayoutManager(store, ledgersRoot);
this.lmf = new PulsarLedgerManagerFactory();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
index 09bb85b140c..73d5f451c1f 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGeneratorTest.java
@@ -49,8 +49,8 @@ public class PulsarLedgerIdGeneratorTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testGenerateLedgerId(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStoreExtended store =
- MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+
MetadataStoreConfig.builder().fsyncEnable(false).build());
@Cleanup
PulsarLedgerIdGenerator ledgerIdGenerator = new
PulsarLedgerIdGenerator(store, "/ledgers");
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index d2a21a2249d..f599451c007 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -19,12 +19,11 @@
package org.apache.pulsar.metadata.bookkeeper;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -73,8 +72,8 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testGetWritableBookies(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStoreExtended store =
- MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
@@ -100,8 +99,8 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testGetReadonlyBookies(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStoreExtended store =
- MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
@@ -126,8 +125,8 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void testGetBookieServiceInfo(String provider, Supplier<String>
urlSupplier) throws Exception {
@Cleanup
- MetadataStoreExtended store =
- MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+
MetadataStoreConfig.builder().fsyncEnable(false).build());
String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
@@ -318,8 +317,8 @@ public class PulsarRegistrationClientTest extends
BaseMetadataStoreTest {
throws Exception {
@Cleanup
- MetadataStoreExtended store =
- MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+
MetadataStoreConfig.builder().fsyncEnable(false).build());
String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManagerTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManagerTest.java
index 1a677914f8f..a61a66bf2c9 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManagerTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationManagerTest.java
@@ -51,7 +51,8 @@ public class PulsarRegistrationManagerTest extends
BaseMetadataStoreTest {
private void methodSetup(Supplier<String> urlSupplier) throws Exception {
this.ledgersRootPath = "/ledgers-" + UUID.randomUUID();
- this.store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
+ this.store = MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
this.registrationManager = new PulsarRegistrationManager(store,
ledgersRootPath, new ServerConfiguration());
}