This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5a44e024abe8fa8d267528b425a4d479772bd21a Author: Lari Hotari <[email protected]> AuthorDate: Mon Feb 16 12:12:49 2026 +0200 Reapply "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)" This reverts commit 56cd23c301dd1a9ae6249c51439da5e142037185. --- conf/broker.conf | 2 + conf/standalone.conf | 3 + pip/pip-453.md | 82 +++++++++ .../IsolatedBookieEnsemblePlacementPolicy.java | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 6 + .../IsolatedBookieEnsemblePlacementPolicyTest.java | 34 ++-- .../org/apache/pulsar/broker/PulsarService.java | 2 + .../apache/pulsar/broker/PulsarServiceTest.java | 65 ++++++++ .../stats/OpenTelemetryMetadataStoreStatsTest.java | 12 -- .../pulsar/metadata/api/MetadataStoreConfig.java | 3 + .../metadata/cache/impl/MetadataCacheImpl.java | 185 ++++++++++++--------- .../metadata/impl/AbstractMetadataStore.java | 77 +++++---- .../pulsar/metadata/impl/EtcdMetadataStore.java | 19 +-- .../metadata/impl/LocalMemoryMetadataStore.java | 2 +- .../pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +- .../pulsar/metadata/impl/ZKMetadataStore.java | 19 +-- .../batching/AbstractBatchedMetadataStore.java | 39 +++-- .../metadata/impl/oxia/OxiaMetadataStore.java | 4 +- .../impl/stats/BatchMetadataStoreStats.java | 43 +---- .../impl/MetadataStoreFactoryImplTest.java | 2 +- 20 files changed, 393 insertions(+), 211 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 8b6ec12d7f2..b2e74569f14 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -954,6 +954,8 @@ metadataStoreBatchingMaxOperations=1000 # Maximum size of a batch metadataStoreBatchingMaxSizeKb=128 +# The number of threads used for serializing and deserializing data to and from the metadata store +metadataStoreSerDesThreads=1 ### --- Authentication --- ### diff --git a/conf/standalone.conf b/conf/standalone.conf index e5adf9a9637..571cc0fbbe8 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -430,6 +430,9 @@ 
metadataStoreBatchingMaxOperations=1000 # Maximum size of a batch metadataStoreBatchingMaxSizeKb=128 +# The number of threads used for serializing and deserializing data to and from the metadata store +metadataStoreSerDesThreads=1 + ### --- TLS --- ### # Deprecated - Use webServicePortTls and brokerServicePortTls instead tlsEnabled=false diff --git a/pip/pip-453.md b/pip/pip-453.md new file mode 100644 index 00000000000..f9109798ba7 --- /dev/null +++ b/pip/pip-453.md @@ -0,0 +1,82 @@ +# PIP-453: Improve the metadata store threading model + +# Background knowledge + +The `pulsar-metadata` module provides two abstractions for interacting with metadata stores: +- `MetadataStore`: the wrapper on the actual underlying metadata store (e.g. ZooKeeper), which has caches for value and children of a given key. +- `MetadataCache<T>`: a typed cache layer on top of `MetadataStore`, which performs serialization and deserialization of data between `T` and `byte[]`. + +The `MetadataStore` instance is unique in each broker, and is shared by multiple `MetadataCache<T>` instances. + +However, a single thread whose name starts with the metadata store name (e.g. `ZK-MetadataStore`) is used by implementations of them. This thread is used for the following tasks: +1. Executing callbacks of APIs like `put`. +2. Executing notification handlers, including `AbstractMetadataStore#accept`, which calls `accept` methods of all `MetadataCache` instances and all listeners registered by `MetadataStore#registerListener`. +3. For ZooKeeper and Etcd, which support batching requests, it's used to schedule flushing tasks at a fixed rate, which is determined by the `metadataStoreBatchingMaxDelayMillis` config (default: 5 ms). +4. Scheduling some other tasks, e.g. retrying failed operations. + +It should be noted that `MetadataCache` executes compute-intensive tasks like serialization in the `MetadataStore` callback. When the number of metadata operations grows, this thread can easily be overwhelmed. 
It also affects topic loading, which involves many metadata operations: this thread can be overwhelmed and block other tasks. For example, in a production environment, it's observed that the `pulsar_batch_metadata_store_queue_wait_time` metric is high (100 ms), which should [...] + +# Motivation + +The single thread model is inefficient when there are many metadata operations. For example, when a broker is down, the topics owned by this broker will be transferred to a new owner broker. Since the new owner broker might never have owned them before, even the `MetadataCache` caches are cold, which results in many metadata operations. However, the CPU-bound tasks like serialization and deserialization are executed in the `MetadataStore` thread, which makes it easy to be overwhelmed. Th [...] + +In a production environment, there was a case where the metadata operation rate increased suddenly and the `pulsar_batch_metadata_store_queue_wait_time_ms_bucket` metric increased to ~100 ms, which is a part of the total latency of a metadata operation. As a result, the total P99 get latency (`pulsar_metadata_store_ops_latency_ms_bucket{type="get"}`) increased to 2 seconds. + +The 3rd task in the previous section is scheduled via `scheduleAtFixedRate`, which means if the task is not executed in time (5 ms by default), the task will be executed immediately again in a short time, which also burdens the single metadata store thread. + +# Goals + +## In Scope + +Improve the existing thread model to handle various tasks on metadata store, which could avoid a single thread being overwhelmed when there are many metadata operations. + +## Out of Scope + +Actually the batching mechanism introduced by [#13043](https://github.com/apache/pulsar/pull/13043) is harmful. The `flush` method, which is responsible for sending a batch of metadata operations to broker, is called in the metadata store thread rather than the caller thread. The trade-off of the higher throughput is the higher latency. 
The benefit is limited because most of the time the metadata operation rate is not so high. See this [test report](https://github.com/BewareMyPower/zookeeper-benc [...] + +This proposal doesn't intend to change the existing batching mechanism or disable it by default. It only improves the threading model to avoid the single thread being overwhelmed. + +Additionally, some code paths execute the compute-intensive tasks in the metadata store thread directly (e.g. `store.get(path).thenApply(/* ... */)`); this proposal does not aim at changing them to asynchronous methods (e.g. `thenApplyAsync`). + +# High Level Design + +Create 4 sets of threads: +- `<name>-event`: the original metadata store thread, which is now only responsible for handling notifications. This executor won't be a `ScheduledExecutorService` anymore. +- `<name>-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations. +- `<name>-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false. +- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute-intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path. + +Regarding the callbacks, don't switch to a different thread. This change is not breaking because the underlying metadata store usually executes the callback in a single thread (e.g. `<name>-EventThread` in ZooKeeper) like the single thread in the current implementation. The caller should be responsible for managing worker threads on the metadata operation result if the callback is compute-intensive. + +The only concern is that introducing a new thread to execute callbacks allows waiting for the future of metadata store APIs in the callback. 
After this change, the following use case could cause a deadlock: + +```java +metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join()); +``` + +# Detailed Design + +## Public-facing Changes + +### Configuration + +Add a configuration to specify the number of worker threads for `MetadataCache`: + +```java + @FieldContext( + category = CATEGORY_SERVER, + doc = "The number of threads used for serializing and deserializing data to and from the metadata store" + ) + private int metadataStoreSerDesThreads = 1; +``` + +Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separate thread pool is mainly added to avoid blocking the metadata store callback thread. + +### Metrics + +The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute any tasks other than flushing. + +# Links + +* Mailing List discussion thread: https://lists.apache.org/thread/0cfdyvj96gw1sp1mo2zghl0lmsms5w1d +* Mailing List voting thread: https://lists.apache.org/thread/cktj2k8myw076yggn63k8yxs5357yd61 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 878bbc4d654..4ef1c594be4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -57,6 +58,8 @@ public 
class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac // the secondary group. private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups; + @Getter + @VisibleForTesting private MetadataCache<BookiesRackConfiguration> bookieMappingCache; private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 44b62709fab..93670bb2377 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -498,6 +498,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean metadataStoreAllowReadOnlyOperations; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The number of threads used for serializing and deserializing data to and from the metadata store" + ) + private int metadataStoreSerDesThreads = 1; + @Deprecated @FieldContext( category = CATEGORY_SERVER, diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 68f92ab416d..936b04386ff 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -22,12 +22,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import 
static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import io.netty.util.HashedWheelTimer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -288,8 +291,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), null).getResult(); @@ -340,8 +342,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { + "\": {\"rack\": \"rack0\", \"hostname\": \"bookie3.example.com\"}, \"" + BOOKIE4 + "\": {\"rack\": \"rack2\", \"hostname\": \"bookie4.example.com\"}}}"; - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8)); List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult(); @@ -399,8 +400,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put("group1", mainBookieGroup); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new HashSet<>()).getResult(); @@ -784,8 +784,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { 
bookieMapping.put(isolationGroup2, group2); bookieMapping.put(isolationGroup3, group3); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3)); groups.setRight(Sets.newHashSet("")); @@ -808,8 +807,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put(isolationGroup1, group1); bookieMapping.put(isolationGroup2, group2); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); @@ -831,12 +829,24 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put(isolationGroup1, group1); bookieMapping.put(isolationGroup2, group2); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); assertTrue(blacklist.isEmpty()); } + + // The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into + // the metadata store, the cache needs some time to receive the notification and update accordingly. 
+ private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) { + final var cache = isolationPolicy.getBookieMappingCache(); + assertNotNull(cache); // the policy must have been initialized + + final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH; + final var previousBookieInfo = cache.getIfCached(key); + store.put(key, bookieInfo, Optional.empty()).join(); + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> + assertNotEquals(cache.getIfCached(key), previousBookieInfo)); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e61fbfac566..eaebd7fa58c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -434,6 +434,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { .synchronizer(synchronizer) .openTelemetry(openTelemetry) .nodeSizeStats(new DefaultMetadataNodeSizeStats()) + .numSerDesThreads(config.getMetadataStoreSerDesThreads()) .build()); } @@ -1305,6 +1306,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { .metadataStoreName(MetadataStoreConfig.METADATA_STORE) .openTelemetry(openTelemetry) .nodeSizeStats(new DefaultMetadataNodeSizeStats()) + .numSerDesThreads(config.getMetadataStoreSerDesThreads()) .build()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 6c04889d8f1..6195e9cdae5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -25,10 +25,15 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; 
import static org.testng.AssertJUnit.assertSame; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -38,6 +43,10 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.metadata.api.MetadataCacheConfig; +import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.Stat; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -339,4 +348,60 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest { assertTrue(e instanceof PulsarClientException.TimeoutException); } } + + @Test + public void testMetadataSerDesThreads() throws Exception { + final var numSerDesThreads = 5; + final var config = new ServiceConfiguration(); + config.setMetadataStoreSerDesThreads(numSerDesThreads); + config.setClusterName("test"); + config.setMetadataStoreUrl("memory:local"); + config.setConfigurationMetadataStoreUrl("memory:local"); + + @Cleanup final var pulsar = new PulsarService(config); + pulsar.start(); + + BiConsumer<MetadataStore, String> verifier = (store, prefix) -> { + final var serDes = new CustomMetadataSerDes(); + final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build()); + for (int i = 0; i < 100 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { + cache.create(prefix + i, "value-" + i).join(); + final var value = cache.get(prefix + i).join(); + 
assertEquals(value.orElseThrow(), "value-" + i); + final var newValue = cache.readModifyUpdate(prefix + i, s -> s + "-updated").join(); + assertEquals(newValue, "value-" + i + "-updated"); + // Verify the serialization and deserialization are handled by the same thread + assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths); + } + log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths); + assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), numSerDesThreads); + // Verify a path cannot be handled by multiple threads + final var paths = serDes.threadNameToSerializedPaths.values().stream() + .flatMap(Set::stream).sorted().toList(); + assertEquals(paths.stream().distinct().toList(), paths); + }; + + verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/"); + verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/"); + } + + private static class CustomMetadataSerDes implements MetadataSerde<String> { + + final Map<String, Set<String>> threadNameToSerializedPaths = new ConcurrentHashMap<>(); + final Map<String, Set<String>> threadNameToDeserializedPaths = new ConcurrentHashMap<>(); + + @Override + public byte[] serialize(String path, String value) throws IOException{ + threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(), + __ -> ConcurrentHashMap.newKeySet()).add(path); + return value.getBytes(); + } + + @Override + public String deserialize(String path, byte[] data, Stat stat) throws IOException { + threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(), + __ -> ConcurrentHashMap.newKeySet()).add(path); + return new String(data); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java index 9e8bde20b88..390aa1e49e2 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java @@ -21,7 +21,6 @@ package org.apache.pulsar.broker.stats; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import java.util.concurrent.ExecutorService; import lombok.Cleanup; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.BrokerTestUtil; @@ -29,7 +28,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -53,14 +51,6 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase { var newStats = new MetadataStoreStats( localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true); - - var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore, - "batchMetadataStoreStats", true); - currentBatchedStats.close(); - var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true); - var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor, - pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); - FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true); } @AfterMethod(alwaysRun = true) @@ 
-89,7 +79,5 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase { var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME, attributes, value -> assertThat(value).isPositive()); - assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes, - value -> assertThat(value).isPositive()); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java index ef50dc87691..fcde0dce840 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java @@ -104,4 +104,7 @@ public class MetadataStoreConfig { * The estimator to estimate the payload length of metadata node, which used to limit the batch size requested. 
*/ private MetadataNodeSizeStats nodeSizeStats; + + @Builder.Default + private final int numSerDesThreads = 1; } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b1f0572547c..ca165f0464e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -39,10 +39,10 @@ import java.util.function.Supplier; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.CacheGetResult; -import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataCacheConfig; import org.apache.pulsar.metadata.api.MetadataSerde; @@ -62,23 +62,26 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica private final MetadataStore store; private final MetadataStoreExtended storeExtended; private final MetadataSerde<T> serde; - private final ScheduledExecutorService executor; + private final OrderedExecutor executor; + private final ScheduledExecutorService schedulerExecutor; private final MetadataCacheConfig<T> cacheConfig; private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache; public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference<T> typeRef, - MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { - this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); + MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor, + 
ScheduledExecutorService schedulerExecutor) { + this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor, schedulerExecutor); } public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig, - ScheduledExecutorService executor) { - this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); + OrderedExecutor executor, ScheduledExecutorService schedulerExecutor) { + this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor, schedulerExecutor); } public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde<T> serde, - MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { + MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor, + ScheduledExecutorService schedulerExecutor) { this.store = store; if (store instanceof MetadataStoreExtended) { this.storeExtended = (MetadataStoreExtended) store; @@ -88,6 +91,7 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica this.serde = serde; this.cacheConfig = cacheConfig; this.executor = executor; + this.schedulerExecutor = schedulerExecutor; Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder(); if (cacheConfig.getRefreshAfterWriteMillis() > 0) { @@ -101,6 +105,9 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica .buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() { @Override public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) { + if (log.isDebugEnabled()) { + log.debug("Loading key {} into metadata cache {}", key, cacheName); + } return readValueFromStore(key); } @@ -110,12 +117,16 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica Optional<CacheGetResult<T>> oldValue, Executor executor) { if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) { - 
return readValueFromStore(key).thenApply(val -> { + if (log.isDebugEnabled()) { + log.debug("Reloading key {} into metadata cache {}", key, cacheName); + } + final var future = readValueFromStore(key); + future.thenAccept(val -> { if (cacheConfig.getAsyncReloadConsumer() != null) { cacheConfig.getAsyncReloadConsumer().accept(key, val); } - return val; }); + return future; } else { // Do not try to refresh the cache item if we know that we're not connected to the // metadata store @@ -128,22 +139,46 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica } private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String path) { - return store.get(path) - .thenCompose(optRes -> { - if (!optRes.isPresent()) { - return FutureUtils.value(Optional.empty()); - } - - try { - GetResult res = optRes.get(); - T obj = serde.deserialize(path, res.getValue(), res.getStat()); - return FutureUtils - .value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); - } catch (Throwable t) { - return FutureUtils.exception(new ContentDeserializationException( - "Failed to deserialize payload for key '" + path + "'", t)); - } - }); + final var future = new CompletableFuture<Optional<CacheGetResult<T>>>(); + store.get(path).thenComposeAsync(optRes -> { + // There could be multiple pending reads for the same path, for example, when a path is created, + // 1. The `accept` method will call `refresh` + // 2. The `put` method will call `refresh` after the metadata store put operation is done + // Both will call this method and the same result will be read. In this case, we only need to deserialize + // the value once. 
+ if (!optRes.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Key {} not found in metadata store", path); + } + return FutureUtils.value(Optional.<CacheGetResult<T>>empty()); + } + final var res = optRes.get(); + final var cachedFuture = objCache.getIfPresent(path); + if (cachedFuture != null && cachedFuture != future) { + if (log.isDebugEnabled()) { + log.debug("A new read on key {} is in progress or completed, ignore this one", path); + } + return cachedFuture; + } + try { + T obj = serde.deserialize(path, res.getValue(), res.getStat()); + if (log.isDebugEnabled()) { + log.debug("Deserialized value for key {} (version: {}): {}", path, res.getStat().getVersion(), + obj); + } + return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); + } catch (Throwable t) { + return FutureUtils.exception(new ContentDeserializationException( + "Failed to deserialize payload for key '" + path + "'", t)); + } + }, executor.chooseThread(path)).whenComplete((result, e) -> { + if (e != null) { + future.completeExceptionally(e.getCause()); + } else { + future.complete(result); + } + }); + return future; } @Override @@ -169,8 +204,9 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica @Override public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) { + final var executor = this.executor.chooseThread(path); return executeWithRetry(() -> objCache.get(path) - .thenCompose(optEntry -> { + .thenComposeAsync(optEntry -> { Optional<T> currentValue; long expectedVersion; @@ -202,13 +238,14 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> { refresh(path); }).thenApply(__ -> newValueObj); - }), path); + }, executor), path); } @Override public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) { + final var executor = 
this.executor.chooseThread(path); return executeWithRetry(() -> objCache.get(path) - .thenCompose(optEntry -> { + .thenComposeAsync(optEntry -> { if (!optEntry.isPresent()) { return FutureUtils.exception(new NotFoundException("")); } @@ -231,59 +268,57 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> { refresh(path); }).thenApply(__ -> newValueObj); - }), path); + }, executor), path); + } + + private CompletableFuture<byte[]> serialize(String path, T value) { + final var future = new CompletableFuture<byte[]>(); + executor.executeOrdered(path, () -> { + try { + future.complete(serde.serialize(path, value)); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + return future; } @Override public CompletableFuture<Void> create(String path, T value) { - byte[] content; - try { - content = serde.serialize(path, value); - } catch (Throwable t) { - return FutureUtils.exception(t); - } - - CompletableFuture<Void> future = new CompletableFuture<>(); - store.put(path, content, Optional.of(-1L)) - .thenAccept(stat -> { - // Make sure we have the value cached before the operation is completed - // In addition to caching the value, we need to add a watch on the path, - // so when/if it changes on any other node, we are notified and we can - // update the cache - objCache.get(path).whenComplete((stat2, ex) -> { - if (ex == null) { - future.complete(null); - } else { - log.error("Exception while getting path {}", path, ex); - future.completeExceptionally(ex.getCause()); - } - }); - }).exceptionally(ex -> { - if (ex.getCause() instanceof BadVersionException) { - // Use already exists exception to provide more self-explanatory error message - future.completeExceptionally(new AlreadyExistsException(ex.getCause())); - } else { - future.completeExceptionally(ex.getCause()); - } - return null; - }); - + final var future = new CompletableFuture<Void>(); + 
serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L))) + // Make sure we have the value cached before the operation is completed + // In addition to caching the value, we need to add a watch on the path, + // so when/if it changes on any other node, we are notified and we can + // update the cache + .thenCompose(__ -> objCache.get(path)) + .whenComplete((__, ex) -> { + if (ex == null) { + future.complete(null); + } else if (ex.getCause() instanceof BadVersionException) { + // Use already exists exception to provide more self-explanatory error message + future.completeExceptionally(new AlreadyExistsException(ex.getCause())); + } else { + future.completeExceptionally(ex.getCause()); + } + }); return future; } @Override public CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> options) { - final byte[] bytes; - try { - bytes = serde.serialize(path, value); - } catch (IOException e) { - return CompletableFuture.failedFuture(e); - } - if (storeExtended != null) { - return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path)); - } else { - return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path)); - } + return serialize(path, value).thenCompose(bytes -> { + if (storeExtended != null) { + return storeExtended.put(path, bytes, Optional.empty(), options); + } else { + return store.put(path, bytes, Optional.empty()); + } + }).thenAccept(__ -> { + if (log.isDebugEnabled()) { + log.debug("Refreshing path {} after put operation", path); + } + refresh(path); + }); } @Override @@ -323,6 +358,9 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica switch (t.getType()) { case Created: case Modified: + if (log.isDebugEnabled()) { + log.debug("Refreshing path {} for {} notification", path, t.getType()); + } refresh(path); break; @@ -354,8 +392,7 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica final var next 
= backoff.next(); log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key, next, backoff.isMandatoryStopMade(), elapsed); - executor.schedule(() -> execute(op, key, result, backoff), next, - TimeUnit.MILLISECONDS); + schedulerExecutor.schedule(() -> execute(op, key, result, backoff), next, TimeUnit.MILLISECONDS); return null; } result.completeExceptionally(ex.getCause()); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index b0e4b43f700..d118a792e2f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.OpenTelemetry; import java.time.Instant; @@ -38,16 +39,17 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import 
org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; @@ -76,7 +78,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>(); protected final String metadataStoreName; - protected final ScheduledExecutorService executor; + private final OrderedExecutor serDesExecutor; + private final ExecutorService eventExecutor; + private final ScheduledExecutorService schedulerExecutor; private final AsyncLoadingCache<String, List<String>> childrenCache; private final AsyncLoadingCache<String, Boolean> existsCache; private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>(); @@ -93,13 +97,21 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected MetadataNodeSizeStats nodeSizeStats; - protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry, - MetadataNodeSizeStats nodeSizeStats) { + protected AbstractMetadataStore( + String metadataStoreName, OpenTelemetry openTelemetry, MetadataNodeSizeStats nodeSizeStats, + int numSerDesThreads) { this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats() : nodeSizeStats; - this.executor = new ScheduledThreadPoolExecutor(1, - new DefaultThreadFactory( - StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); + final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName + : getClass().getSimpleName(); + this.eventExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory(namePrefix + "-event")); + this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory(namePrefix + "-scheduler")); + this.serDesExecutor = OrderedExecutor.newBuilder() + .numThreads(numSerDesThreads) + .name(namePrefix + "-worker") + .build(); registerListener(this); this.childrenCache = Caffeine.newBuilder() @@ -249,7 +261,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null); String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl<T> metadataCache = - new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor, + this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -258,7 +271,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) { String cacheName = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl<T> metadataCache = - new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor, + this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -268,7 +282,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co MetadataCacheConfig cacheConfig) { MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, - this.executor); + this.serDesExecutor, this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -348,7 +362,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co }); return null; - }, executor); + }, eventExecutor); } catch (RejectedExecutionException e) { return FutureUtil.failedFuture(e); } @@ -531,7 +545,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co // Notice listeners. try { - executor.execute(() -> { + eventExecutor.execute(() -> { sessionListeners.forEach(l -> { try { l.accept(event); @@ -556,8 +570,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public void close() throws Exception { - executor.shutdownNow(); - executor.awaitTermination(10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, TimeUnit.SECONDS); this.metadataStoreStats.close(); } @@ -574,30 +589,30 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co } } - /** - * Run the task in the executor thread and fail the future if the executor is shutting down. 
- */ - @VisibleForTesting - public void execute(Runnable task, CompletableFuture<?> future) { + protected final <T> void processEvent(Consumer<T> eventProcessor, T event) { try { - executor.execute(task); - } catch (Throwable t) { - future.completeExceptionally(t); + eventExecutor.execute(() -> eventProcessor.accept(event)); + } catch (RejectedExecutionException e) { + log.warn("Rejected processing event {}", event); } } - /** - * Run the task in the executor thread and fail the future if the executor is shutting down. - */ - @VisibleForTesting - public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) { + protected final void scheduleDelayedTask(long delay, TimeUnit unit, Runnable task) { + schedulerExecutor.schedule(task, delay, unit); + } + + protected final void safeExecuteCallback(Runnable task, Consumer<Throwable> exceptionHandler) { try { - executor.execute(task); - } catch (final Throwable t) { - futures.get().forEach(f -> f.completeExceptionally(t)); + eventExecutor.execute(task); + } catch (Throwable t) { + exceptionHandler.accept(t); } } + protected final void safeExecuteCallback(Runnable task, CompletableFuture<?> future) { + safeExecuteCallback(task, future::completeExceptionally); + } + protected static String parent(String path) { int idx = path.lastIndexOf('/'); if (idx <= 0) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java index 3937fd712dc..e1311fccfe0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java @@ -188,7 +188,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { @Override protected CompletableFuture<Boolean> existsFromStore(String path) { return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION) - 
.thenApplyAsync(gr -> gr.getCount() == 1, executor); + .thenApply(gr -> gr.getCount() == 1); } @Override @@ -204,9 +204,8 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { } return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class)) // Then create the unique key with the version added in the path - .thenComposeAsync( - stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options), - executor); + .thenCompose( + stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options)); } } @@ -313,9 +312,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { } } else { log.warn("Failed to commit: {}", cause.getMessage()); - executor.execute(() -> { - ops.forEach(o -> o.getFuture().completeExceptionally(ex)); - }); + ops.forEach(o -> o.getFuture().completeExceptionally(ex)); } return null; }); @@ -326,7 +323,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { private void handleBatchOperationResult(TxnResponse txnResponse, List<MetadataOp> ops) { - executor.execute(() -> { + safeExecuteCallbacks(() -> { if (!txnResponse.isSucceeded()) { if (ops.size() > 1) { // Retry individually @@ -404,7 +401,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { } } } - }); + }, ops); } private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) { @@ -444,9 +441,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { if (retryOnFailure) { future.exceptionally(ex -> { log.warn("Failed to create Etcd lease. 
Retrying later", ex); - executor.schedule(() -> { - createLease(true); - }, 1, TimeUnit.SECONDS); + scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true)); return null; }); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 079cb3130e0..627304b2edc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length()); // Local means a private data set // update synchronizer and register sync listener diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 74bddda7454..08e5478ffcc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore { private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + 
metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); this.metadataUrl = metadataURL; try { RocksDB.loadLibrary(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 5bf7e2272f0..f56d6c6941f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -119,7 +119,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore private void processSessionWatcher(WatchedEvent event) { if (sessionWatcher != null) { - executor.execute(() -> sessionWatcher.process(event)); + processEvent(sessionWatcher::process, event); } } @@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore countsByType, totalSize, opsForLog); // Retry with the individual operations - executor.schedule(() -> { - ops.forEach(o -> batchOperation(Collections.singletonList(o))); - }, 100, TimeUnit.MILLISECONDS); + scheduleDelayedTask(100, TimeUnit.MILLISECONDS, + () -> ops.forEach(o -> batchOperation(Collections.singletonList(o)))); } else { MetadataStoreException e = getException(code, path); ops.forEach(o -> o.getFuture().completeExceptionally(e)); @@ -256,7 +255,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore } // Trigger all the futures in the batch - execute(() -> { + safeExecuteCallbacks(() -> { for (int i = 0; i < ops.size(); i++) { OpResult opr = results.get(i); MetadataOp op = ops.get(i); @@ -278,7 +277,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore "Operation type not supported in multi: " + op.getType())); } } - }, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList())); + }, ops); }, null); } catch (Throwable t) { ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t))); @@ 
-395,7 +394,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore try { zkc.exists(path, null, (rc, path1, ctx, stat) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(true); @@ -421,7 +420,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore try { zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(null); @@ -446,7 +445,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore CreateMode createMode = getCreateMode(opPut.getOptions()); asyncCreateFullPathOptimistic(zkc, opPut.getPath(), opPut.getData(), createMode, (rc, path1, ctx, name) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true)); @@ -460,7 +459,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore }); } else { zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, (rc, path1, ctx, stat) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(getStat(path1, stat)); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index a9319a50fec..30989a41bd1 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -18,16 +18,20 @@ */ package org.apache.pulsar.metadata.impl.batching; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Collections; import 
java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -38,6 +42,7 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore; import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; +import org.jspecify.annotations.Nullable; @Slf4j public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore { @@ -46,8 +51,6 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private final MessagePassingQueue<MetadataOp> readOps; private final MessagePassingQueue<MetadataOp> writeOps; - private final AtomicBoolean flushInProgress = new AtomicBoolean(false); - private final boolean enabled; private final int maxDelayMillis; protected final int maxOperations; @@ -55,9 +58,12 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private MetadataEventSynchronizer synchronizer; private final BatchMetadataStoreStats batchMetadataStoreStats; protected MetadataStoreBatchStrategy metadataStoreBatchStrategy; + @Nullable + private final ScheduledExecutorService flushExecutor; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { - super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats()); + super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats(), + conf.getNumSerDesThreads()); this.enabled = conf.isBatchingEnabled(); this.maxDelayMillis = conf.getBatchingMaxDelayMillis(); @@ -67,18 +73,22 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore if (enabled) { readOps = new MpscUnboundedArrayQueue<>(10_000); writeOps = new MpscUnboundedArrayQueue<>(10_000); - scheduledTask = - executor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, TimeUnit.MILLISECONDS); + final var name = StringUtils.isNotBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName() + : getClass().getSimpleName(); + flushExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory( + name + "-batch-flusher")); + scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, + TimeUnit.MILLISECONDS); } else { scheduledTask = null; readOps = null; writeOps = null; + flushExecutor = null; } // update synchronizer and register sync listener updateMetadataEventSynchronizer(conf.getSynchronizer()); - this.batchMetadataStoreStats = - new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry()); + this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName); this.metadataStoreBatchStrategy = new DefaultMetadataStoreBatchStrategy(maxOperations, maxSize); } @@ -96,12 +106,13 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore op.getFuture().completeExceptionally(ex); } scheduledTask.cancel(true); + MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, TimeUnit.SECONDS); } super.close(); this.batchMetadataStoreStats.close(); } - private void flush() { + private synchronized void flush() { List<MetadataOp> currentBatch; if (!readOps.isEmpty()) { while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(readOps))) { @@ -113,8 +124,6 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore 
internalBatchOperation(currentBatch); } } - - flushInProgress.set(false); } @Override @@ -169,8 +178,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore internalBatchOperation(Collections.singletonList(op)); return; } - if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) { - executor.execute(this::flush); + if (queue.size() > maxOperations) { + flush(); } } else { internalBatchOperation(Collections.singletonList(op)); @@ -194,4 +203,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore } protected abstract void batchOperation(List<MetadataOp> ops); + + protected final void safeExecuteCallbacks(Runnable runnable, List<MetadataOp> ops) { + safeExecuteCallback(runnable, t -> ops.forEach(op -> op.getFuture().completeExceptionally(t))); + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index d055dd7da55..407a927bda4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private Optional<MetadataEventSynchronizer> synchronizer; public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { - super("oxia-metadata", OpenTelemetry.noop(), null); + super("oxia-metadata", OpenTelemetry.noop(), null, 1); this.client = oxia; this.identity = identity; this.synchronizer = Optional.empty(); @@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { boolean enableSessionWatcher) throws Exception { super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads()); var linger = metadataStoreConfig.getBatchingMaxDelayMillis(); if (!metadataStoreConfig.isBatchingEnabled()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java index 9549a8df8f9..82cc15d8aaf 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java @@ -18,23 +18,13 @@ */ package org.apache.pulsar.metadata.impl.stats; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; -import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; public final class BatchMetadataStoreStats implements AutoCloseable { private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000}; private static final String NAME = "name"; - private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge - .build("pulsar_batch_metadata_store_executor_queue_size", "-") - .labelNames(NAME) - .register(); private static final Histogram OPS_WAITING = Histogram .build("pulsar_batch_metadata_store_queue_wait_time", "-") .unit("ms") @@ -54,46 +44,17 @@ public final class BatchMetadataStoreStats implements AutoCloseable { .register(); private final AtomicBoolean closed = new AtomicBoolean(false); - private final ThreadPoolExecutor executor; private final String metadataStoreName; private final Histogram.Child batchOpsWaitingChild; private final Histogram.Child batchExecuteTimeChild; private final Histogram.Child opsPerBatchChild; - public static final String 
EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size"; - private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter; - - public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) { - if (executor instanceof ThreadPoolExecutor tx) { - this.executor = tx; - } else { - this.executor = null; - } + public BatchMetadataStoreStats(String metadataStoreName) { this.metadataStoreName = metadataStoreName; - - EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() { - @Override - public double get() { - return getQueueSize(); - } - }, metadataStoreName); - this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName); this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName); this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName); - - var meter = openTelemetry.getMeter("org.apache.pulsar"); - var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName); - this.batchMetadataStoreSizeCounter = meter - .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME) - .setDescription("The number of batch operations in the metadata store executor queue") - .setUnit("{operation}") - .buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes)); - } - - private int getQueueSize() { - return executor == null ? 
0 : executor.getQueue().size(); } public void recordOpWaiting(long millis) { @@ -111,11 +72,9 @@ public final class BatchMetadataStoreStats implements AutoCloseable { @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { - EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName); OPS_WAITING.remove(this.metadataStoreName); BATCH_EXECUTE_TIME.remove(this.metadataStoreName); OPS_PER_BATCH.remove(metadataStoreName); - batchMetadataStoreSizeCounter.close(); } } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index d42b2228346..0ae0b022a35 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest { public static class MyMetadataStore extends AbstractMetadataStore { protected MyMetadataStore() { - super("custom", OpenTelemetry.noop(), null); + super("custom", OpenTelemetry.noop(), null, 1); } @Override
