This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new a6aab863b4a Revert "[improve][meta] PIP-453: Improve the metadata
store threading model (#25187)"
a6aab863b4a is described below
commit a6aab863b4a86b5dcb9be21045f1333f1c4501f2
Author: coderzc <[email protected]>
AuthorDate: Fri Feb 13 10:49:13 2026 +0800
Revert "[improve][meta] PIP-453: Improve the metadata store threading model
(#25187)"
This reverts commit c5f7271499e8879353139b9b1ee021a8dda111ad.
---
conf/broker.conf | 2 -
conf/standalone.conf | 3 -
pip/pip-453.md | 26 ++-
.../IsolatedBookieEnsemblePlacementPolicy.java | 3 -
.../apache/pulsar/broker/ServiceConfiguration.java | 6 -
.../IsolatedBookieEnsemblePlacementPolicyTest.java | 34 ++--
.../org/apache/pulsar/broker/PulsarService.java | 2 -
.../apache/pulsar/broker/PulsarServiceTest.java | 65 --------
.../stats/OpenTelemetryMetadataStoreStatsTest.java | 12 ++
.../pulsar/metadata/api/MetadataStoreConfig.java | 3 -
.../metadata/cache/impl/MetadataCacheImpl.java | 185 +++++++++------------
.../metadata/impl/AbstractMetadataStore.java | 77 ++++-----
.../pulsar/metadata/impl/EtcdMetadataStore.java | 19 ++-
.../metadata/impl/LocalMemoryMetadataStore.java | 2 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 19 ++-
.../batching/AbstractBatchedMetadataStore.java | 39 ++---
.../metadata/impl/oxia/OxiaMetadataStore.java | 4 +-
.../impl/stats/BatchMetadataStoreStats.java | 43 ++++-
.../impl/MetadataStoreFactoryImplTest.java | 2 +-
20 files changed, 232 insertions(+), 316 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 503a7a0e11f..85c0a133c47 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -951,8 +951,6 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128
-# The number of threads used for serializing and deserializing data to and from the metadata store
-metadataStoreSerDesThreads=1
### --- Authentication --- ###
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f9c7ccb658f..77e2f37ac88 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -430,9 +430,6 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128
-# The number of threads used for serializing and deserializing data to and from the metadata store
-metadataStoreSerDesThreads=1
-
### --- TLS --- ###
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false
diff --git a/pip/pip-453.md b/pip/pip-453.md
index f9109798ba7..a42736b9dda 100644
--- a/pip/pip-453.md
+++ b/pip/pip-453.md
@@ -40,9 +40,8 @@ Additionally, some code paths execute the compute intensive
tasks in the metadat
# High Level Design
-Create 4 sets of threads:
+Create 3 set of threads:
- `<name>-event`: the original metadata store thread, which is now only
responsible to handle notifications. This executor won't be a
`ScheduledExecutorService` anymore.
-- `<name>-scheduler`: a single thread, which is used to schedule tasks like
flushing and retrying failed operations.
- `<name>-batch-flusher`: a single thread, which is used to schedule the
flushing task at a fixed rate. It won't be created if
`metadataStoreBatchingEnabled` is false.
- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances
to execute compute intensive tasks like serialization and deserialization. The
same path will be handled by the same thread to keep the processing order on
the same path.
@@ -54,6 +53,25 @@ The only concern is that introducing a new thread to execute
callbacks allows wa
metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
```
+Other tasks like the retry on failure is executed in JVM's common
`ForkJoinPool` by `CompletableFuture` APIs. For example:
+
+```diff
+--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
++++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
+ countsByType, totalSize, opsForLog);
+
+ // Retry with the individual operations
+- executor.schedule(() -> {
+- ops.forEach(o ->
batchOperation(Collections.singletonList(o)));
+- }, 100, TimeUnit.MILLISECONDS);
++ CompletableFuture.delayedExecutor(100,
TimeUnit.MILLISECONDS).execute(() ->
++ ops.forEach(o ->
batchOperation(Collections.singletonList(o))));
+ } else {
+ MetadataStoreException e = getException(code, path);
+ ops.forEach(o ->
o.getFuture().completeExceptionally(e));
+```
+
# Detailed Design
## Public-facing Changes
@@ -67,11 +85,9 @@ Add a configurations to specify the number of worker threads
for `MetadataCache`
category = CATEGORY_SERVER,
doc = "The number of threads uses for serializing and
deserializing data to and from the metadata store"
)
- private int metadataStoreSerDesThreads = 1;
+ private int metadataStoreSerDesThreads =
Runtime.getRuntime().availableProcessors();
```
-Use 1 as the default value since the serialization and deserialization tasks
are not frequent. This separated thread pool is mainly added to avoid blocking
the metadata store callback thread.
-
### Metrics
The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed
because the `<name>-batch-flusher` thread won't execute other tasks except for
flushing.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 4ef1c594be4..878bbc4d654 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -58,8 +57,6 @@ public class IsolatedBookieEnsemblePlacementPolicy extends
RackawareEnsemblePlac
// the secondary group.
private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
- @Getter
- @VisibleForTesting
private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dede4543fc3..f7e50836583 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -490,12 +490,6 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean metadataStoreAllowReadOnlyOperations;
- @FieldContext(
- category = CATEGORY_SERVER,
- doc = "The number of threads used for serializing and
deserializing data to and from the metadata store"
- )
- private int metadataStoreSerDesThreads = 1;
-
@Deprecated
@FieldContext(
category = CATEGORY_SERVER,
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 936b04386ff..68f92ab416d 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -22,15 +22,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -291,7 +288,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
secondaryBookieGroup.put(BOOKIE4,
BookieInfo.builder().rack("rack0").build());
bookieMapping.put("group2", secondaryBookieGroup);
- updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
null).getResult();
@@ -342,7 +340,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
+ "\": {\"rack\": \"rack0\", \"hostname\":
\"bookie3.example.com\"}, \"" + BOOKIE4
+ "\": {\"rack\": \"rack2\", \"hostname\":
\"bookie4.example.com\"}}}";
- updateBookieInfo(isolationPolicy,
data.getBytes(StandardCharsets.UTF_8));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
data.getBytes(StandardCharsets.UTF_8),
+ Optional.empty()).join();
List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2,
Collections.emptyMap(),
new HashSet<>()).getResult();
@@ -400,7 +399,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);
- updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
new HashSet<>()).getResult();
@@ -784,7 +784,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup2, group2);
bookieMapping.put(isolationGroup3, group3);
- updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
groups.setRight(Sets.newHashSet(""));
@@ -807,7 +808,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);
- updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -829,24 +831,12 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);
- updateBookieInfo(isolationPolicy,
jsonMapper.writeValueAsBytes(bookieMapping));
+ store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping),
+ Optional.empty()).join();
groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2,
groups);
assertTrue(blacklist.isEmpty());
}
-
- // The policy gets the bookie info asynchronously before each query or
update, when putting the bookie info into
- // the metadata store, the cache needs some time to receive the
notification and update accordingly.
- private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy
isolationPolicy, byte[] bookieInfo) {
- final var cache = isolationPolicy.getBookieMappingCache();
- assertNotNull(cache); // the policy must have been initialized
-
- final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
- final var previousBookieInfo = cache.getIfCached(key);
- store.put(key, bookieInfo, Optional.empty()).join();
- Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
- assertNotEquals(cache.getIfCached(key), previousBookieInfo));
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d4b69f8d5fb..56949873621 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -429,7 +429,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.synchronizer(synchronizer)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
-
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}
@@ -1300,7 +1299,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
-
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 6195e9cdae5..6c04889d8f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -25,15 +25,10 @@ import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -43,10 +38,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.metadata.api.MetadataCacheConfig;
-import org.apache.pulsar.metadata.api.MetadataSerde;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.Stat;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -348,60 +339,4 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
assertTrue(e instanceof PulsarClientException.TimeoutException);
}
}
-
- @Test
- public void testMetadataSerDesThreads() throws Exception {
- final var numSerDesThreads = 5;
- final var config = new ServiceConfiguration();
- config.setMetadataStoreSerDesThreads(numSerDesThreads);
- config.setClusterName("test");
- config.setMetadataStoreUrl("memory:local");
- config.setConfigurationMetadataStoreUrl("memory:local");
-
- @Cleanup final var pulsar = new PulsarService(config);
- pulsar.start();
-
- BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
- final var serDes = new CustomMetadataSerDes();
- final var cache = store.getMetadataCache(prefix, serDes,
MetadataCacheConfig.builder().build());
- for (int i = 0; i < 100 &&
serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
- cache.create(prefix + i, "value-" + i).join();
- final var value = cache.get(prefix + i).join();
- assertEquals(value.orElseThrow(), "value-" + i);
- final var newValue = cache.readModifyUpdate(prefix + i, s -> s
+ "-updated").join();
- assertEquals(newValue, "value-" + i + "-updated");
- // Verify the serialization and deserialization are handled by
the same thread
- assertEquals(serDes.threadNameToSerializedPaths,
serDes.threadNameToDeserializedPaths);
- }
- log.info("SerDes thread mapping: {}",
serDes.threadNameToSerializedPaths);
- assertEquals(serDes.threadNameToSerializedPaths.keySet().size(),
numSerDesThreads);
- // Verify a path cannot be handled by multiple threads
- final var paths =
serDes.threadNameToSerializedPaths.values().stream()
- .flatMap(Set::stream).sorted().toList();
- assertEquals(paths.stream().distinct().toList(), paths);
- };
-
- verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
- verifier.accept(pulsar.getConfigurationMetadataStore(),
"/test-config/");
- }
-
- private static class CustomMetadataSerDes implements MetadataSerde<String>
{
-
- final Map<String, Set<String>> threadNameToSerializedPaths = new
ConcurrentHashMap<>();
- final Map<String, Set<String>> threadNameToDeserializedPaths = new
ConcurrentHashMap<>();
-
- @Override
- public byte[] serialize(String path, String value) throws IOException{
-
threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
- __ -> ConcurrentHashMap.newKeySet()).add(path);
- return value.getBytes();
- }
-
- @Override
- public String deserialize(String path, byte[] data, Stat stat) throws
IOException {
-
threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
- __ -> ConcurrentHashMap.newKeySet()).add(path);
- return new String(data);
- }
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
index 390aa1e49e2..9e8bde20b88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.ExecutorService;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -28,6 +29,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -51,6 +53,14 @@ public class OpenTelemetryMetadataStoreStatsTest extends
BrokerTestBase {
var newStats = new MetadataStoreStats(
localMetadataStoreName,
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "metadataStoreStats",
newStats, true);
+
+ var currentBatchedStats = (BatchMetadataStoreStats)
FieldUtils.readField(localMetadataStore,
+ "batchMetadataStoreStats", true);
+ currentBatchedStats.close();
+ var currentExecutor = (ExecutorService)
FieldUtils.readField(currentBatchedStats, "executor", true);
+ var newBatchedStats = new
BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
+
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+ FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats",
newBatchedStats, true);
}
@AfterMethod(alwaysRun = true)
@@ -79,5 +89,7 @@ public class OpenTelemetryMetadataStoreStatsTest extends
BrokerTestBase {
var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics,
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
attributes, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics,
BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
+ value -> assertThat(value).isPositive());
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index fcde0dce840..ef50dc87691 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -104,7 +104,4 @@ public class MetadataStoreConfig {
* The estimator to estimate the payload length of metadata node, which
used to limit the batch size requested.
*/
private MetadataNodeSizeStats nodeSizeStats;
-
- @Builder.Default
- private final int numSerDesThreads = 1;
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index ca165f0464e..b1f0572547c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,10 +39,10 @@ import java.util.function.Supplier;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -62,26 +62,23 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
private final MetadataStore store;
private final MetadataStoreExtended storeExtended;
private final MetadataSerde<T> serde;
- private final OrderedExecutor executor;
- private final ScheduledExecutorService schedulerExecutor;
+ private final ScheduledExecutorService executor;
private final MetadataCacheConfig<T> cacheConfig;
private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>>
objCache;
public MetadataCacheImpl(String cacheName, MetadataStore store,
TypeReference<T> typeRef,
- MetadataCacheConfig<T> cacheConfig,
OrderedExecutor executor,
- ScheduledExecutorService schedulerExecutor) {
- this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef),
cacheConfig, executor, schedulerExecutor);
+ MetadataCacheConfig<T> cacheConfig,
ScheduledExecutorService executor) {
+ this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef),
cacheConfig, executor);
}
public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType
type, MetadataCacheConfig<T> cacheConfig,
- OrderedExecutor executor,
ScheduledExecutorService schedulerExecutor) {
- this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type),
cacheConfig, executor, schedulerExecutor);
+ ScheduledExecutorService executor) {
+ this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type),
cacheConfig, executor);
}
public MetadataCacheImpl(String cacheName, MetadataStore store,
MetadataSerde<T> serde,
- MetadataCacheConfig<T> cacheConfig,
OrderedExecutor executor,
- ScheduledExecutorService schedulerExecutor) {
+ MetadataCacheConfig<T> cacheConfig,
ScheduledExecutorService executor) {
this.store = store;
if (store instanceof MetadataStoreExtended) {
this.storeExtended = (MetadataStoreExtended) store;
@@ -91,7 +88,6 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
this.serde = serde;
this.cacheConfig = cacheConfig;
this.executor = executor;
- this.schedulerExecutor = schedulerExecutor;
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -105,9 +101,6 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
.buildAsync(new AsyncCacheLoader<String,
Optional<CacheGetResult<T>>>() {
@Override
public CompletableFuture<Optional<CacheGetResult<T>>>
asyncLoad(String key, Executor executor) {
- if (log.isDebugEnabled()) {
- log.debug("Loading key {} into metadata cache {}",
key, cacheName);
- }
return readValueFromStore(key);
}
@@ -117,16 +110,12 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
Optional<CacheGetResult<T>> oldValue,
Executor executor) {
if (store instanceof AbstractMetadataStore &&
((AbstractMetadataStore) store).isConnected()) {
- if (log.isDebugEnabled()) {
- log.debug("Reloading key {} into metadata
cache {}", key, cacheName);
- }
- final var future = readValueFromStore(key);
- future.thenAccept(val -> {
+ return readValueFromStore(key).thenApply(val -> {
if (cacheConfig.getAsyncReloadConsumer() !=
null) {
cacheConfig.getAsyncReloadConsumer().accept(key, val);
}
+ return val;
});
- return future;
} else {
// Do not try to refresh the cache item if we know
that we're not connected to the
// metadata store
@@ -139,46 +128,22 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
private CompletableFuture<Optional<CacheGetResult<T>>>
readValueFromStore(String path) {
- final var future = new
CompletableFuture<Optional<CacheGetResult<T>>>();
- store.get(path).thenComposeAsync(optRes -> {
- // There could be multiple pending reads for the same path, for
example, when a path is created,
- // 1. The `accept` method will call `refresh`
- // 2. The `put` method will call `refresh` after the metadata
store put operation is done
- // Both will call this method and the same result will be read. In
this case, we only need to deserialize
- // the value once.
- if (!optRes.isPresent()) {
- if (log.isDebugEnabled()) {
- log.debug("Key {} not found in metadata store", path);
- }
- return FutureUtils.value(Optional.<CacheGetResult<T>>empty());
- }
- final var res = optRes.get();
- final var cachedFuture = objCache.getIfPresent(path);
- if (cachedFuture != null && cachedFuture != future) {
- if (log.isDebugEnabled()) {
- log.debug("A new read on key {} is in progress or
completed, ignore this one", path);
- }
- return cachedFuture;
- }
- try {
- T obj = serde.deserialize(path, res.getValue(), res.getStat());
- if (log.isDebugEnabled()) {
- log.debug("Deserialized value for key {} (version: {}):
{}", path, res.getStat().getVersion(),
- obj);
- }
- return FutureUtils.value(Optional.of(new CacheGetResult<>(obj,
res.getStat())));
- } catch (Throwable t) {
- return FutureUtils.exception(new
ContentDeserializationException(
- "Failed to deserialize payload for key '" + path + "'",
t));
- }
- }, executor.chooseThread(path)).whenComplete((result, e) -> {
- if (e != null) {
- future.completeExceptionally(e.getCause());
- } else {
- future.complete(result);
- }
- });
- return future;
+ return store.get(path)
+ .thenCompose(optRes -> {
+ if (!optRes.isPresent()) {
+ return FutureUtils.value(Optional.empty());
+ }
+
+ try {
+ GetResult res = optRes.get();
+ T obj = serde.deserialize(path, res.getValue(),
res.getStat());
+ return FutureUtils
+ .value(Optional.of(new CacheGetResult<>(obj,
res.getStat())));
+ } catch (Throwable t) {
+ return FutureUtils.exception(new
ContentDeserializationException(
+ "Failed to deserialize payload for key '" +
path + "'", t));
+ }
+ });
}
@Override
@@ -204,9 +169,8 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
@Override
public CompletableFuture<T> readModifyUpdateOrCreate(String path,
Function<Optional<T>, T> modifyFunction) {
- final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
- .thenComposeAsync(optEntry -> {
+ .thenCompose(optEntry -> {
Optional<T> currentValue;
long expectedVersion;
@@ -238,14 +202,13 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
refresh(path);
}).thenApply(__ -> newValueObj);
- }, executor), path);
+ }), path);
}
@Override
public CompletableFuture<T> readModifyUpdate(String path, Function<T, T>
modifyFunction) {
- final var executor = this.executor.chooseThread(path);
return executeWithRetry(() -> objCache.get(path)
- .thenComposeAsync(optEntry -> {
+ .thenCompose(optEntry -> {
if (!optEntry.isPresent()) {
return FutureUtils.exception(new
NotFoundException(""));
}
@@ -268,57 +231,59 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(__ -> {
refresh(path);
}).thenApply(__ -> newValueObj);
- }, executor), path);
- }
-
- private CompletableFuture<byte[]> serialize(String path, T value) {
- final var future = new CompletableFuture<byte[]>();
- executor.executeOrdered(path, () -> {
- try {
- future.complete(serde.serialize(path, value));
- } catch (Throwable t) {
- future.completeExceptionally(t);
- }
- });
- return future;
+ }), path);
}
@Override
public CompletableFuture<Void> create(String path, T value) {
- final var future = new CompletableFuture<Void>();
- serialize(path, value).thenCompose(content -> store.put(path, content,
Optional.of(-1L)))
- // Make sure we have the value cached before the operation is
completed
- // In addition to caching the value, we need to add a watch on the
path,
- // so when/if it changes on any other node, we are notified and we
can
- // update the cache
- .thenCompose(__ -> objCache.get(path))
- .whenComplete((__, ex) -> {
- if (ex == null) {
- future.complete(null);
- } else if (ex.getCause() instanceof BadVersionException) {
- // Use already exists exception to provide more
self-explanatory error message
- future.completeExceptionally(new
AlreadyExistsException(ex.getCause()));
- } else {
- future.completeExceptionally(ex.getCause());
- }
- });
+ byte[] content;
+ try {
+ content = serde.serialize(path, value);
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ store.put(path, content, Optional.of(-1L))
+ .thenAccept(stat -> {
+ // Make sure we have the value cached before the operation
is completed
+ // In addition to caching the value, we need to add a
watch on the path,
+ // so when/if it changes on any other node, we are
notified and we can
+ // update the cache
+ objCache.get(path).whenComplete((stat2, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else {
+ log.error("Exception while getting path {}", path,
ex);
+ future.completeExceptionally(ex.getCause());
+ }
+ });
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof BadVersionException) {
+ // Use already exists exception to provide more
self-explanatory error message
+ future.completeExceptionally(new
AlreadyExistsException(ex.getCause()));
+ } else {
+ future.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
+
return future;
}
@Override
public CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
- return serialize(path, value).thenCompose(bytes -> {
- if (storeExtended != null) {
- return storeExtended.put(path, bytes, Optional.empty(),
options);
- } else {
- return store.put(path, bytes, Optional.empty());
- }
- }).thenAccept(__ -> {
- if (log.isDebugEnabled()) {
- log.debug("Refreshing path {} after put operation", path);
- }
- refresh(path);
- });
+ final byte[] bytes;
+ try {
+ bytes = serde.serialize(path, value);
+ } catch (IOException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ if (storeExtended != null) {
+ return storeExtended.put(path, bytes, Optional.empty(),
options).thenAccept(__ -> refresh(path));
+ } else {
+ return store.put(path, bytes, Optional.empty()).thenAccept(__ ->
refresh(path));
+ }
}
@Override
@@ -358,9 +323,6 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
switch (t.getType()) {
case Created:
case Modified:
- if (log.isDebugEnabled()) {
- log.debug("Refreshing path {} for {} notification", path,
t.getType());
- }
refresh(path);
break;
@@ -392,7 +354,8 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
final var next = backoff.next();
log.info("Update key {} conflicts. Retrying in {} ms.
Mandatory stop: {}. Elapsed time: {} ms", key,
next, backoff.isMandatoryStopMade(), elapsed);
- schedulerExecutor.schedule(() -> execute(op, key, result,
backoff), next, TimeUnit.MILLISECONDS);
+ executor.schedule(() -> execute(op, key, result, backoff),
next,
+ TimeUnit.MILLISECONDS);
return null;
}
result.completeExceptionally(ex.getCause());
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index d118a792e2f..b0e4b43f700 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,7 +26,6 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
@@ -39,17 +38,16 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.stats.CacheMetricsCollector;
import org.apache.pulsar.common.util.FutureUtil;
@@ -78,9 +76,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new
CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>>
sessionListeners = new CopyOnWriteArrayList<>();
protected final String metadataStoreName;
- private final OrderedExecutor serDesExecutor;
- private final ExecutorService eventExecutor;
- private final ScheduledExecutorService schedulerExecutor;
+ protected final ScheduledExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches =
new CopyOnWriteArrayList<>();
@@ -97,21 +93,13 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected MetadataNodeSizeStats nodeSizeStats;
- protected AbstractMetadataStore(
- String metadataStoreName, OpenTelemetry openTelemetry,
MetadataNodeSizeStats nodeSizeStats,
- int numSerDesThreads) {
+ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry
openTelemetry,
+ MetadataNodeSizeStats nodeSizeStats) {
this.nodeSizeStats = nodeSizeStats == null ? new
DummyMetadataNodeSizeStats()
: nodeSizeStats;
- final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName
- : getClass().getSimpleName();
- this.eventExecutor = Executors.newSingleThreadExecutor(
- new DefaultThreadFactory(namePrefix + "-event"));
- this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(
- new DefaultThreadFactory(namePrefix + "-scheduler"));
- this.serDesExecutor = OrderedExecutor.newBuilder()
- .numThreads(numSerDesThreads)
- .name(namePrefix + "-worker")
- .build();
+ this.executor = new ScheduledThreadPoolExecutor(1,
+ new DefaultThreadFactory(
+ StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
registerListener(this);
this.childrenCache = Caffeine.newBuilder()
@@ -261,8 +249,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
JavaType typeRef =
TypeFactory.defaultInstance().constructSimpleType(clazz, null);
String cacheName = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
- new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.serDesExecutor,
- this.schedulerExecutor);
+ new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.executor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -271,8 +258,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef,
MetadataCacheConfig cacheConfig) {
String cacheName = StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
- new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.serDesExecutor,
- this.schedulerExecutor);
+ new MetadataCacheImpl<T>(cacheName, this, typeRef,
cacheConfig, this.executor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -282,7 +268,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
MetadataCacheConfig
cacheConfig) {
MetadataCacheImpl<T> metadataCache =
new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
- this.serDesExecutor, this.schedulerExecutor);
+ this.executor);
metadataCaches.add(metadataCache);
return metadataCache;
}
@@ -362,7 +348,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
});
return null;
- }, eventExecutor);
+ }, executor);
} catch (RejectedExecutionException e) {
return FutureUtil.failedFuture(e);
}
@@ -545,7 +531,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
// Notice listeners.
try {
- eventExecutor.execute(() -> {
+ executor.execute(() -> {
sessionListeners.forEach(l -> {
try {
l.accept(event);
@@ -570,9 +556,8 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public void close() throws Exception {
- MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10,
TimeUnit.SECONDS);
- MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10,
TimeUnit.SECONDS);
- MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10,
TimeUnit.SECONDS);
+ executor.shutdownNow();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
this.metadataStoreStats.close();
}
@@ -589,30 +574,30 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
- protected final <T> void processEvent(Consumer<T> eventProcessor, T event)
{
+ /**
+ * Run the task in the executor thread and fail the future if the executor
is shutting down.
+ */
+ @VisibleForTesting
+ public void execute(Runnable task, CompletableFuture<?> future) {
try {
- eventExecutor.execute(() -> eventProcessor.accept(event));
- } catch (RejectedExecutionException e) {
- log.warn("Rejected processing event {}", event);
+ executor.execute(task);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
}
}
- protected final void scheduleDelayedTask(long delay, TimeUnit unit,
Runnable task) {
- schedulerExecutor.schedule(task, delay, unit);
- }
-
- protected final void safeExecuteCallback(Runnable task,
Consumer<Throwable> exceptionHandler) {
+ /**
+ * Run the task in the executor thread and fail the future if the executor
is shutting down.
+ */
+ @VisibleForTesting
+ public void execute(Runnable task, Supplier<List<CompletableFuture<?>>>
futures) {
try {
- eventExecutor.execute(task);
- } catch (Throwable t) {
- exceptionHandler.accept(t);
+ executor.execute(task);
+ } catch (final Throwable t) {
+ futures.get().forEach(f -> f.completeExceptionally(t));
}
}
- protected final void safeExecuteCallback(Runnable task,
CompletableFuture<?> future) {
- safeExecuteCallback(task, future::completeExceptionally);
- }
-
protected static String parent(String path) {
int idx = path.lastIndexOf('/');
if (idx <= 0) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index e1311fccfe0..3937fd712dc 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -188,7 +188,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
@Override
protected CompletableFuture<Boolean> existsFromStore(String path) {
return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8),
EXISTS_GET_OPTION)
- .thenApply(gr -> gr.getCount() == 1);
+ .thenApplyAsync(gr -> gr.getCount() == 1, executor);
}
@Override
@@ -204,8 +204,9 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
}
return super.storePut(parent, new byte[0], Optional.empty(),
EnumSet.noneOf(CreateOption.class))
// Then create the unique key with the version added in
the path
- .thenCompose(
- stat -> super.storePut(path + stat.getVersion(),
data, optExpectedVersion, options));
+ .thenComposeAsync(
+ stat -> super.storePut(path + stat.getVersion(),
data, optExpectedVersion, options),
+ executor);
}
}
@@ -312,7 +313,9 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
}
} else {
log.warn("Failed to commit: {}", cause.getMessage());
- ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+ executor.execute(() -> {
+ ops.forEach(o ->
o.getFuture().completeExceptionally(ex));
+ });
}
return null;
});
@@ -323,7 +326,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
private void handleBatchOperationResult(TxnResponse txnResponse,
List<MetadataOp> ops) {
- safeExecuteCallbacks(() -> {
+ executor.execute(() -> {
if (!txnResponse.isSucceeded()) {
if (ops.size() > 1) {
// Retry individually
@@ -401,7 +404,7 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
}
}
}
- }, ops);
+ });
}
private synchronized CompletableFuture<Void> createLease(boolean
retryOnFailure) {
@@ -441,7 +444,9 @@ public class EtcdMetadataStore extends
AbstractBatchedMetadataStore {
if (retryOnFailure) {
future.exceptionally(ex -> {
log.warn("Failed to create Etcd lease. Retrying later", ex);
- scheduleDelayedTask(1, TimeUnit.SECONDS, () ->
createLease(true));
+ executor.schedule(() -> {
+ createLease(true);
+ }, 1, TimeUnit.SECONDS);
return null;
});
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 627304b2edc..079cb3130e0 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName(),
metadataStoreConfig.getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats(),
metadataStoreConfig.getNumSerDesThreads());
+ metadataStoreConfig.getNodeSizeStats());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 08e5478ffcc..74bddda7454 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName(),
metadataStoreConfig.getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats(),
metadataStoreConfig.getNumSerDesThreads());
+ metadataStoreConfig.getNodeSizeStats());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index f56d6c6941f..5bf7e2272f0 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -119,7 +119,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
private void processSessionWatcher(WatchedEvent event) {
if (sessionWatcher != null) {
- processEvent(sessionWatcher::process, event);
+ executor.execute(() -> sessionWatcher.process(event));
}
}
@@ -245,8 +245,9 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
countsByType, totalSize, opsForLog);
// Retry with the individual operations
- scheduleDelayedTask(100, TimeUnit.MILLISECONDS,
- () -> ops.forEach(o ->
batchOperation(Collections.singletonList(o))));
+ executor.schedule(() -> {
+ ops.forEach(o ->
batchOperation(Collections.singletonList(o)));
+ }, 100, TimeUnit.MILLISECONDS);
} else {
MetadataStoreException e = getException(code, path);
ops.forEach(o ->
o.getFuture().completeExceptionally(e));
@@ -255,7 +256,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
}
// Trigger all the futures in the batch
- safeExecuteCallbacks(() -> {
+ execute(() -> {
for (int i = 0; i < ops.size(); i++) {
OpResult opr = results.get(i);
MetadataOp op = ops.get(i);
@@ -277,7 +278,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
"Operation type not supported in
multi: " + op.getType()));
}
}
- }, ops);
+ }, () ->
ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
}, null);
} catch (Throwable t) {
ops.forEach(o -> o.getFuture().completeExceptionally(new
MetadataStoreException(t)));
@@ -394,7 +395,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
try {
zkc.exists(path, null, (rc, path1, ctx, stat) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(true);
@@ -420,7 +421,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
try {
zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(null);
@@ -445,7 +446,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
CreateMode createMode = getCreateMode(opPut.getOptions());
asyncCreateFullPathOptimistic(zkc, opPut.getPath(),
opPut.getData(), createMode,
(rc, path1, ctx, name) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(new Stat(name, 0, 0, 0,
createMode.isEphemeral(), true));
@@ -459,7 +460,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
});
} else {
zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion,
(rc, path1, ctx, stat) -> {
- safeExecuteCallback(() -> {
+ execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(getStat(path1, stat));
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 30989a41bd1..a9319a50fec 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,20 +18,16 @@
*/
package org.apache.pulsar.metadata.impl.batching;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -42,7 +38,6 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
-import org.jspecify.annotations.Nullable;
@Slf4j
public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore {
@@ -51,6 +46,8 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private final MessagePassingQueue<MetadataOp> readOps;
private final MessagePassingQueue<MetadataOp> writeOps;
+ private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+
private final boolean enabled;
private final int maxDelayMillis;
protected final int maxOperations;
@@ -58,12 +55,9 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private MetadataEventSynchronizer synchronizer;
private final BatchMetadataStoreStats batchMetadataStoreStats;
protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
- @Nullable
- private final ScheduledExecutorService flushExecutor;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
- super(conf.getMetadataStoreName(), conf.getOpenTelemetry(),
conf.getNodeSizeStats(),
- conf.getNumSerDesThreads());
+ super(conf.getMetadataStoreName(), conf.getOpenTelemetry(),
conf.getNodeSizeStats());
this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -73,22 +67,18 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
if (enabled) {
readOps = new MpscUnboundedArrayQueue<>(10_000);
writeOps = new MpscUnboundedArrayQueue<>(10_000);
- final var name =
StringUtils.isNotBlank(conf.getMetadataStoreName()) ?
conf.getMetadataStoreName()
- : getClass().getSimpleName();
- flushExecutor = Executors.newSingleThreadScheduledExecutor(new
DefaultThreadFactory(
- name + "-batch-flusher"));
- scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush,
maxDelayMillis, maxDelayMillis,
- TimeUnit.MILLISECONDS);
+ scheduledTask =
+ executor.scheduleAtFixedRate(this::flush, maxDelayMillis,
maxDelayMillis, TimeUnit.MILLISECONDS);
} else {
scheduledTask = null;
readOps = null;
writeOps = null;
- flushExecutor = null;
}
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(conf.getSynchronizer());
- this.batchMetadataStoreStats = new
BatchMetadataStoreStats(metadataStoreName);
+ this.batchMetadataStoreStats =
+ new BatchMetadataStoreStats(metadataStoreName, executor,
conf.getOpenTelemetry());
this.metadataStoreBatchStrategy = new
DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
}
@@ -106,13 +96,12 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
op.getFuture().completeExceptionally(ex);
}
scheduledTask.cancel(true);
- MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10,
TimeUnit.SECONDS);
}
super.close();
this.batchMetadataStoreStats.close();
}
- private synchronized void flush() {
+ private void flush() {
List<MetadataOp> currentBatch;
if (!readOps.isEmpty()) {
while (CollectionUtils.isNotEmpty(currentBatch =
metadataStoreBatchStrategy.nextBatch(readOps))) {
@@ -124,6 +113,8 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
internalBatchOperation(currentBatch);
}
}
+
+ flushInProgress.set(false);
}
@Override
@@ -178,8 +169,8 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
internalBatchOperation(Collections.singletonList(op));
return;
}
- if (queue.size() > maxOperations) {
- flush();
+ if (queue.size() > maxOperations &&
flushInProgress.compareAndSet(false, true)) {
+ executor.execute(this::flush);
}
} else {
internalBatchOperation(Collections.singletonList(op));
@@ -203,8 +194,4 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
}
protected abstract void batchOperation(List<MetadataOp> ops);
-
- protected final void safeExecuteCallbacks(Runnable runnable,
List<MetadataOp> ops) {
- safeExecuteCallback(runnable, t -> ops.forEach(op ->
op.getFuture().completeExceptionally(t)));
- }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 407a927bda4..d055dd7da55 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private Optional<MetadataEventSynchronizer> synchronizer;
public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
- super("oxia-metadata", OpenTelemetry.noop(), null, 1);
+ super("oxia-metadata", OpenTelemetry.noop(), null);
this.client = oxia;
this.identity = identity;
this.synchronizer = Optional.empty();
@@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
boolean enableSessionWatcher)
throws Exception {
super("oxia-metadata",
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
- metadataStoreConfig.getNodeSizeStats(),
metadataStoreConfig.getNumSerDesThreads());
+ metadataStoreConfig.getNodeSizeStats());
var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index 82cc15d8aaf..9549a8df8f9 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,13 +18,23 @@
*/
package org.apache.pulsar.metadata.impl.stats;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
public final class BatchMetadataStoreStats implements AutoCloseable {
private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50,
100, 200, 500, 1000};
private static final String NAME = "name";
+ private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+ .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+ .labelNames(NAME)
+ .register();
private static final Histogram OPS_WAITING = Histogram
.build("pulsar_batch_metadata_store_queue_wait_time", "-")
.unit("ms")
@@ -44,17 +54,46 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
.register();
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final ThreadPoolExecutor executor;
private final String metadataStoreName;
private final Histogram.Child batchOpsWaitingChild;
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;
- public BatchMetadataStoreStats(String metadataStoreName) {
+ public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME =
"pulsar.broker.metadata.store.executor.queue.size";
+ private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
+
+ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService
executor, OpenTelemetry openTelemetry) {
+ if (executor instanceof ThreadPoolExecutor tx) {
+ this.executor = tx;
+ } else {
+ this.executor = null;
+ }
this.metadataStoreName = metadataStoreName;
+
+ EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
+ @Override
+ public double get() {
+ return getQueueSize();
+ }
+ }, metadataStoreName);
+
this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
this.batchExecuteTimeChild =
BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+
+ var meter = openTelemetry.getMeter("org.apache.pulsar");
+ var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME,
metadataStoreName);
+ this.batchMetadataStoreSizeCounter = meter
+ .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
+ .setDescription("The number of batch operations in the
metadata store executor queue")
+ .setUnit("{operation}")
+ .buildWithCallback(measurement ->
measurement.record(getQueueSize(), attributes));
+ }
+
+ private int getQueueSize() {
+ return executor == null ? 0 : executor.getQueue().size();
}
public void recordOpWaiting(long millis) {
@@ -72,9 +111,11 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
+ EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
+ batchMetadataStoreSizeCounter.close();
}
}
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 0ae0b022a35..d42b2228346 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
public static class MyMetadataStore extends AbstractMetadataStore {
protected MyMetadataStore() {
- super("custom", OpenTelemetry.noop(), null, 1);
+ super("custom", OpenTelemetry.noop(), null);
}
@Override