This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4f9b2ca7cd7f5a3d8ae73bdc8ef9da83324e1ec6 Author: Lari Hotari <[email protected]> AuthorDate: Mon Feb 16 12:12:32 2026 +0200 Reapply "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)" This reverts commit a6aab863b4a86b5dcb9be21045f1333f1c4501f2. --- conf/broker.conf | 2 + conf/standalone.conf | 3 + pip/pip-453.md | 26 +-- .../IsolatedBookieEnsemblePlacementPolicy.java | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 6 + .../IsolatedBookieEnsemblePlacementPolicyTest.java | 34 ++-- .../org/apache/pulsar/broker/PulsarService.java | 2 + .../apache/pulsar/broker/PulsarServiceTest.java | 65 ++++++++ .../stats/OpenTelemetryMetadataStoreStatsTest.java | 12 -- .../pulsar/metadata/api/MetadataStoreConfig.java | 3 + .../metadata/cache/impl/MetadataCacheImpl.java | 185 ++++++++++++--------- .../metadata/impl/AbstractMetadataStore.java | 77 +++++---- .../pulsar/metadata/impl/EtcdMetadataStore.java | 19 +-- .../metadata/impl/LocalMemoryMetadataStore.java | 2 +- .../pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +- .../pulsar/metadata/impl/ZKMetadataStore.java | 19 +-- .../batching/AbstractBatchedMetadataStore.java | 39 +++-- .../metadata/impl/oxia/OxiaMetadataStore.java | 4 +- .../impl/stats/BatchMetadataStoreStats.java | 43 +---- .../impl/MetadataStoreFactoryImplTest.java | 2 +- 20 files changed, 316 insertions(+), 232 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 85c0a133c47..503a7a0e11f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -951,6 +951,8 @@ metadataStoreBatchingMaxOperations=1000 # Maximum size of a batch metadataStoreBatchingMaxSizeKb=128 +# The number of threads used for serializing and deserializing data to and from the metadata store +metadataStoreSerDesThreads=1 ### --- Authentication --- ### diff --git a/conf/standalone.conf b/conf/standalone.conf index 77e2f37ac88..f9c7ccb658f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -430,6 +430,9 @@ 
metadataStoreBatchingMaxOperations=1000 # Maximum size of a batch metadataStoreBatchingMaxSizeKb=128 +# The number of threads used for serializing and deserializing data to and from the metadata store +metadataStoreSerDesThreads=1 + ### --- TLS --- ### # Deprecated - Use webServicePortTls and brokerServicePortTls instead tlsEnabled=false diff --git a/pip/pip-453.md b/pip/pip-453.md index a42736b9dda..f9109798ba7 100644 --- a/pip/pip-453.md +++ b/pip/pip-453.md @@ -40,8 +40,9 @@ Additionally, some code paths execute the compute intensive tasks in the metadat # High Level Design -Create 3 set of threads: +Create 4 sets of threads: - `<name>-event`: the original metadata store thread, which is now only responsible to handle notifications. This executor won't be a `ScheduledExecutorService` anymore. +- `<name>-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations. - `<name>-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false. - `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path. @@ -53,25 +54,6 @@ The only concern is that introducing a new thread to execute callbacks allows wa metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());; ``` -Other tasks like the retry on failure is executed in JVM's common `ForkJoinPool` by `CompletableFuture` APIs. 
For example: - -```diff ---- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java -+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java -@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore - countsByType, totalSize, opsForLog); - - // Retry with the individual operations -- executor.schedule(() -> { -- ops.forEach(o -> batchOperation(Collections.singletonList(o))); -- }, 100, TimeUnit.MILLISECONDS); -+ CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() -> -+ ops.forEach(o -> batchOperation(Collections.singletonList(o)))); - } else { - MetadataStoreException e = getException(code, path); - ops.forEach(o -> o.getFuture().completeExceptionally(e)); -``` - # Detailed Design ## Public-facing Changes @@ -85,9 +67,11 @@ Add a configurations to specify the number of worker threads for `MetadataCache` category = CATEGORY_SERVER, doc = "The number of threads uses for serializing and deserializing data to and from the metadata store" ) - private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors(); + private int metadataStoreSerDesThreads = 1; ``` +Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separate thread pool is mainly added to avoid blocking the metadata store callback thread. ### Metrics The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute other tasks except for flushing.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 878bbc4d654..4ef1c594be4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; @@ -57,6 +58,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac // the secondary group. private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups; + @Getter + @VisibleForTesting private MetadataCache<BookiesRackConfiguration> bookieMappingCache; private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f7e50836583..dede4543fc3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -490,6 +490,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean metadataStoreAllowReadOnlyOperations; + @FieldContext( + category = CATEGORY_SERVER, + doc = "The number of threads used for serializing and deserializing data to and from the metadata store" + ) + private int metadataStoreSerDesThreads = 1; + @Deprecated @FieldContext( 
category = CATEGORY_SERVER, diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 68f92ab416d..936b04386ff 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -22,12 +22,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import io.netty.util.HashedWheelTimer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -288,8 +291,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), null).getResult(); @@ -340,8 +342,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { + "\": {\"rack\": \"rack0\", \"hostname\": \"bookie3.example.com\"}, \"" + BOOKIE4 + "\": {\"rack\": \"rack2\", \"hostname\": \"bookie4.example.com\"}}}"; - 
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8)); List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult(); @@ -399,8 +400,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put("group1", mainBookieGroup); bookieMapping.put("group2", secondaryBookieGroup); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new HashSet<>()).getResult(); @@ -784,8 +784,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put(isolationGroup2, group2); bookieMapping.put(isolationGroup3, group3); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3)); groups.setRight(Sets.newHashSet("")); @@ -808,8 +807,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put(isolationGroup1, group1); bookieMapping.put(isolationGroup2, group2); - store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); @@ -831,12 +829,24 @@ public class IsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put(isolationGroup1, group1); bookieMapping.put(isolationGroup2, group2); - 
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - Optional.empty()).join(); + updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping)); groups.setLeft(Sets.newHashSet(isolationGroup1)); groups.setRight(Sets.newHashSet(isolationGroup2)); blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups); assertTrue(blacklist.isEmpty()); } + + // The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into + // the metadata store, the cache needs some time to receive the notification and update accordingly. + private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) { + final var cache = isolationPolicy.getBookieMappingCache(); + assertNotNull(cache); // the policy must have been initialized + + final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH; + final var previousBookieInfo = cache.getIfCached(key); + store.put(key, bookieInfo, Optional.empty()).join(); + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> + assertNotEquals(cache.getIfCached(key), previousBookieInfo)); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 56949873621..d4b69f8d5fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -429,6 +429,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { .synchronizer(synchronizer) .openTelemetry(openTelemetry) .nodeSizeStats(new DefaultMetadataNodeSizeStats()) + .numSerDesThreads(config.getMetadataStoreSerDesThreads()) .build()); } @@ -1299,6 +1300,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { .metadataStoreName(MetadataStoreConfig.METADATA_STORE) 
.openTelemetry(openTelemetry) .nodeSizeStats(new DefaultMetadataNodeSizeStats()) + .numSerDesThreads(config.getMetadataStoreSerDesThreads()) .build()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 6c04889d8f1..6195e9cdae5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -25,10 +25,15 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertSame; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -38,6 +43,10 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; +import org.apache.pulsar.metadata.api.MetadataCacheConfig; +import org.apache.pulsar.metadata.api.MetadataSerde; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.Stat; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; @@ -339,4 +348,60 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest { assertTrue(e instanceof PulsarClientException.TimeoutException); } } + + @Test + public void testMetadataSerDesThreads() throws Exception { + final var numSerDesThreads = 5; + final var config = new ServiceConfiguration(); + 
config.setMetadataStoreSerDesThreads(numSerDesThreads); + config.setClusterName("test"); + config.setMetadataStoreUrl("memory:local"); + config.setConfigurationMetadataStoreUrl("memory:local"); + + @Cleanup final var pulsar = new PulsarService(config); + pulsar.start(); + + BiConsumer<MetadataStore, String> verifier = (store, prefix) -> { + final var serDes = new CustomMetadataSerDes(); + final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build()); + for (int i = 0; i < 100 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) { + cache.create(prefix + i, "value-" + i).join(); + final var value = cache.get(prefix + i).join(); + assertEquals(value.orElseThrow(), "value-" + i); + final var newValue = cache.readModifyUpdate(prefix + i, s -> s + "-updated").join(); + assertEquals(newValue, "value-" + i + "-updated"); + // Verify the serialization and deserialization are handled by the same thread + assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths); + } + log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths); + assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), numSerDesThreads); + // Verify a path cannot be handled by multiple threads + final var paths = serDes.threadNameToSerializedPaths.values().stream() + .flatMap(Set::stream).sorted().toList(); + assertEquals(paths.stream().distinct().toList(), paths); + }; + + verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/"); + verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/"); + } + + private static class CustomMetadataSerDes implements MetadataSerde<String> { + + final Map<String, Set<String>> threadNameToSerializedPaths = new ConcurrentHashMap<>(); + final Map<String, Set<String>> threadNameToDeserializedPaths = new ConcurrentHashMap<>(); + + @Override + public byte[] serialize(String path, String value) throws IOException{ + 
threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(), + __ -> ConcurrentHashMap.newKeySet()).add(path); + return value.getBytes(); + } + + @Override + public String deserialize(String path, byte[] data, Stat stat) throws IOException { + threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(), + __ -> ConcurrentHashMap.newKeySet()).add(path); + return new String(data); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java index 9e8bde20b88..390aa1e49e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java @@ -21,7 +21,6 @@ package org.apache.pulsar.broker.stats; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; -import java.util.concurrent.ExecutorService; import lombok.Cleanup; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.BrokerTestUtil; @@ -29,7 +28,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -53,14 +51,6 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase { var newStats = new MetadataStoreStats( localMetadataStoreName, 
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true); - - var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore, - "batchMetadataStoreStats", true); - currentBatchedStats.close(); - var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true); - var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor, - pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry()); - FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true); } @AfterMethod(alwaysRun = true) @@ -89,7 +79,5 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase { var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME, attributes, value -> assertThat(value).isPositive()); - assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes, - value -> assertThat(value).isPositive()); } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java index ef50dc87691..fcde0dce840 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java @@ -104,4 +104,7 @@ public class MetadataStoreConfig { * The estimator to estimate the payload length of metadata node, which used to limit the batch size requested. 
*/ private MetadataNodeSizeStats nodeSizeStats; + + @Builder.Default + private final int numSerDesThreads = 1; } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b1f0572547c..ca165f0464e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -39,10 +39,10 @@ import java.util.function.Supplier; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.CacheGetResult; -import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataCacheConfig; import org.apache.pulsar.metadata.api.MetadataSerde; @@ -62,23 +62,26 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica private final MetadataStore store; private final MetadataStoreExtended storeExtended; private final MetadataSerde<T> serde; - private final ScheduledExecutorService executor; + private final OrderedExecutor executor; + private final ScheduledExecutorService schedulerExecutor; private final MetadataCacheConfig<T> cacheConfig; private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache; public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference<T> typeRef, - MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { - this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); + MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor, + 
ScheduledExecutorService schedulerExecutor) { + this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor, schedulerExecutor); } public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig, - ScheduledExecutorService executor) { - this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); + OrderedExecutor executor, ScheduledExecutorService schedulerExecutor) { + this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor, schedulerExecutor); } public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde<T> serde, - MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { + MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor, + ScheduledExecutorService schedulerExecutor) { this.store = store; if (store instanceof MetadataStoreExtended) { this.storeExtended = (MetadataStoreExtended) store; @@ -88,6 +91,7 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica this.serde = serde; this.cacheConfig = cacheConfig; this.executor = executor; + this.schedulerExecutor = schedulerExecutor; Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder(); if (cacheConfig.getRefreshAfterWriteMillis() > 0) { @@ -101,6 +105,9 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica .buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() { @Override public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) { + if (log.isDebugEnabled()) { + log.debug("Loading key {} into metadata cache {}", key, cacheName); + } return readValueFromStore(key); } @@ -110,12 +117,16 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica Optional<CacheGetResult<T>> oldValue, Executor executor) { if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) { - 
return readValueFromStore(key).thenApply(val -> { + if (log.isDebugEnabled()) { + log.debug("Reloading key {} into metadata cache {}", key, cacheName); + } + final var future = readValueFromStore(key); + future.thenAccept(val -> { if (cacheConfig.getAsyncReloadConsumer() != null) { cacheConfig.getAsyncReloadConsumer().accept(key, val); } - return val; }); + return future; } else { // Do not try to refresh the cache item if we know that we're not connected to the // metadata store @@ -128,22 +139,46 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica } private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String path) { - return store.get(path) - .thenCompose(optRes -> { - if (!optRes.isPresent()) { - return FutureUtils.value(Optional.empty()); - } - - try { - GetResult res = optRes.get(); - T obj = serde.deserialize(path, res.getValue(), res.getStat()); - return FutureUtils - .value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); - } catch (Throwable t) { - return FutureUtils.exception(new ContentDeserializationException( - "Failed to deserialize payload for key '" + path + "'", t)); - } - }); + final var future = new CompletableFuture<Optional<CacheGetResult<T>>>(); + store.get(path).thenComposeAsync(optRes -> { + // There could be multiple pending reads for the same path, for example, when a path is created, + // 1. The `accept` method will call `refresh` + // 2. The `put` method will call `refresh` after the metadata store put operation is done + // Both will call this method and the same result will be read. In this case, we only need to deserialize + // the value once. 
+ if (!optRes.isPresent()) { + if (log.isDebugEnabled()) { + log.debug("Key {} not found in metadata store", path); + } + return FutureUtils.value(Optional.<CacheGetResult<T>>empty()); + } + final var res = optRes.get(); + final var cachedFuture = objCache.getIfPresent(path); + if (cachedFuture != null && cachedFuture != future) { + if (log.isDebugEnabled()) { + log.debug("A new read on key {} is in progress or completed, ignore this one", path); + } + return cachedFuture; + } + try { + T obj = serde.deserialize(path, res.getValue(), res.getStat()); + if (log.isDebugEnabled()) { + log.debug("Deserialized value for key {} (version: {}): {}", path, res.getStat().getVersion(), + obj); + } + return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, res.getStat()))); + } catch (Throwable t) { + return FutureUtils.exception(new ContentDeserializationException( + "Failed to deserialize payload for key '" + path + "'", t)); + } + }, executor.chooseThread(path)).whenComplete((result, e) -> { + if (e != null) { + future.completeExceptionally(e.getCause()); + } else { + future.complete(result); + } + }); + return future; } @Override @@ -169,8 +204,9 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica @Override public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) { + final var executor = this.executor.chooseThread(path); return executeWithRetry(() -> objCache.get(path) - .thenCompose(optEntry -> { + .thenComposeAsync(optEntry -> { Optional<T> currentValue; long expectedVersion; @@ -202,13 +238,14 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> { refresh(path); }).thenApply(__ -> newValueObj); - }), path); + }, executor), path); } @Override public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) { + final var executor = 
this.executor.chooseThread(path); return executeWithRetry(() -> objCache.get(path) - .thenCompose(optEntry -> { + .thenComposeAsync(optEntry -> { if (!optEntry.isPresent()) { return FutureUtils.exception(new NotFoundException("")); } @@ -231,59 +268,57 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> { refresh(path); }).thenApply(__ -> newValueObj); - }), path); + }, executor), path); + } + + private CompletableFuture<byte[]> serialize(String path, T value) { + final var future = new CompletableFuture<byte[]>(); + executor.executeOrdered(path, () -> { + try { + future.complete(serde.serialize(path, value)); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + return future; } @Override public CompletableFuture<Void> create(String path, T value) { - byte[] content; - try { - content = serde.serialize(path, value); - } catch (Throwable t) { - return FutureUtils.exception(t); - } - - CompletableFuture<Void> future = new CompletableFuture<>(); - store.put(path, content, Optional.of(-1L)) - .thenAccept(stat -> { - // Make sure we have the value cached before the operation is completed - // In addition to caching the value, we need to add a watch on the path, - // so when/if it changes on any other node, we are notified and we can - // update the cache - objCache.get(path).whenComplete((stat2, ex) -> { - if (ex == null) { - future.complete(null); - } else { - log.error("Exception while getting path {}", path, ex); - future.completeExceptionally(ex.getCause()); - } - }); - }).exceptionally(ex -> { - if (ex.getCause() instanceof BadVersionException) { - // Use already exists exception to provide more self-explanatory error message - future.completeExceptionally(new AlreadyExistsException(ex.getCause())); - } else { - future.completeExceptionally(ex.getCause()); - } - return null; - }); - + final var future = new CompletableFuture<Void>(); + 
serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L))) + // Make sure we have the value cached before the operation is completed + // In addition to caching the value, we need to add a watch on the path, + // so when/if it changes on any other node, we are notified and we can + // update the cache + .thenCompose(__ -> objCache.get(path)) + .whenComplete((__, ex) -> { + if (ex == null) { + future.complete(null); + } else if (ex.getCause() instanceof BadVersionException) { + // Use already exists exception to provide more self-explanatory error message + future.completeExceptionally(new AlreadyExistsException(ex.getCause())); + } else { + future.completeExceptionally(ex.getCause()); + } + }); return future; } @Override public CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> options) { - final byte[] bytes; - try { - bytes = serde.serialize(path, value); - } catch (IOException e) { - return CompletableFuture.failedFuture(e); - } - if (storeExtended != null) { - return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path)); - } else { - return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path)); - } + return serialize(path, value).thenCompose(bytes -> { + if (storeExtended != null) { + return storeExtended.put(path, bytes, Optional.empty(), options); + } else { + return store.put(path, bytes, Optional.empty()); + } + }).thenAccept(__ -> { + if (log.isDebugEnabled()) { + log.debug("Refreshing path {} after put operation", path); + } + refresh(path); + }); } @Override @@ -323,6 +358,9 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica switch (t.getType()) { case Created: case Modified: + if (log.isDebugEnabled()) { + log.debug("Refreshing path {} for {} notification", path, t.getType()); + } refresh(path); break; @@ -354,8 +392,7 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica final var next 
= backoff.next(); log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key, next, backoff.isMandatoryStopMade(), elapsed); - executor.schedule(() -> execute(op, key, result, backoff), next, - TimeUnit.MILLISECONDS); + schedulerExecutor.schedule(() -> execute(op, key, result, backoff), next, TimeUnit.MILLISECONDS); return null; } result.completeExceptionally(ex.getCause()); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index b0e4b43f700..d118a792e2f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.OpenTelemetry; import java.time.Instant; @@ -38,16 +39,17 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import 
org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; @@ -76,7 +78,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>(); protected final String metadataStoreName; - protected final ScheduledExecutorService executor; + private final OrderedExecutor serDesExecutor; + private final ExecutorService eventExecutor; + private final ScheduledExecutorService schedulerExecutor; private final AsyncLoadingCache<String, List<String>> childrenCache; private final AsyncLoadingCache<String, Boolean> existsCache; private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>(); @@ -93,13 +97,21 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co protected MetadataNodeSizeStats nodeSizeStats; - protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry, - MetadataNodeSizeStats nodeSizeStats) { + protected AbstractMetadataStore( + String metadataStoreName, OpenTelemetry openTelemetry, MetadataNodeSizeStats nodeSizeStats, + int numSerDesThreads) { this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats() : nodeSizeStats; - this.executor = new ScheduledThreadPoolExecutor(1, - new DefaultThreadFactory( - StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); + final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName + : getClass().getSimpleName(); + this.eventExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory(namePrefix + "-event")); + this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory(namePrefix + "-scheduler")); + this.serDesExecutor = OrderedExecutor.newBuilder() + .numThreads(numSerDesThreads) + .name(namePrefix + "-worker") + .build(); registerListener(this); this.childrenCache = Caffeine.newBuilder() @@ -249,7 +261,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null); String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl<T> metadataCache = - new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor, + this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -258,7 +271,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) { String cacheName = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl<T> metadataCache = - new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor, + this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -268,7 +282,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co MetadataCacheConfig cacheConfig) { MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, - this.executor); + this.serDesExecutor, this.schedulerExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -348,7 +362,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co }); return null; - }, executor); + }, eventExecutor); } catch (RejectedExecutionException e) { return FutureUtil.failedFuture(e); } @@ -531,7 +545,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co // Notice listeners. try { - executor.execute(() -> { + eventExecutor.execute(() -> { sessionListeners.forEach(l -> { try { l.accept(event); @@ -556,8 +570,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public void close() throws Exception { - executor.shutdownNow(); - executor.awaitTermination(10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, TimeUnit.SECONDS); this.metadataStoreStats.close(); } @@ -574,30 +589,30 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co } } - /** - * Run the task in the executor thread and fail the future if the executor is shutting down. 
- */ - @VisibleForTesting - public void execute(Runnable task, CompletableFuture<?> future) { + protected final <T> void processEvent(Consumer<T> eventProcessor, T event) { try { - executor.execute(task); - } catch (Throwable t) { - future.completeExceptionally(t); + eventExecutor.execute(() -> eventProcessor.accept(event)); + } catch (RejectedExecutionException e) { + log.warn("Rejected processing event {}", event); } } - /** - * Run the task in the executor thread and fail the future if the executor is shutting down. - */ - @VisibleForTesting - public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) { + protected final void scheduleDelayedTask(long delay, TimeUnit unit, Runnable task) { + schedulerExecutor.schedule(task, delay, unit); + } + + protected final void safeExecuteCallback(Runnable task, Consumer<Throwable> exceptionHandler) { try { - executor.execute(task); - } catch (final Throwable t) { - futures.get().forEach(f -> f.completeExceptionally(t)); + eventExecutor.execute(task); + } catch (Throwable t) { + exceptionHandler.accept(t); } } + protected final void safeExecuteCallback(Runnable task, CompletableFuture<?> future) { + safeExecuteCallback(task, future::completeExceptionally); + } + protected static String parent(String path) { int idx = path.lastIndexOf('/'); if (idx <= 0) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java index 3937fd712dc..e1311fccfe0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java @@ -188,7 +188,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { @Override protected CompletableFuture<Boolean> existsFromStore(String path) { return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION) - 
.thenApplyAsync(gr -> gr.getCount() == 1, executor); + .thenApply(gr -> gr.getCount() == 1); } @Override @@ -204,9 +204,8 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { } return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class)) // Then create the unique key with the version added in the path - .thenComposeAsync( - stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options), - executor); + .thenCompose( + stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options)); } } @@ -313,9 +312,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { } } else { log.warn("Failed to commit: {}", cause.getMessage()); - executor.execute(() -> { - ops.forEach(o -> o.getFuture().completeExceptionally(ex)); - }); + ops.forEach(o -> o.getFuture().completeExceptionally(ex)); } return null; }); @@ -326,7 +323,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { private void handleBatchOperationResult(TxnResponse txnResponse, List<MetadataOp> ops) { - executor.execute(() -> { + safeExecuteCallbacks(() -> { if (!txnResponse.isSucceeded()) { if (ops.size() > 1) { // Retry individually @@ -404,7 +401,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { } } } - }); + }, ops); } private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) { @@ -444,9 +441,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore { if (retryOnFailure) { future.exceptionally(ex -> { log.warn("Failed to create Etcd lease. 
Retrying later", ex); - executor.schedule(() -> { - createLease(true); - }, 1, TimeUnit.SECONDS); + scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true)); return null; }); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 079cb3130e0..627304b2edc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length()); // Local means a private data set // update synchronizer and register sync listener diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java index 74bddda7454..08e5478ffcc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java @@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore { private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + 
metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads()); this.metadataUrl = metadataURL; try { RocksDB.loadLibrary(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 5bf7e2272f0..f56d6c6941f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -119,7 +119,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore private void processSessionWatcher(WatchedEvent event) { if (sessionWatcher != null) { - executor.execute(() -> sessionWatcher.process(event)); + processEvent(sessionWatcher::process, event); } } @@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore countsByType, totalSize, opsForLog); // Retry with the individual operations - executor.schedule(() -> { - ops.forEach(o -> batchOperation(Collections.singletonList(o))); - }, 100, TimeUnit.MILLISECONDS); + scheduleDelayedTask(100, TimeUnit.MILLISECONDS, + () -> ops.forEach(o -> batchOperation(Collections.singletonList(o)))); } else { MetadataStoreException e = getException(code, path); ops.forEach(o -> o.getFuture().completeExceptionally(e)); @@ -256,7 +255,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore } // Trigger all the futures in the batch - execute(() -> { + safeExecuteCallbacks(() -> { for (int i = 0; i < ops.size(); i++) { OpResult opr = results.get(i); MetadataOp op = ops.get(i); @@ -278,7 +277,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore "Operation type not supported in multi: " + op.getType())); } } - }, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList())); + }, ops); }, null); } catch (Throwable t) { ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t))); @@ 
-395,7 +394,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore try { zkc.exists(path, null, (rc, path1, ctx, stat) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(true); @@ -421,7 +420,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore try { zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(null); @@ -446,7 +445,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore CreateMode createMode = getCreateMode(opPut.getOptions()); asyncCreateFullPathOptimistic(zkc, opPut.getPath(), opPut.getData(), createMode, (rc, path1, ctx, name) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true)); @@ -460,7 +459,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore }); } else { zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, (rc, path1, ctx, stat) -> { - execute(() -> { + safeExecuteCallback(() -> { Code code = Code.get(rc); if (code == Code.OK) { future.complete(getStat(path1, stat)); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java index a9319a50fec..30989a41bd1 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java @@ -18,16 +18,20 @@ */ package org.apache.pulsar.metadata.impl.batching; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Collections; import 
java.util.EnumSet; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -38,6 +42,7 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore; import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; +import org.jspecify.annotations.Nullable; @Slf4j public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore { @@ -46,8 +51,6 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private final MessagePassingQueue<MetadataOp> readOps; private final MessagePassingQueue<MetadataOp> writeOps; - private final AtomicBoolean flushInProgress = new AtomicBoolean(false); - private final boolean enabled; private final int maxDelayMillis; protected final int maxOperations; @@ -55,9 +58,12 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore private MetadataEventSynchronizer synchronizer; private final BatchMetadataStoreStats batchMetadataStoreStats; protected MetadataStoreBatchStrategy metadataStoreBatchStrategy; + @Nullable + private final ScheduledExecutorService flushExecutor; protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) { - super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats()); + super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats(), + conf.getNumSerDesThreads()); this.enabled = conf.isBatchingEnabled(); this.maxDelayMillis = conf.getBatchingMaxDelayMillis(); @@ -67,18 +73,22 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore if (enabled) { readOps = new MpscUnboundedArrayQueue<>(10_000); writeOps = new MpscUnboundedArrayQueue<>(10_000); - scheduledTask = - executor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, TimeUnit.MILLISECONDS); + final var name = StringUtils.isNotBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName() + : getClass().getSimpleName(); + flushExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory( + name + "-batch-flusher")); + scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, + TimeUnit.MILLISECONDS); } else { scheduledTask = null; readOps = null; writeOps = null; + flushExecutor = null; } // update synchronizer and register sync listener updateMetadataEventSynchronizer(conf.getSynchronizer()); - this.batchMetadataStoreStats = - new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry()); + this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName); this.metadataStoreBatchStrategy = new DefaultMetadataStoreBatchStrategy(maxOperations, maxSize); } @@ -96,12 +106,13 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore op.getFuture().completeExceptionally(ex); } scheduledTask.cancel(true); + MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, TimeUnit.SECONDS); } super.close(); this.batchMetadataStoreStats.close(); } - private void flush() { + private synchronized void flush() { List<MetadataOp> currentBatch; if (!readOps.isEmpty()) { while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(readOps))) { @@ -113,8 +124,6 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore 
internalBatchOperation(currentBatch); } } - - flushInProgress.set(false); } @Override @@ -169,8 +178,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore internalBatchOperation(Collections.singletonList(op)); return; } - if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) { - executor.execute(this::flush); + if (queue.size() > maxOperations) { + flush(); } } else { internalBatchOperation(Collections.singletonList(op)); @@ -194,4 +203,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore } protected abstract void batchOperation(List<MetadataOp> ops); + + protected final void safeExecuteCallbacks(Runnable runnable, List<MetadataOp> ops) { + safeExecuteCallback(runnable, t -> ops.forEach(op -> op.getFuture().completeExceptionally(t))); + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index d055dd7da55..407a927bda4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { private Optional<MetadataEventSynchronizer> synchronizer; public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) { - super("oxia-metadata", OpenTelemetry.noop(), null); + super("oxia-metadata", OpenTelemetry.noop(), null, 1); this.client = oxia; this.identity = identity; this.synchronizer = Optional.empty(); @@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore { boolean enableSessionWatcher) throws Exception { super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(), - metadataStoreConfig.getNodeSizeStats()); + metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads()); var linger = metadataStoreConfig.getBatchingMaxDelayMillis(); if (!metadataStoreConfig.isBatchingEnabled()) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java index 9549a8df8f9..82cc15d8aaf 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java @@ -18,23 +18,13 @@ */ package org.apache.pulsar.metadata.impl.stats; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; -import io.prometheus.client.Gauge; import io.prometheus.client.Histogram; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; public final class BatchMetadataStoreStats implements AutoCloseable { private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000}; private static final String NAME = "name"; - private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge - .build("pulsar_batch_metadata_store_executor_queue_size", "-") - .labelNames(NAME) - .register(); private static final Histogram OPS_WAITING = Histogram .build("pulsar_batch_metadata_store_queue_wait_time", "-") .unit("ms") @@ -54,46 +44,17 @@ public final class BatchMetadataStoreStats implements AutoCloseable { .register(); private final AtomicBoolean closed = new AtomicBoolean(false); - private final ThreadPoolExecutor executor; private final String metadataStoreName; private final Histogram.Child batchOpsWaitingChild; private final Histogram.Child batchExecuteTimeChild; private final Histogram.Child opsPerBatchChild; - public static final String 
EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size"; - private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter; - - public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) { - if (executor instanceof ThreadPoolExecutor tx) { - this.executor = tx; - } else { - this.executor = null; - } + public BatchMetadataStoreStats(String metadataStoreName) { this.metadataStoreName = metadataStoreName; - - EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() { - @Override - public double get() { - return getQueueSize(); - } - }, metadataStoreName); - this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName); this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName); this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName); - - var meter = openTelemetry.getMeter("org.apache.pulsar"); - var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName); - this.batchMetadataStoreSizeCounter = meter - .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME) - .setDescription("The number of batch operations in the metadata store executor queue") - .setUnit("{operation}") - .buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes)); - } - - private int getQueueSize() { - return executor == null ? 
0 : executor.getQueue().size(); } public void recordOpWaiting(long millis) { @@ -111,11 +72,9 @@ public final class BatchMetadataStoreStats implements AutoCloseable { @Override public void close() throws Exception { if (closed.compareAndSet(false, true)) { - EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName); OPS_WAITING.remove(this.metadataStoreName); BATCH_EXECUTE_TIME.remove(this.metadataStoreName); OPS_PER_BATCH.remove(metadataStoreName); - batchMetadataStoreSizeCounter.close(); } } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java index d42b2228346..0ae0b022a35 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java @@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest { public static class MyMetadataStore extends AbstractMetadataStore { protected MyMetadataStore() { - super("custom", OpenTelemetry.noop(), null); + super("custom", OpenTelemetry.noop(), null, 1); } @Override
