This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c5f7271499e [improve][meta] PIP-453: Improve the metadata store
threading model (#25187)
c5f7271499e is described below
commit c5f7271499e8879353139b9b1ee021a8dda111ad
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Feb 4 20:01:35 2026 +0800
[improve][meta] PIP-453: Improve the metadata store threading model (#25187)
(cherry picked from commit 6d415c6dbfb8bf9f2f8a5b823927cf422a6bd675)
---
conf/broker.conf | 2 +
conf/standalone.conf | 3 +
pip/pip-453.md | 26 +--
.../IsolatedBookieEnsemblePlacementPolicy.java | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 34 ++--
.../org/apache/pulsar/broker/PulsarService.java | 2 +
.../apache/pulsar/broker/PulsarServiceTest.java | 65 ++++++++
.../stats/OpenTelemetryMetadataStoreStatsTest.java | 12 --
.../pulsar/metadata/api/MetadataStoreConfig.java | 3 +
.../metadata/cache/impl/MetadataCacheImpl.java | 185 ++++++++++++---------
.../metadata/impl/AbstractMetadataStore.java | 77 +++++----
.../pulsar/metadata/impl/EtcdMetadataStore.java | 19 +--
.../metadata/impl/LocalMemoryMetadataStore.java | 2 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 19 +--
.../batching/AbstractBatchedMetadataStore.java | 39 +++--
.../metadata/impl/oxia/OxiaMetadataStore.java | 4 +-
.../impl/stats/BatchMetadataStoreStats.java | 43 +----
.../impl/MetadataStoreFactoryImplTest.java | 2 +-
20 files changed, 316 insertions(+), 232 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 0ec722348a8..25e5bcc15cc 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -951,6 +951,8 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128
+# The number of threads used for serializing and deserializing data to and
from the metadata store
+metadataStoreSerDesThreads=1
### --- Authentication --- ###
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2d0c004fbb7..091cbf5661f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -430,6 +430,9 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128
+# The number of threads used for serializing and deserializing data to and
from the metadata store
+metadataStoreSerDesThreads=1
+
### --- TLS --- ###
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false
diff --git a/pip/pip-453.md b/pip/pip-453.md
index a42736b9dda..f9109798ba7 100644
--- a/pip/pip-453.md
+++ b/pip/pip-453.md
@@ -40,8 +40,9 @@ Additionally, some code paths execute the compute intensive
tasks in the metadat
# High Level Design
-Create 3 set of threads:
+Create 4 sets of threads:
- `<name>-event`: the original metadata store thread, which is now only
responsible for handling notifications. This executor won't be a
`ScheduledExecutorService` anymore.
+- `<name>-scheduler`: a single thread, which is used to schedule tasks like
flushing and retrying failed operations.
- `<name>-batch-flusher`: a single thread, which is used to schedule the
flushing task at a fixed rate. It won't be created if
`metadataStoreBatchingEnabled` is false.
- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances
to execute compute intensive tasks like serialization and deserialization. The
same path will be handled by the same thread to keep the processing order on
the same path.
@@ -53,25 +54,6 @@ The only concern is that introducing a new thread to execute
callbacks allows wa
metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
```
-Other tasks like the retry on failure is executed in JVM's common
`ForkJoinPool` by `CompletableFuture` APIs. For example:
-
-```diff
----
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
-+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
-@@ -245,9 +245,8 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
- countsByType, totalSize, opsForLog);
-
- // Retry with the individual operations
-- executor.schedule(() -> {
-- ops.forEach(o ->
batchOperation(Collections.singletonList(o)));
-- }, 100, TimeUnit.MILLISECONDS);
-+ CompletableFuture.delayedExecutor(100,
TimeUnit.MILLISECONDS).execute(() ->
-+ ops.forEach(o ->
batchOperation(Collections.singletonList(o))));
- } else {
- MetadataStoreException e = getException(code, path);
- ops.forEach(o ->
o.getFuture().completeExceptionally(e));
-```
-
# Detailed Design
## Public-facing Changes
@@ -85,9 +67,11 @@ Add a configurations to specify the number of worker threads
for `MetadataCache`
category = CATEGORY_SERVER,
doc = "The number of threads used for serializing and
deserializing data to and from the metadata store"
)
- private int metadataStoreSerDesThreads =
Runtime.getRuntime().availableProcessors();
+ private int metadataStoreSerDesThreads = 1;
```
+Use 1 as the default value since the serialization and deserialization tasks
are not frequent. This separate thread pool is mainly added to avoid blocking
the metadata store callback thread.
+
### Metrics
The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed
because the `<name>-batch-flusher` thread won't execute other tasks except for
flushing.
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 878bbc4d654..4ef1c594be4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -57,6 +58,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
// the secondary group.
private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
+ @Getter
+ @VisibleForTesting
private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d04d8d9be26..1345f180b5c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -484,6 +484,12 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean metadataStoreAllowReadOnlyOperations;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "The number of threads used for serializing and
deserializing data to and from the metadata store"
+ )
+ private int metadataStoreSerDesThreads = 1;
+
@Deprecated
@FieldContext(
category = CATEGORY_SERVER,
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 68f92ab416d..936b04386ff 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -22,12 +22,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -288,8 +291,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
secondaryBookieGroup.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
bookieMapping.put("group2", secondaryBookieGroup);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
- Optional.empty()).join();
+ updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
null).getResult();
@@ -340,8 +342,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
+ "\": {\"rack\": \"rack0\", \"hostname\":
\"bookie3.example.com\"}, \"" + BOOKIE4
+ "\": {\"rack\": \"rack2\", \"hostname\":
\"bookie4.example.com\"}}}";
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(StandardCharsets.UTF_8),
- Optional.empty()).join();
+ updateBookieInfo(isolationPolicy,
data.getBytes(StandardCharsets.UTF_8));
List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(),
new HashSet<>()).getResult();
@@ -399,8 +400,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
- Optional.empty()).join();
+ updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
new HashSet<>()).getResult();
@@ -784,8 +784,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup2, group2);
bookieMapping.put(isolationGroup3, group3);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
- Optional.empty()).join();
+ updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
groups.setRight(Sets.newHashSet(""));
@@ -808,8 +807,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
- Optional.empty()).join();
+ updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -831,12 +829,24 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);
- store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
- Optional.empty()).join();
+ updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
assertTrue(blacklist.isEmpty());
}
+
+ // The policy gets the bookie info asynchronously before each query or
update. When putting the bookie info into
+ // the metadata store, the cache needs some time to receive the
notification and update accordingly.
+ private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy
isolationPolicy, byte[] bookieInfo) {
+ final var cache = isolationPolicy.getBookieMappingCache();
+ assertNotNull(cache); // the policy must have been initialized
+
+ final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
+ final var previousBookieInfo = cache.getIfCached(key);
+ store.put(key, bookieInfo, Optional.empty()).join();
+ Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
+ assertNotEquals(cache.getIfCached(key), previousBookieInfo));
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 56949873621..d4b69f8d5fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -429,6 +429,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.synchronizer(synchronizer)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
+
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}
@@ -1299,6 +1300,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
+
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 6c04889d8f1..6195e9cdae5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -25,10 +25,15 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -38,6 +43,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataSerde;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Stat;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -339,4 +348,60 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
assertTrue(e instanceof PulsarClientException.TimeoutException);
}
}
+
+ @Test
+ public void testMetadataSerDesThreads() throws Exception {
+ final var numSerDesThreads = 5;
+ final var config = new ServiceConfiguration();
+ config.setMetadataStoreSerDesThreads(numSerDesThreads);
+ config.setClusterName("test");
+ config.setMetadataStoreUrl("memory:local");
+ config.setConfigurationMetadataStoreUrl("memory:local");
+
+ @Cleanup final var pulsar = new PulsarService(config);
+ pulsar.start();
+
+ BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
+ final var serDes = new CustomMetadataSerDes();
+ final var cache = store.getMetadataCache(prefix, serDes,
MetadataCacheConfig.builder().build());
+ for (int i = 0; i < 100 &&
serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
+ cache.create(prefix + i, "value-" + i).join();
+ final var value = cache.get(prefix + i).join();
+ assertEquals(value.orElseThrow(), "value-" + i);
+ final var newValue = cache.readModifyUpdate(prefix + i, s -> s
+ "-updated").join();
+ assertEquals(newValue, "value-" + i + "-updated");
+ // Verify the serialization and deserialization are handled by
the same thread
+ assertEquals(serDes.threadNameToSerializedPaths,
serDes.threadNameToDeserializedPaths);
+ }
+ log.info("SerDes thread mapping: {}",
serDes.threadNameToSerializedPaths);
+ assertEquals(serDes.threadNameToSerializedPaths.keySet().size(),
numSerDesThreads);
+ // Verify a path cannot be handled by multiple threads
+ final var paths =
serDes.threadNameToSerializedPaths.values().stream()
+ .flatMap(Set::stream).sorted().toList();
+ assertEquals(paths.stream().distinct().toList(), paths);
+ };
+
+ verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
+ verifier.accept(pulsar.getConfigurationMetadataStore(),
"/test-config/");
+ }
+
+ private static class CustomMetadataSerDes implements MetadataSerde<String>
{
+
+ final Map<String, Set<String>> threadNameToSerializedPaths = new
ConcurrentHashMap<>();
+ final Map<String, Set<String>> threadNameToDeserializedPaths = new
ConcurrentHashMap<>();
+
+ @Override
+ public byte[] serialize(String path, String value) throws IOException{
+
threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
+ __ -> ConcurrentHashMap.newKeySet()).add(path);
+ return value.getBytes();
+ }
+
+ @Override
+ public String deserialize(String path, byte[] data, Stat stat) throws
IOException {
+
threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
+ __ -> ConcurrentHashMap.newKeySet()).add(path);
+ return new String(data);
+ }
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
index 9e8bde20b88..390aa1e49e2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.stats;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
-import java.util.concurrent.ExecutorService;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -29,7 +28,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -53,14 +51,6 @@ public class OpenTelemetryMetadataStoreStatsTest extends
BrokerTestBase {
var newStats = new MetadataStoreStats(
localMetadataStoreName,
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "metadataStoreStats",
newStats, true);
-
- var currentBatchedStats = (BatchMetadataStoreStats)
FieldUtils.readField(localMetadataStore,
- "batchMetadataStoreStats", true);
- currentBatchedStats.close();
- var currentExecutor = (ExecutorService)
FieldUtils.readField(currentBatchedStats, "executor", true);
- var newBatchedStats = new
BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
-
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
- FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats",
newBatchedStats, true);
}
@AfterMethod(alwaysRun = true)
@@ -89,7 +79,5 @@ public class OpenTelemetryMetadataStoreStatsTest extends
BrokerTestBase {
var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics,
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
attributes, value -> assertThat(value).isPositive());
- assertMetricLongSumValue(metrics,
BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
- value -> assertThat(value).isPositive());
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index ef50dc87691..fcde0dce840 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -104,4 +104,7 @@ public class MetadataStoreConfig {
* The estimator to estimate the payload length of metadata node, which
used to limit the batch size requested.
*/
private MetadataNodeSizeStats nodeSizeStats;
+
+ @Builder.Default
+ private final int numSerDesThreads = 1;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index b1f0572547c..ca165f0464e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,10 +39,10 @@ import java.util.function.Supplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.metadata.api.CacheGetResult;
-import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -62,23 +62,26 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
private final MetadataStore store;
private final MetadataStoreExtended storeExtended;
private final MetadataSerde<T> serde;
- private final ScheduledExecutorService executor;
+ private final OrderedExecutor executor;
+ private final ScheduledExecutorService schedulerExecutor;
private final MetadataCacheConfig<T> cacheConfig;
private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>>
objCache;
public MetadataCacheImpl(String cacheName, MetadataStore store,
TypeReference<T> typeRef,
- MetadataCacheConfig<T> cacheConfig,
ScheduledExecutorService executor) {
- this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef),
cacheConfig, executor);
+ MetadataCacheConfig<T> cacheConfig,
OrderedExecutor executor,
+ ScheduledExecutorService schedulerExecutor) {
+ this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef),
cacheConfig, executor, schedulerExecutor);
}
public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType
type, MetadataCacheConfig<T> cacheConfig,
- ScheduledExecutorService executor) {
- this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type),
cacheConfig, executor);
+ OrderedExecutor executor,
ScheduledExecutorService schedulerExecutor) {
+ this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type),
cacheConfig, executor, schedulerExecutor);
}
public MetadataCacheImpl(String cacheName, MetadataStore store,
MetadataSerde<T> serde,
- MetadataCacheConfig<T> cacheConfig,
ScheduledExecutorService executor) {
+ MetadataCacheConfig<T> cacheConfig,
OrderedExecutor executor,
+ ScheduledExecutorService schedulerExecutor) {
this.store = store;
if (store instanceof MetadataStoreExtended) {
this.storeExtended = (MetadataStoreExtended) store;
@@ -88,6 +91,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
this.serde = serde;
this.cacheConfig = cacheConfig;
this.executor = executor;
+ this.schedulerExecutor = schedulerExecutor;
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -101,6 +105,9 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
.buildAsync(new AsyncCacheLoader<String,
Optional<CacheGetResult<T>>>() {
@Override
public CompletableFuture<Optional<CacheGetResult<T>>>
asyncLoad(String key, Executor executor) {
+ if (log.isDebugEnabled()) {
+ log.debug("Loading key {} into metadata cache {}",
key, cacheName);
+ }
return readValueFromStore(key);
}
@@ -110,12 +117,16 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
Optional<CacheGetResult<T>> oldValue,
Executor executor) {
if (store instanceof AbstractMetadataStore &&
((AbstractMetadataStore) store).isConnected()) {
- return readValueFromStore(key).thenApply(val -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Reloading key {} into metadata
cache {}", key, cacheName);
+ }
+ final var future = readValueFromStore(key);
+ future.thenAccept(val -> {
if (cacheConfig.getAsyncReloadConsumer() !=
null) {
cacheConfig.getAsyncReloadConsumer().accept(key, val);
}
- return val;
});
+ return future;
} else {
// Do not try to refresh the cache item if we know
that we're not connected to the
// metadata store
@@ -128,22 +139,46 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
private CompletableFuture<Optional<CacheGetResult<T>>>
readValueFromStore(String path) {
- return store.get(path)
- .thenCompose(optRes -> {
- if (!optRes.isPresent()) {
- return FutureUtils.value(Optional.empty());
- }
-
- try {
- GetResult res = optRes.get();
- T obj = serde.deserialize(path, res.getValue(),
res.getStat());
- return FutureUtils
- .value(Optional.of(new CacheGetResult<>(obj,
res.getStat())));
- } catch (Throwable t) {
- return FutureUtils.exception(new
ContentDeserializationException(
- "Failed to deserialize payload for key '" +
path + "'", t));
- }
- });
+ final var future = new
CompletableFuture<Optional<CacheGetResult<T>>>();
+ store.get(path).thenComposeAsync(optRes -> {
+ // There could be multiple pending reads for the same path, for
example, when a path is created,
+ // 1. The `accept` method will call `refresh`
+ // 2. The `put` method will call `refresh` after the metadata
store put operation is done
+ // Both will call this method and the same result will be read. In
this case, we only need to deserialize
+ // the value once.
+ if (!optRes.isPresent()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Key {} not found in metadata store", path);
+ }
+ return FutureUtils.value(Optional.<CacheGetResult<T>>empty());
+ }
+ final var res = optRes.get();
+ final var cachedFuture = objCache.getIfPresent(path);
+ if (cachedFuture != null && cachedFuture != future) {
+ if (log.isDebugEnabled()) {
+ log.debug("A new read on key {} is in progress or
completed, ignore this one", path);
+ }
+ return cachedFuture;
+ }
+ try {
+ T obj = serde.deserialize(path, res.getValue(), res.getStat());
+ if (log.isDebugEnabled()) {
+ log.debug("Deserialized value for key {} (version: {}):
{}", path, res.getStat().getVersion(),
+ obj);
+ }
+ return FutureUtils.value(Optional.of(new CacheGetResult<>(obj,
res.getStat())));
+ } catch (Throwable t) {
+ return FutureUtils.exception(new
ContentDeserializationException(
+ "Failed to deserialize payload for key '" + path + "'",
t));
+ }
+ }, executor.chooseThread(path)).whenComplete((result, e) -> {
+ if (e != null) {
+ future.completeExceptionally(e.getCause());
+ } else {
+ future.complete(result);
+ }
+ });
+ return future;
}
@Override
@@ -169,8 +204,9 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
@Override
public CompletableFuture<T> readModifyUpdateOrCreate(String path,
Function<Optional<T>, T> modifyFunction) {
+ final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
- .thenCompose(optEntry -> {
+ .thenComposeAsync(optEntry -> {
Optional<T> currentValue;
long expectedVersion;
@@ -202,13 +238,14 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
refresh(path);
}).thenApply(__ -> newValueObj);
- }), path);
+ }, executor), path);
}
@Override
public CompletableFuture<T> readModifyUpdate(String path, Function<T, T>
modifyFunction) {
+ final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
- .thenCompose(optEntry -> {
+ .thenComposeAsync(optEntry -> {
if (!optEntry.isPresent()) {
return FutureUtils.exception(new
NotFoundException(""));
}
@@ -231,59 +268,57 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
refresh(path);
}).thenApply(__ -> newValueObj);
- }), path);
+ }, executor), path);
+ }
+
+ private CompletableFuture<byte[]> serialize(String path, T value) {
+ final var future = new CompletableFuture<byte[]>();
+ executor.executeOrdered(path, () -> {
+ try {
+ future.complete(serde.serialize(path, value));
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ });
+ return future;
}
@Override
public CompletableFuture<Void> create(String path, T value) {
- byte[] content;
- try {
- content = serde.serialize(path, value);
- } catch (Throwable t) {
- return FutureUtils.exception(t);
- }
-
- CompletableFuture<Void> future = new CompletableFuture<>();
- store.put(path, content, Optional.of(-1L))
- .thenAccept(stat -> {
- // Make sure we have the value cached before the operation
is completed
- // In addition to caching the value, we need to add a
watch on the path,
- // so when/if it changes on any other node, we are
notified and we can
- // update the cache
- objCache.get(path).whenComplete((stat2, ex) -> {
- if (ex == null) {
- future.complete(null);
- } else {
- log.error("Exception while getting path {}", path,
ex);
- future.completeExceptionally(ex.getCause());
- }
- });
- }).exceptionally(ex -> {
- if (ex.getCause() instanceof BadVersionException) {
- // Use already exists exception to provide more
self-explanatory error message
- future.completeExceptionally(new
AlreadyExistsException(ex.getCause()));
- } else {
- future.completeExceptionally(ex.getCause());
- }
- return null;
- });
-
+ final var future = new CompletableFuture<Void>();
+ serialize(path, value).thenCompose(content -> store.put(path, content,
Optional.of(-1L)))
+ // Make sure we have the value cached before the operation is
completed
+ // In addition to caching the value, we need to add a watch on the
path,
+ // so when/if it changes on any other node, we are notified and we
can
+ // update the cache
+ .thenCompose(__ -> objCache.get(path))
+ .whenComplete((__, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else if (ex.getCause() instanceof BadVersionException) {
+ // Use already exists exception to provide more
self-explanatory error message
+ future.completeExceptionally(new
AlreadyExistsException(ex.getCause()));
+ } else {
+ future.completeExceptionally(ex.getCause());
+ }
+ });
return future;
}
@Override
public CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
- final byte[] bytes;
- try {
- bytes = serde.serialize(path, value);
- } catch (IOException e) {
- return CompletableFuture.failedFuture(e);
- }
- if (storeExtended != null) {
- return storeExtended.put(path, bytes, Optional.empty(),
options).thenAccept(__ -> refresh(path));
- } else {
- return store.put(path, bytes, Optional.empty()).thenAccept(__ ->
refresh(path));
- }
+ return serialize(path, value).thenCompose(bytes -> {
+ if (storeExtended != null) {
+ return storeExtended.put(path, bytes, Optional.empty(),
options);
+ } else {
+ return store.put(path, bytes, Optional.empty());
+ }
+ }).thenAccept(__ -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Refreshing path {} after put operation", path);
+ }
+ refresh(path);
+ });
}
@Override
@@ -323,6 +358,9 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
switch (t.getType()) {
case Created:
case Modified:
+ if (log.isDebugEnabled()) {
+ log.debug("Refreshing path {} for {} notification", path,
t.getType());
+ }
refresh(path);
break;
@@ -354,8 +392,7 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
final var next = backoff.next();
log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key,
next, backoff.isMandatoryStopMade(), elapsed);
- executor.schedule(() -> execute(op, key, result, backoff),
next,
- TimeUnit.MILLISECONDS);
+ schedulerExecutor.schedule(() -> execute(op, key, result,
backoff), next, TimeUnit.MILLISECONDS);
return null;
}
result.completeExceptionally(ex.getCause());
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index b0e4b43f700..d118a792e2f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
@@ -38,16 +39,17 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
import org.apache.pulsar.common.util.FutureUtil;
@@ -76,7 +78,9 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new
CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>>
sessionListeners = new CopyOnWriteArrayList<>();
protected final String metadataStoreName;
- protected final ScheduledExecutorService executor;
+ private final OrderedExecutor serDesExecutor;
+ private final ExecutorService eventExecutor;
+ private final ScheduledExecutorService schedulerExecutor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches =
new CopyOnWriteArrayList<>();
@@ -93,13 +97,21 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected MetadataNodeSizeStats nodeSizeStats;
- protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry
openTelemetry,
- MetadataNodeSizeStats nodeSizeStats) {
+ protected AbstractMetadataStore(
+ String metadataStoreName, OpenTelemetry openTelemetry,
MetadataNodeSizeStats nodeSizeStats,
+ int numSerDesThreads) {
this.nodeSizeStats = nodeSizeStats == null ? new
DummyMetadataNodeSizeStats()
: nodeSizeStats;
- this.executor = new ScheduledThreadPoolExecutor(1,
- new DefaultThreadFactory(
- StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
+ final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName
+ : getClass().getSimpleName();
+ this.eventExecutor = Executors.newSingleThreadExecutor(
+ new DefaultThreadFactory(namePrefix + "-event"));
+ this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory(namePrefix + "-scheduler"));
+ this.serDesExecutor = OrderedExecutor.newBuilder()
+ .numThreads(numSerDesThreads)
+ .name(namePrefix + "-worker")
+ .build();
registerListener(this);
this.childrenCache = Caffeine.newBuilder()
@@ -249,7 +261,8 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
JavaType typeRef =
TypeFactory.defaultInstance().constructSimpleType(clazz, null);
String cacheName = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
- new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.executor);
+ new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.serDesExecutor,
+ this.schedulerExecutor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -258,7 +271,8 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef,
MetadataCacheConfig cacheConfig) {
String cacheName = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
- new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.executor);
+ new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.serDesExecutor,
+ this.schedulerExecutor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -268,7 +282,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
MetadataCacheConfig
cacheConfig) {
MetadataCacheImpl<T> metadataCache =
new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
- this.executor);
+ this.serDesExecutor, this.schedulerExecutor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -348,7 +362,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
});
return null;
- }, executor);
+ }, eventExecutor);
} catch (RejectedExecutionException e) {
return FutureUtil.failedFuture(e);
}
@@ -531,7 +545,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
// Notice listeners.
try {
- executor.execute(() -> {
+ eventExecutor.execute(() -> {
sessionListeners.forEach(l -> {
try {
l.accept(event);
@@ -556,8 +570,9 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public void close() throws Exception {
- executor.shutdownNow();
- executor.awaitTermination(10, TimeUnit.SECONDS);
+ MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10,
TimeUnit.SECONDS);
+ MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10,
TimeUnit.SECONDS);
+ MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10,
TimeUnit.SECONDS);
this.metadataStoreStats.close();
}
@@ -574,30 +589,30 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
- /**
- * Run the task in the executor thread and fail the future if the executor is shutting down.
- */
- @VisibleForTesting
- public void execute(Runnable task, CompletableFuture<?> future) {
+ protected final <T> void processEvent(Consumer<T> eventProcessor, T event)
{
try {
- executor.execute(task);
- } catch (Throwable t) {
- future.completeExceptionally(t);
+ eventExecutor.execute(() -> eventProcessor.accept(event));
+ } catch (RejectedExecutionException e) {
+ log.warn("Rejected processing event {}", event);
}
}
- /**
- * Run the task in the executor thread and fail the future if the executor is shutting down.
- */
- @VisibleForTesting
- public void execute(Runnable task, Supplier<List<CompletableFuture<?>>>
futures) {
+ protected final void scheduleDelayedTask(long delay, TimeUnit unit,
Runnable task) {
+ schedulerExecutor.schedule(task, delay, unit);
+ }
+
+ protected final void safeExecuteCallback(Runnable task,
Consumer<Throwable> exceptionHandler) {
try {
- executor.execute(task);
- } catch (final Throwable t) {
- futures.get().forEach(f -> f.completeExceptionally(t));
+ eventExecutor.execute(task);
+ } catch (Throwable t) {
+ exceptionHandler.accept(t);
}
}
+ protected final void safeExecuteCallback(Runnable task,
CompletableFuture<?> future) {
+ safeExecuteCallback(task, future::completeExceptionally);
+ }
+
protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index 3937fd712dc..e1311fccfe0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -188,7 +188,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
@Override
protected CompletableFuture<Boolean> existsFromStore(String path) {
return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8),
EXISTS_GET_OPTION)
- .thenApplyAsync(gr -> gr.getCount() == 1, executor);
+ .thenApply(gr -> gr.getCount() == 1);
}
@Override
@@ -204,9 +204,8 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
}
return super.storePut(parent, new byte[0], Optional.empty(),
EnumSet.noneOf(CreateOption.class))
// Then create the unique key with the version added in the path
- .thenComposeAsync(
- stat -> super.storePut(path + stat.getVersion(),
data, optExpectedVersion, options),
- executor);
+ .thenCompose(
+ stat -> super.storePut(path + stat.getVersion(),
data, optExpectedVersion, options));
}
}
@@ -313,9 +312,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
}
} else {
log.warn("Failed to commit: {}", cause.getMessage());
- executor.execute(() -> {
- ops.forEach(o ->
o.getFuture().completeExceptionally(ex));
- });
+ ops.forEach(o -> o.getFuture().completeExceptionally(ex));
}
return null;
});
@@ -326,7 +323,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
private void handleBatchOperationResult(TxnResponse txnResponse,
List<MetadataOp> ops) {
- executor.execute(() -> {
+ safeExecuteCallbacks(() -> {
if (!txnResponse.isSucceeded()) {
if (ops.size() > 1) {
// Retry individually
@@ -404,7 +401,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
}
}
}
- });
+ }, ops);
}
private synchronized CompletableFuture<Void> createLease(boolean
retryOnFailure) {
@@ -444,9 +441,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
if (retryOnFailure) {
future.exceptionally(ex -> {
log.warn("Failed to create Etcd lease. Retrying later", ex);
- executor.schedule(() -> {
- createLease(true);
- }, 1, TimeUnit.SECONDS);
+ scheduleDelayedTask(1, TimeUnit.SECONDS, () ->
createLease(true));
return null;
});
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 079cb3130e0..627304b2edc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName(),
metadataStoreConfig.getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats());
+ metadataStoreConfig.getNodeSizeStats(),
metadataStoreConfig.getNumSerDesThreads());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 74bddda7454..08e5478ffcc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName(),
metadataStoreConfig.getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats());
+ metadataStoreConfig.getNodeSizeStats(),
metadataStoreConfig.getNumSerDesThreads());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 5bf7e2272f0..f56d6c6941f 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -119,7 +119,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
private void processSessionWatcher(WatchedEvent event) {
if (sessionWatcher != null) {
- executor.execute(() -> sessionWatcher.process(event));
+ processEvent(sessionWatcher::process, event);
}
}
@@ -245,9 +245,8 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
countsByType, totalSize, opsForLog);
// Retry with the individual operations
- executor.schedule(() -> {
- ops.forEach(o ->
batchOperation(Collections.singletonList(o)));
- }, 100, TimeUnit.MILLISECONDS);
+ scheduleDelayedTask(100, TimeUnit.MILLISECONDS,
+ () -> ops.forEach(o ->
batchOperation(Collections.singletonList(o))));
} else {
MetadataStoreException e = getException(code, path);
ops.forEach(o ->
o.getFuture().completeExceptionally(e));
@@ -256,7 +255,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
}
// Trigger all the futures in the batch
- execute(() -> {
+ safeExecuteCallbacks(() -> {
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);
@@ -278,7 +277,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
"Operation type not supported in multi: " + op.getType()));
}
}
- }, () ->
ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
+ }, ops);
}, null);
} catch (Throwable t) {
ops.forEach(o -> o.getFuture().completeExceptionally(new
MetadataStoreException(t)));
@@ -395,7 +394,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
try {
zkc.exists(path, null, (rc, path1, ctx, stat) -> {
- execute(() -> {
+ safeExecuteCallback(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(true);
@@ -421,7 +420,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
try {
zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> {
- execute(() -> {
+ safeExecuteCallback(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(null);
@@ -446,7 +445,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
CreateMode createMode = getCreateMode(opPut.getOptions());
asyncCreateFullPathOptimistic(zkc, opPut.getPath(),
opPut.getData(), createMode,
(rc, path1, ctx, name) -> {
- execute(() -> {
+ safeExecuteCallback(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(new Stat(name, 0, 0, 0,
createMode.isEphemeral(), true));
@@ -460,7 +459,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
});
} else {
zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion,
(rc, path1, ctx, stat) -> {
- execute(() -> {
+ safeExecuteCallback(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(getStat(path1, stat));
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index a9319a50fec..30989a41bd1 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,16 +18,20 @@
*/
package org.apache.pulsar.metadata.impl.batching;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -38,6 +42,7 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.jspecify.annotations.Nullable;
@Slf4j
public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore {
@@ -46,8 +51,6 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private final MessagePassingQueue<MetadataOp> readOps;
private final MessagePassingQueue<MetadataOp> writeOps;
- private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
-
private final boolean enabled;
private final int maxDelayMillis;
protected final int maxOperations;
@@ -55,9 +58,12 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private MetadataEventSynchronizer synchronizer;
private final BatchMetadataStoreStats batchMetadataStoreStats;
protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
+ @Nullable
+ private final ScheduledExecutorService flushExecutor;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
- super(conf.getMetadataStoreName(), conf.getOpenTelemetry(),
conf.getNodeSizeStats());
+ super(conf.getMetadataStoreName(), conf.getOpenTelemetry(),
conf.getNodeSizeStats(),
+ conf.getNumSerDesThreads());
this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -67,18 +73,22 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
if (enabled) {
readOps = new MpscUnboundedArrayQueue<>(10_000);
writeOps = new MpscUnboundedArrayQueue<>(10_000);
- scheduledTask =
- executor.scheduleAtFixedRate(this::flush, maxDelayMillis,
maxDelayMillis, TimeUnit.MILLISECONDS);
+ final var name =
StringUtils.isNotBlank(conf.getMetadataStoreName()) ?
conf.getMetadataStoreName()
+ : getClass().getSimpleName();
+ flushExecutor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory(
+ name + "-batch-flusher"));
+ scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush,
maxDelayMillis, maxDelayMillis,
+ TimeUnit.MILLISECONDS);
} else {
scheduledTask = null;
readOps = null;
writeOps = null;
+ flushExecutor = null;
}
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(conf.getSynchronizer());
- this.batchMetadataStoreStats =
- new BatchMetadataStoreStats(metadataStoreName, executor,
conf.getOpenTelemetry());
+ this.batchMetadataStoreStats = new
BatchMetadataStoreStats(metadataStoreName);
this.metadataStoreBatchStrategy = new
DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
}
@@ -96,12 +106,13 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
op.getFuture().completeExceptionally(ex);
}
scheduledTask.cancel(true);
+ MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10,
TimeUnit.SECONDS);
}
super.close();
this.batchMetadataStoreStats.close();
}
- private void flush() {
+ private synchronized void flush() {
List<MetadataOp> currentBatch;
if (!readOps.isEmpty()) {
while (CollectionUtils.isNotEmpty(currentBatch =
metadataStoreBatchStrategy.nextBatch(readOps))) {
@@ -113,8 +124,6 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
internalBatchOperation(currentBatch);
}
}
-
- flushInProgress.set(false);
}
@Override
@@ -169,8 +178,8 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
internalBatchOperation(Collections.singletonList(op));
return;
}
- if (queue.size() > maxOperations &&
flushInProgress.compareAndSet(false, true)) {
- executor.execute(this::flush);
+ if (queue.size() > maxOperations) {
+ flush();
}
} else {
internalBatchOperation(Collections.singletonList(op));
@@ -194,4 +203,8 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
}
protected abstract void batchOperation(List<MetadataOp> ops);
+
+ protected final void safeExecuteCallbacks(Runnable runnable,
List<MetadataOp> ops) {
+ safeExecuteCallback(runnable, t -> ops.forEach(op ->
op.getFuture().completeExceptionally(t)));
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index d055dd7da55..407a927bda4 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private Optional<MetadataEventSynchronizer> synchronizer;
public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
- super("oxia-metadata", OpenTelemetry.noop(), null);
+ super("oxia-metadata", OpenTelemetry.noop(), null, 1);
this.client = oxia;
this.identity = identity;
this.synchronizer = Optional.empty();
@@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
boolean enableSessionWatcher)
throws Exception {
super("oxia-metadata",
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats());
+ metadataStoreConfig.getNodeSizeStats(),
metadataStoreConfig.getNumSerDesThreads());
var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index 9549a8df8f9..82cc15d8aaf 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,23 +18,13 @@
*/
package org.apache.pulsar.metadata.impl.stats;
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
-import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
public final class BatchMetadataStoreStats implements AutoCloseable {
private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50,
100, 200, 500, 1000};
private static final String NAME = "name";
- private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
- .build("pulsar_batch_metadata_store_executor_queue_size", "-")
- .labelNames(NAME)
- .register();
private static final Histogram OPS_WAITING = Histogram
.build("pulsar_batch_metadata_store_queue_wait_time", "-")
.unit("ms")
@@ -54,46 +44,17 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
.register();
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final ThreadPoolExecutor executor;
private final String metadataStoreName;
private final Histogram.Child batchOpsWaitingChild;
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;
- public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME =
"pulsar.broker.metadata.store.executor.queue.size";
- private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
-
- public BatchMetadataStoreStats(String metadataStoreName, ExecutorService
executor, OpenTelemetry openTelemetry) {
- if (executor instanceof ThreadPoolExecutor tx) {
- this.executor = tx;
- } else {
- this.executor = null;
- }
+ public BatchMetadataStoreStats(String metadataStoreName) {
this.metadataStoreName = metadataStoreName;
-
- EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
- @Override
- public double get() {
- return getQueueSize();
- }
- }, metadataStoreName);
-
this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
this.batchExecuteTimeChild =
BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
-
- var meter = openTelemetry.getMeter("org.apache.pulsar");
- var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME,
metadataStoreName);
- this.batchMetadataStoreSizeCounter = meter
- .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
- .setDescription("The number of batch operations in the metadata store executor queue")
- .setUnit("{operation}")
- .buildWithCallback(measurement ->
measurement.record(getQueueSize(), attributes));
- }
-
- private int getQueueSize() {
- return executor == null ? 0 : executor.getQueue().size();
}
public void recordOpWaiting(long millis) {
@@ -111,11 +72,9 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
- EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
- batchMetadataStoreSizeCounter.close();
}
}
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index d42b2228346..0ae0b022a35 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
public static class MyMetadataStore extends AbstractMetadataStore {
protected MyMetadataStore() {
- super("custom", OpenTelemetry.noop(), null);
+ super("custom", OpenTelemetry.noop(), null, 1);
}
@Override