This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8a823e7b8f [improve][metadata] Add Option-set API to MetadataStore (#25710)
d8a823e7b8f is described below
commit d8a823e7b8ff645c582bbe682da85219e5baa9a9
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 10:07:26 2026 -0700
[improve][metadata] Add Option-set API to MetadataStore (#25710)
---
.../apache/pulsar/metadata/api/MetadataCache.java | 36 +++++--
.../apache/pulsar/metadata/api/MetadataStore.java | 100 +++++++++++++++++---
.../org/apache/pulsar/metadata/api/Option.java | 67 +++++++++++++
.../apache/pulsar/metadata/api/OptionsHelper.java | 76 +++++++++++++++
.../api/extended/MetadataStoreExtended.java | 64 +++++++++----
.../metadata/cache/impl/MetadataCacheImpl.java | 46 ++++-----
.../metadata/impl/AbstractMetadataStore.java | 104 +++++++++++++--------
.../pulsar/metadata/impl/DualMetadataCache.java | 8 +-
.../pulsar/metadata/impl/DualMetadataStore.java | 90 +++++++-----------
.../metadata/impl/FaultInjectionMetadataStore.java | 43 ++++-----
.../metadata/impl/LocalMemoryMetadataStore.java | 14 +--
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 10 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 22 ++---
.../batching/AbstractBatchedMetadataStore.java | 8 +-
.../pulsar/metadata/impl/batching/OpPut.java | 9 +-
.../metadata/impl/oxia/OxiaMetadataStore.java | 29 +++---
.../impl/MetadataStoreFactoryImplTest.java | 6 +-
17 files changed, 486 insertions(+), 246 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 641889b3c18..4a6cd25769e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -19,9 +19,11 @@
package org.apache.pulsar.metadata.api;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
@@ -172,21 +174,37 @@ public interface MetadataCache<T> {
/**
* Create or update the value of the given path in the metadata store
without version comparison.
- * <p>
- * This method is equivalent to
- * {@link
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended#put(String,
byte[], Optional, EnumSet)} or
- * {@link MetadataStore#put(String, byte[], Optional)} if the metadata
store does not support this extended API,
- * with `Optional.empty()` as the 3rd argument. It means if the path does
not exist, it will be created. If the path
- * already exists, the new value will override the old value.
- * </p>
+ *
+ * <p>If the path does not exist it will be created; if it exists, the new
value will override the old
+ * value. Recognized options include {@link Option.Ephemeral}, {@link
Option.Sequential},
+ * {@link Option.SecondaryIndex}, and {@link Option.PartitionKey}. Pass
{@link Set#of()} for none.
+ *
* @param path the path of the object in the metadata store
* @param value the object to put in the metadata store
- * @param options the create options if the path does not in the metadata
store
+ * @param opts the set of {@link Option options} for this operation
* @return the future that indicates if this operation failed, it could
fail with
* {@link java.io.IOException} if the value failed to be serialized
* {@link MetadataStoreException} if the metadata store operation failed
*/
- CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption>
options);
+ CompletableFuture<Void> put(String path, T value, Set<Option> opts);
+
+ /**
+ * Legacy {@code EnumSet<CreateOption>} form of {@link #put(String,
Object, Set)}. Translates the
+ * {@link CreateOption} set into the canonical {@code Set<Option>} form
and forwards to the canonical method.
+ */
+ default CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
+ if (options == null || options.isEmpty()) {
+ return put(path, value, Set.of());
+ }
+ Set<Option> opts = new HashSet<>();
+ if (options.contains(CreateOption.Ephemeral)) {
+ opts.add(Option.Ephemeral.INSTANCE);
+ }
+ if (options.contains(CreateOption.Sequential)) {
+ opts.add(Option.Sequential.INSTANCE);
+ }
+ return put(path, value, opts);
+ }
/**
* Delete an object from the metadata store.
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b2d90800943..71f69a313c2 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.Beta;
import io.github.merlimat.slog.Logger;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
@@ -42,7 +43,8 @@ public interface MetadataStore extends AutoCloseable {
Logger LOG = Logger.get(MetadataStore.class);
/**
- * Read the value of one key, identified by the path
+ * Read the value of one key, identified by the path, with a set of {@link
Option options}, e.g.
+ * {@link Option.PartitionKey} for routing on sharded backends. Pass
{@link Set#of()} for no options.
*
* The async call will return a future that yields a {@link GetResult}
that will contain the value and the
* associated {@link Stat} object.
@@ -51,9 +53,18 @@ public interface MetadataStore extends AutoCloseable {
*
* @param path
* the path of the key to get from the store
+ * @param opts
+ * the set of {@link Option options} for this operation
* @return a future to track the async request
*/
- CompletableFuture<Optional<GetResult>> get(String path);
+ CompletableFuture<Optional<GetResult>> get(String path, Set<Option> opts);
+
+ /**
+ * Like {@link #get(String, Set)} with no options.
+ */
+ default CompletableFuture<Optional<GetResult>> get(String path) {
+ return get(path, Set.of());
+ }
/**
@@ -73,9 +84,18 @@ public interface MetadataStore extends AutoCloseable {
*
* @param path
* the path of the key to get from the store
+ * @param opts
+ * the set of {@link Option options} for this operation
* @return a future to track the async request
*/
- CompletableFuture<List<String>> getChildren(String path);
+ CompletableFuture<List<String>> getChildren(String path, Set<Option> opts);
+
+ /**
+ * Like {@link #getChildren(String, Set)} with no options.
+ */
+ default CompletableFuture<List<String>> getChildren(String path) {
+ return getChildren(path, Set.of());
+ }
/**
@@ -83,14 +103,24 @@ public interface MetadataStore extends AutoCloseable {
*
* If the path itself does not exist, it will return an empty list.
*
- * This method is similar to {@link #getChildren(String)}, but it attempts
to read directly from
+ * This method is similar to {@link #getChildren(String, Set)}, but it
attempts to read directly from
* the underlying store.
*
* @param path
* the path of the key to get from the store
+ * @param opts
+ * the set of {@link Option options} for this operation
* @return a future to track the async request
*/
- CompletableFuture<List<String>> getChildrenFromStore(String path);
+ CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts);
+
+ /**
+ * Like {@link #getChildrenFromStore(String, Set)} with no options.
+ */
+ default CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ return getChildrenFromStore(path, Set.of());
+ }
+
/**
* Read whether a specific path exists.
*
@@ -99,19 +129,29 @@ public interface MetadataStore extends AutoCloseable {
*
* @param path
* the path of the key to check on the store
+ * @param opts
+ * the set of {@link Option options} for this operation
* @return a future to track the async request
*/
- CompletableFuture<Boolean> exists(String path);
+ CompletableFuture<Boolean> exists(String path, Set<Option> opts);
+
+ /**
+ * Like {@link #exists(String, Set)} with no options.
+ */
+ default CompletableFuture<Boolean> exists(String path) {
+ return exists(path, Set.of());
+ }
/**
- * Put a new value for a given key.
+ * Put a new value for a given key with a set of {@link Option options},
e.g.
+ * {@link Option.Ephemeral}, {@link Option.Sequential}, {@link
Option.SecondaryIndex}, or
+ * {@link Option.PartitionKey}. Pass {@link Set#of()} for no options.
*
* The caller can specify an expected version to be atomically checked
against the current version of the stored
* data.
*
* The future will return the {@link Stat} object associated with the
newly inserted value.
*
- *
* @param path
* the path of the key to delete from the store
* @param value
@@ -119,11 +159,20 @@ public interface MetadataStore extends AutoCloseable {
* @param expectedVersion
* if present, the version will have to match with the
currently stored value for the operation to
* succeed. Use -1 to enforce a non-existing value.
+ * @param opts
+ * the set of {@link Option options} for this operation
* @throws BadVersionException
* if the expected version doesn't match the actual version of
the data
* @return a future to track the async request
*/
- CompletableFuture<Stat> put(String path, byte[] value, Optional<Long>
expectedVersion);
+ CompletableFuture<Stat> put(String path, byte[] value, Optional<Long>
expectedVersion, Set<Option> opts);
+
+ /**
+ * Like {@link #put(String, byte[], Optional, Set)} with no options.
+ */
+ default CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion) {
+ return put(path, value, expectedVersion, Set.of());
+ }
/**
*
@@ -132,13 +181,22 @@ public interface MetadataStore extends AutoCloseable {
* @param expectedVersion
* if present, the version will have to match with the
currently stored value for the operation to
* succeed
+ * @param opts
+ * the set of {@link Option options} for this operation
* @throws NotFoundException
* if the path is not found
* @throws BadVersionException
* if the expected version doesn't match the actual version of
the data
* @return a future to track the async request
*/
- CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion);
+ CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion, Set<Option> opts);
+
+ /**
+ * Like {@link #delete(String, Optional, Set)} with no options.
+ */
+ default CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion) {
+ return delete(path, expectedVersion, Set.of());
+ }
default CompletableFuture<Void> deleteIfExists(String path, Optional<Long>
expectedVersion) {
return delete(path, expectedVersion)
@@ -278,15 +336,25 @@ public interface MetadataStore extends AutoCloseable {
* @param indexName the secondary index name
* @param secondaryKey the secondary key to look up
* @param fallbackFilter predicate to filter results during fallback scan;
ignored by native implementations
+ * @param opts the set of {@link Option options} for this
operation
* @return list of matching {@link GetResult} entries
*/
default CompletableFuture<List<GetResult>> findByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter) {
+ Predicate<GetResult> fallbackFilter, Set<Option> opts) {
return CompletableFuture.failedFuture(
new MetadataStoreException("Secondary index queries not
supported by this store"));
}
+ /**
+ * Like {@link #findByIndex(String, String, String, Predicate, Set)} with
no options.
+ */
+ default CompletableFuture<List<GetResult>> findByIndex(
+ String scanPathPrefix, String indexName, String secondaryKey,
+ Predicate<GetResult> fallbackFilter) {
+ return findByIndex(scanPathPrefix, indexName, secondaryKey,
fallbackFilter, Set.of());
+ }
+
/**
* Stream all direct children of {@code parentPath} together with their
values.
*
@@ -307,13 +375,21 @@ public interface MetadataStore extends AutoCloseable {
*
* @param parentPath path whose direct children should be streamed
* @param consumer callback that receives records, completion, or an
error
+ * @param opts the set of {@link Option options} for this operation
* @return a future that completes when the scan terminates
*/
- default CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ default CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
return CompletableFuture.failedFuture(
new MetadataStoreException("scanChildren not supported by this
store"));
}
+ /**
+ * Like {@link #scanChildren(String, ScanConsumer, Set)} with no options.
+ */
+ default CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ return scanChildren(parentPath, consumer, Set.of());
+ }
+
/**
* Returns the default metadata cache config.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
new file mode 100644
index 00000000000..faee2a3a1e4
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+/**
+ * An option attached to a {@link MetadataStore} operation.
+ *
+ * <p>Methods on the store that accept options take {@code Set<Option>}. The
set of options
+ * actually consulted depends on the operation — for example, {@link
Ephemeral} only affects
+ * {@code put}, while {@link PartitionKey} applies to every operation. Options
that aren't
+ * relevant to an operation are silently ignored, so callers can safely thread
the same option
+ * set through chains of operations.
+ *
+ * <p>New options are added as new subtypes of this sealed interface; existing
method signatures
+ * don't change.
+ */
+public sealed interface Option {
+
+ /**
+ * Mark a record as ephemeral — it disappears when the session that
created it ends.
+ * Only consulted by {@code put}. Equivalent to the legacy
+ * {@link org.apache.pulsar.metadata.api.extended.CreateOption#Ephemeral}.
+ */
+ enum Ephemeral implements Option { INSTANCE }
+
+ /**
+ * Use a server-assigned monotonically increasing sequence in the key.
Only consulted by
+ * {@code put}. Equivalent to the legacy
+ * {@link org.apache.pulsar.metadata.api.extended.CreateOption#Sequential}.
+ */
+ enum Sequential implements Option { INSTANCE }
+
+ /**
+ * Add a secondary-index entry on the record being written. Multiple
{@code SecondaryIndex}
+ * options can be supplied to the same {@code put}. Backends without
native secondary-index
+ * support ignore these.
+ *
+ * @param indexName the secondary-index name
+ * @param secondaryKey the value to index this record under
+ */
+ record SecondaryIndex(String indexName, String secondaryKey) implements
Option {}
+
+ /**
+ * Routing hint for sharded backends. Records sharing the same {@code
partitionKey} are
+ * guaranteed to be co-located in the same shard. Backends without
sharding ignore the hint.
+ * Pass to any operation.
+ *
+ * @param key the partition key (treated opaquely; equality-routed)
+ */
+ record PartitionKey(String key) implements Option {}
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
new file mode 100644
index 00000000000..13c9ff57b8e
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helpers for extracting typed values from a {@code Set<Option>}.
+ *
+ * <p>{@link MetadataStore} implementations and wrappers use these to consult
only the options
+ * they care about, ignoring the rest.
+ */
+public final class OptionsHelper {
+
+ private OptionsHelper() {}
+
+ public static boolean isEphemeral(Set<Option> opts) {
+ return opts != null && opts.contains(Option.Ephemeral.INSTANCE);
+ }
+
+ public static boolean isSequential(Set<Option> opts) {
+ return opts != null && opts.contains(Option.Sequential.INSTANCE);
+ }
+
+ /** @return the partition-key value, or {@code null} if no {@link
Option.PartitionKey} is present. */
+ public static String partitionKey(Set<Option> opts) {
+ if (opts == null) {
+ return null;
+ }
+ for (Option o : opts) {
+ if (o instanceof Option.PartitionKey pk) {
+ return pk.key();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Build the {@code indexName -> secondaryKey} map from any {@link
Option.SecondaryIndex} entries.
+ * Returns an empty map when no entries are present.
+ */
+ public static Map<String, String> secondaryIndexes(Set<Option> opts) {
+ if (opts == null || opts.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<String, String> map = null;
+ for (Option o : opts) {
+ if (o instanceof Option.SecondaryIndex si) {
+ if (map == null) {
+ map = new HashMap<>();
+ }
+ map.put(si.indexName(), si.secondaryKey());
+ }
+ }
+ return map == null ? Collections.emptyMap() : map;
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index ad5c7a25300..22838154098 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.metadata.api.extended;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.pulsar.metadata.api.MetadataEvent;
@@ -29,6 +31,7 @@ import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.Option;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
@@ -44,35 +47,35 @@ public interface MetadataStoreExtended extends
MetadataStore {
}
/**
- * Put a new value for a given key.
- *
- * The caller can specify an expected version to be atomically checked
against the current version of the stored
- * data.
- *
- * The future will return the {@link Stat} object associated with the
newly inserted value.
+ * Legacy {@code EnumSet<CreateOption>} form of {@link
MetadataStore#put(String, byte[], Optional, Set)}.
*
+ * <p>Translates the {@link CreateOption} set into the canonical {@code
Set<Option>} form and
+ * forwards to {@link MetadataStore#put(String, byte[], Optional, Set)}.
*
* @param path
- * the path of the key to delete from the store
+ * the path of the key
* @param value
- * the value to
+ * the value to store
* @param expectedVersion
- * if present, the version will have to match with the
currently stored value for the operation to
- * succeed. Use -1 to enforce a non-existing value.
+ * if present, the version will have to match for the operation
to succeed. Use -1 to enforce a
+ * non-existing value.
* @param options
- * a set of {@link CreateOption} to use if the the key-value
pair is being created
+ * a set of {@link CreateOption} to use if the key-value pair
is being created
* @throws BadVersionException
* if the expected version doesn't match the actual version of
the data
* @return a future to track the async request
*/
- CompletableFuture<Stat> put(String path, byte[] value, Optional<Long>
expectedVersion,
- EnumSet<CreateOption> options);
+ default CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
+ EnumSet<CreateOption> options) {
+ return put(path, value, expectedVersion, toOptions(options, null));
+ }
/**
- * Put a new value for a given key with secondary index associations.
+ * Legacy {@code EnumSet<CreateOption>} + {@code Map<String,String>} form
of
+ * {@link MetadataStore#put(String, byte[], Optional, Set)}.
*
- * <p>Secondary indexes are hints: stores that don't support them simply
ignore them
- * and delegate to the regular {@link #put(String, byte[], Optional,
EnumSet)} method.
+ * <p>Translates the inputs into the canonical {@code Set<Option>} form
and forwards to
+ * {@link MetadataStore#put(String, byte[], Optional, Set)}.
*
* @param path the path of the key
* @param value the value to store
@@ -84,7 +87,34 @@ public interface MetadataStoreExtended extends MetadataStore
{
*/
default CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
EnumSet<CreateOption> options, Map<String, String>
secondaryIndexes) {
- return put(path, value, expectedVersion, options);
+ return put(path, value, expectedVersion, toOptions(options,
secondaryIndexes));
+ }
+
+ /**
+ * Translate the legacy {@code EnumSet<CreateOption>} + {@code
Map<String,String>} form into the
+ * canonical {@code Set<Option>} form.
+ */
+ private static Set<Option> toOptions(EnumSet<CreateOption> options,
Map<String, String> secondaryIndexes) {
+ boolean hasOptions = options != null && !options.isEmpty();
+ boolean hasIndexes = secondaryIndexes != null &&
!secondaryIndexes.isEmpty();
+ if (!hasOptions && !hasIndexes) {
+ return Set.of();
+ }
+ Set<Option> result = new HashSet<>();
+ if (hasOptions) {
+ if (options.contains(CreateOption.Ephemeral)) {
+ result.add(Option.Ephemeral.INSTANCE);
+ }
+ if (options.contains(CreateOption.Sequential)) {
+ result.add(Option.Sequential.INSTANCE);
+ }
+ }
+ if (hasIndexes) {
+ for (Map.Entry<String, String> e : secondaryIndexes.entrySet()) {
+ result.add(new Option.SecondaryIndex(e.getKey(),
e.getValue()));
+ }
+ }
+ return result;
}
/**
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index cab2c3a2e0a..a0a23dbd10e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -28,10 +28,11 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
-import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -56,7 +57,7 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
import
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.Option;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -263,9 +264,8 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
}
final T finalNewValue = newValueObj;
- return putWithIndexes(path, newValue,
Optional.of(expectedVersion),
- EnumSet.noneOf(CreateOption.class),
- indexExtractor.apply(finalNewValue))
+ return store.put(path, newValue,
Optional.of(expectedVersion),
+
toOptions(indexExtractor.apply(finalNewValue)))
.thenAccept(__ -> refresh(path))
.thenApply(__ -> finalNewValue);
}, executor), path);
@@ -292,9 +292,8 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
public CompletableFuture<Void> create(String path, T value,
Function<T, Map<String, String>>
indexExtractor) {
final var future = new CompletableFuture<Void>();
- serialize(path, value).thenCompose(content -> putWithIndexes(
- path, content, Optional.of(-1L),
EnumSet.noneOf(CreateOption.class),
- indexExtractor.apply(value)))
+ serialize(path, value).thenCompose(content -> store.put(
+ path, content, Optional.of(-1L),
toOptions(indexExtractor.apply(value))))
// Make sure we have the value cached before the operation is
completed
// In addition to caching the value, we need to add a watch on the
path,
// so when/if it changes on any other node, we are notified and we
can
@@ -313,28 +312,23 @@ public class MetadataCacheImpl<T> implements
MetadataCache<T>, Consumer<Notifica
return future;
}
- /**
- * Route writes through the extended store API when secondary indexes are
present and
- * the underlying store supports them. Falls back to the base {@code put}
otherwise.
- */
- private CompletableFuture<?> putWithIndexes(String path, byte[] content,
Optional<Long> version,
- EnumSet<CreateOption> options,
- Map<String, String> indexes) {
- if (storeExtended != null && indexes != null && !indexes.isEmpty()) {
- return storeExtended.put(path, content, version, options, indexes);
+ /** Build a {@link Set} of {@link Option.SecondaryIndex} entries from an
{@code indexName -> key} map. */
+ private static Set<Option> toOptions(Map<String, String> indexes) {
+ if (indexes == null || indexes.isEmpty()) {
+ return Set.of();
}
- return store.put(path, content, version);
+ Set<Option> opts = new HashSet<>();
+ for (Map.Entry<String, String> e : indexes.entrySet()) {
+ opts.add(new Option.SecondaryIndex(e.getKey(), e.getValue()));
+ }
+ return opts;
}
@Override
- public CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
- return serialize(path, value).thenCompose(bytes -> {
- if (storeExtended != null) {
- return storeExtended.put(path, bytes, Optional.empty(),
options);
- } else {
- return store.put(path, bytes, Optional.empty());
- }
- }).thenAccept(__ -> {
+ public CompletableFuture<Void> put(String path, T value, Set<Option> opts)
{
+ return serialize(path, value).thenCompose(bytes ->
+ store.put(path, bytes, Optional.empty(), opts)
+ ).thenAccept(__ -> {
log.debug().attr("path", path).log("Refreshing path after put
operation");
refresh(path);
});
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 02086898e9f..c7b9f2b582f 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -34,10 +34,8 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -70,6 +68,8 @@ import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -103,6 +103,12 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
+ // Re-declare the legacy no-opts public methods as abstract here so that
backends keep providing them
+ // as the concrete implementation hooks. The canonical {@code Set<Option>}
forms in this class delegate
+ // to these via virtual dispatch.
+ @Override
+ public abstract CompletableFuture<List<String>>
getChildrenFromStore(String path);
+
protected MetadataNodeSizeStats nodeSizeStats;
protected AbstractMetadataStore(
@@ -231,7 +237,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
CompletableFuture<?> updateResult = (event.getType() ==
NotificationType.Deleted)
? deleteInternal(event.getPath(), Optional.empty())
: putInternal(event.getPath(), event.getValue(),
- Optional.ofNullable(event.getExpectedVersion()), options);
+ Optional.ofNullable(event.getExpectedVersion()),
fromLegacyCreateOptions(options));
updateResult.thenApply(stat -> {
log.debug().attr("path", event.getPath()).log("successfully
updated");
return result.complete(null);
@@ -339,7 +345,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
@Override
- public CompletableFuture<Optional<GetResult>> get(String path) {
+ public CompletableFuture<Optional<GetResult>> get(String path, Set<Option>
opts) {
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -363,12 +369,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected abstract CompletableFuture<Optional<GetResult>> storeGet(String
path);
@Override
- public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion) {
- return put(path, value, expectedVersion,
EnumSet.noneOf(CreateOption.class));
- }
-
- @Override
- public final CompletableFuture<List<String>> getChildren(String path) {
+ public final CompletableFuture<List<String>> getChildren(String path,
Set<Option> opts) {
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -383,7 +384,12 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
@Override
- public final CompletableFuture<Boolean> exists(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
+ return getChildrenFromStore(path);
+ }
+
+ @Override
+ public final CompletableFuture<Boolean> exists(String path, Set<Option>
opts) {
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -445,7 +451,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected abstract CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion);
@Override
- public final CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion) {
+ public final CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
log.info().attr("path", path).attr("expectedVersion",
expectedVersion).log("Deleting path");
if (isClosed()) {
return alreadyClosedFailedFuture();
@@ -512,19 +518,18 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
});
}
+ /**
+ * Backend hook for writing a value. Implementations consume {@link Option} entries from {@code opts}
+ * via {@link OptionsHelper} (e.g. {@link OptionsHelper#isEphemeral}, {@link OptionsHelper#isSequential},
+ * {@link OptionsHelper#secondaryIndexes}, {@link OptionsHelper#partitionKey}).
+ */
protected abstract CompletableFuture<Stat> storePut(String path, byte[]
data, Optional<Long> optExpectedVersion,
- EnumSet<CreateOption>
options);
-
- protected CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options,
- Map<String, String>
secondaryIndexes) {
- return storePut(path, data, optExpectedVersion, options);
- }
+ Set<Option> opts);
@Override
public CompletableFuture<List<GetResult>> findByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter) {
+ Predicate<GetResult> fallbackFilter, Set<Option> opts) {
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -532,7 +537,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
@Override
- public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
if (isClosed()) {
CompletableFuture<Void> failed = alreadyClosedFailedFuture();
failed.whenComplete((__, ex) -> {
@@ -601,13 +606,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
@Override
public final CompletableFuture<Stat> put(String path, byte[] data,
Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options) {
- return put(path, data, optExpectedVersion, options,
Collections.emptyMap());
- }
-
- @Override
- public final CompletableFuture<Stat> put(String path, byte[] data,
Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options, Map<String, String>
secondaryIndexes) {
+ Set<Option> opts) {
if (isClosed()) {
return alreadyClosedFailedFuture();
}
@@ -616,15 +615,14 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() -
start);
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
- HashSet<CreateOption> ops = new HashSet<>(options);
if (getMetadataEventSynchronizer().isPresent()) {
Long version = optExpectedVersion.isPresent() &&
optExpectedVersion.get() < 0 ? null
: optExpectedVersion.orElse(null);
- MetadataEvent event = new MetadataEvent(path, data, ops, version,
+ MetadataEvent event = new MetadataEvent(path, data,
toLegacyCreateOptions(opts), version,
Instant.now().toEpochMilli(),
getMetadataEventSynchronizer().get().getClusterName(),
NotificationType.Modified);
return getMetadataEventSynchronizer().get().notify(event)
- .thenCompose(__ -> putInternal(path, data,
optExpectedVersion, options, secondaryIndexes))
+ .thenCompose(__ -> putInternal(path, data,
optExpectedVersion, opts))
.whenComplete((v, t) -> {
if (t != null) {
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -634,7 +632,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
});
} else {
- return putInternal(path, data, optExpectedVersion, options,
secondaryIndexes)
+ return putInternal(path, data, optExpectedVersion, opts)
.whenComplete((v, t) -> {
if (t != null) {
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -647,17 +645,45 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
- public final CompletableFuture<Stat> putInternal(String path, byte[] data,
Optional<Long> optExpectedVersion,
- Set<CreateOption> options) {
- return putInternal(path, data, optExpectedVersion, options,
Collections.emptyMap());
+ /**
+ * Translate {@link Option.Ephemeral}/{@link Option.Sequential} entries from {@code opts} into the
+ * legacy {@link CreateOption} set carried by {@link MetadataEvent} for sync replication. Other
+ * {@link Option} kinds (e.g. {@link Option.SecondaryIndex}, {@link Option.PartitionKey}) are not
+ * propagated through the legacy event payload.
+ */
+ private static HashSet<CreateOption> toLegacyCreateOptions(Set<Option>
opts) {
+ HashSet<CreateOption> result = new HashSet<>();
+ if (OptionsHelper.isEphemeral(opts)) {
+ result.add(CreateOption.Ephemeral);
+ }
+ if (OptionsHelper.isSequential(opts)) {
+ result.add(CreateOption.Sequential);
+ }
+ return result;
+ }
+
+ /**
+ * Translate a legacy {@link CreateOption} set (carried by replicated {@link MetadataEvent} payloads)
+ * into the canonical {@code Set<Option>} form consumed by the {@code storePut} hook.
+ */
+ private static Set<Option> fromLegacyCreateOptions(Set<CreateOption>
options) {
+ if (options == null || options.isEmpty()) {
+ return Set.of();
+ }
+ HashSet<Option> result = new HashSet<>();
+ if (options.contains(CreateOption.Ephemeral)) {
+ result.add(Option.Ephemeral.INSTANCE);
+ }
+ if (options.contains(CreateOption.Sequential)) {
+ result.add(Option.Sequential.INSTANCE);
+ }
+ return result;
}
public final CompletableFuture<Stat> putInternal(String path, byte[] data,
Optional<Long> optExpectedVersion,
- Set<CreateOption> options, Map<String, String> secondaryIndexes) {
- var enumOptions =
- (options != null && !options.isEmpty()) ?
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class);
+ Set<Option> opts) {
// Ensure caches are invalidated before the operation is confirmed
- return storePut(path, data, optExpectedVersion, enumOptions,
secondaryIndexes)
+ return storePut(path, data, optExpectedVersion, opts == null ?
Set.of() : opts)
.thenApply(stat -> {
NotificationType type = stat.isFirstVersion() ?
NotificationType.Created
: NotificationType.Modified;
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
index 47cc47448b4..45ad7fc51f7 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.metadata.impl;
import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -30,7 +30,7 @@ import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.Option;
public class DualMetadataCache<T> implements MetadataCache<T> {
private final DualMetadataStore dualMetadataStore;
@@ -106,8 +106,8 @@ public class DualMetadataCache<T> implements
MetadataCache<T> {
}
@Override
- public CompletableFuture<Void> put(String path, T value,
EnumSet<CreateOption> options) {
- return metadataCache.get().put(path, value, options);
+ public CompletableFuture<Void> put(String path, T value, Set<Option> opts)
{
+ return metadataCache.get().put(path, value, opts);
}
@Override
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index bcc6b7226da..55ab3f74055 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -23,7 +23,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -51,6 +50,8 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -262,124 +263,97 @@ public class DualMetadataStore implements
MetadataStoreExtended {
}
@Override
- public CompletableFuture<Optional<GetResult>> get(String path) {
+ public CompletableFuture<Optional<GetResult>> get(String path, Set<Option>
opts) {
return switch (migrationState.getPhase()) {
- case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.get(path);
- case COMPLETED -> targetStore.get(path);
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.get(path, opts);
+ case COMPLETED -> targetStore.get(path, opts);
};
}
@Override
- public CompletableFuture<List<String>> getChildren(String path) {
+ public CompletableFuture<List<String>> getChildren(String path,
Set<Option> opts) {
return switch (migrationState.getPhase()) {
- case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.getChildren(path);
- case COMPLETED -> targetStore.getChildren(path);
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.getChildren(path, opts);
+ case COMPLETED -> targetStore.getChildren(path, opts);
};
}
@Override
- public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
return switch (migrationState.getPhase()) {
- case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.getChildrenFromStore(path);
- case COMPLETED -> targetStore.getChildrenFromStore(path);
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.getChildrenFromStore(path, opts);
+ case COMPLETED -> targetStore.getChildrenFromStore(path, opts);
};
}
@Override
- public CompletableFuture<Boolean> exists(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
return switch (migrationState.getPhase()) {
- case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.exists(path);
- case COMPLETED -> targetStore.exists(path);
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.getChildrenFromStore(path);
+ case COMPLETED -> targetStore.getChildrenFromStore(path);
};
}
@Override
- public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion) {
- return put(path, value, expectedVersion,
EnumSet.noneOf(CreateOption.class));
+ public CompletableFuture<Boolean> exists(String path, Set<Option> opts) {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
sourceStore.exists(path, opts);
+ case COMPLETED -> targetStore.exists(path, opts);
+ };
}
@Override
public CompletableFuture<List<GetResult>> findByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter) {
+ Predicate<GetResult> fallbackFilter, Set<Option> opts) {
return switch (migrationState.getPhase()) {
case NOT_STARTED, PREPARATION, COPYING, FAILED ->
- sourceStore.findByIndex(scanPathPrefix, indexName,
secondaryKey, fallbackFilter);
+ sourceStore.findByIndex(scanPathPrefix, indexName,
secondaryKey, fallbackFilter, opts);
case COMPLETED ->
- targetStore.findByIndex(scanPathPrefix, indexName,
secondaryKey, fallbackFilter);
+ targetStore.findByIndex(scanPathPrefix, indexName,
secondaryKey, fallbackFilter, opts);
};
}
@Override
- public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
return switch (migrationState.getPhase()) {
case NOT_STARTED, PREPARATION, COPYING, FAILED ->
- sourceStore.scanChildren(parentPath, consumer);
+ sourceStore.scanChildren(parentPath, consumer, opts);
case COMPLETED ->
- targetStore.scanChildren(parentPath, consumer);
+ targetStore.scanChildren(parentPath, consumer, opts);
};
}
@Override
- public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
- EnumSet<CreateOption> options,
Map<String, String> secondaryIndexes) {
- switch (migrationState.getPhase()) {
- case NOT_STARTED, FAILED -> {
- if (options.contains(CreateOption.Ephemeral)) {
- localEphemeralPaths.add(path);
- }
- pendingSourceWrites.incrementAndGet();
- var future = sourceStore.put(path, value, expectedVersion,
options, secondaryIndexes);
- future.whenComplete((result, e) ->
pendingSourceWrites.decrementAndGet());
- return future;
- }
- case PREPARATION, COPYING -> {
- return
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
- }
- case COMPLETED -> {
- return targetStore.put(path, value, expectedVersion, options,
secondaryIndexes);
- }
- default -> throw new IllegalStateException("Invalid phase " +
migrationState.getPhase());
- }
- }
-
- @Override
- public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
- EnumSet<CreateOption> options) {
+ public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion, Set<Option> opts) {
switch (migrationState.getPhase()) {
case NOT_STARTED, FAILED -> {
- // Track ephemeral nodes
- if (options.contains(CreateOption.Ephemeral)) {
+ if (OptionsHelper.isEphemeral(opts)) {
localEphemeralPaths.add(path);
}
-
- // Track pending writes
pendingSourceWrites.incrementAndGet();
- var future = sourceStore.put(path, value, expectedVersion,
options);
+ var future = sourceStore.put(path, value, expectedVersion,
opts);
future.whenComplete((result, e) ->
pendingSourceWrites.decrementAndGet());
return future;
}
-
case PREPARATION, COPYING -> {
return
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
}
-
case COMPLETED -> {
- return targetStore.put(path, value, expectedVersion, options);
+ return targetStore.put(path, value, expectedVersion, opts);
}
-
default -> throw new IllegalStateException("Invalid phase " +
migrationState.getPhase());
}
}
@Override
- public CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion) {
+ public CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
switch (migrationState.getPhase()) {
case NOT_STARTED, FAILED -> {
localEphemeralPaths.remove(path);
pendingSourceWrites.incrementAndGet();
- var future = sourceStore.delete(path, expectedVersion);
+ var future = sourceStore.delete(path, expectedVersion, opts);
future.whenComplete((result, e) ->
pendingSourceWrites.decrementAndGet());
return future;
}
@@ -389,7 +363,7 @@ public class DualMetadataStore implements
MetadataStoreExtended {
}
case COMPLETED -> {
- return targetStore.delete(path, expectedVersion);
+ return targetStore.delete(path, expectedVersion, opts);
}
default -> throw new IllegalStateException("Invalid phase " +
migrationState.getPhase());
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index de988b6b1c8..6fbd22e3888 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.metadata.impl;
import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,9 +38,9 @@ import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Option;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
@@ -77,74 +77,63 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
}
@Override
- public CompletableFuture<Optional<GetResult>> get(String path) {
+ public CompletableFuture<Optional<GetResult>> get(String path, Set<Option>
opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.GET, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.get(path);
+ return store.get(path, opts);
}
@Override
- public CompletableFuture<List<String>> getChildren(String path) {
+ public CompletableFuture<List<String>> getChildren(String path,
Set<Option> opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.GET_CHILDREN, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.getChildren(path);
+ return store.getChildren(path, opts);
}
@Override
- public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.GET_CHILDREN, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.getChildrenFromStore(path);
+ return store.getChildrenFromStore(path, opts);
}
@Override
- public CompletableFuture<Boolean> exists(String path) {
+ public CompletableFuture<Boolean> exists(String path, Set<Option> opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.EXISTS, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.exists(path);
+ return store.exists(path, opts);
}
@Override
- public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion) {
+ public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion, Set<Option> opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.PUT, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.put(path, value, expectedVersion);
+ return store.put(path, value, expectedVersion, opts);
}
@Override
- public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
- EnumSet<CreateOption> options) {
- Optional<MetadataStoreException> ex =
programmedFailure(OperationType.PUT, path);
- if (ex.isPresent()) {
- return FutureUtil.failedFuture(ex.get());
- }
-
- return store.put(path, value, expectedVersion, options);
- }
-
- @Override
- public CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion) {
+ public CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.DELETE, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.delete(path, expectedVersion);
+ return store.delete(path, expectedVersion, opts);
}
@Override
@@ -158,14 +147,14 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
}
@Override
- public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.SCAN_CHILDREN, parentPath);
if (ex.isPresent()) {
consumer.onError(ex.get());
return FutureUtil.failedFuture(ex.get());
}
- return store.scanChildren(parentPath, consumer);
+ return store.scanChildren(parentPath, consumer, opts);
}
@Override
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 7c7bd31e29b..130eb6ff0da 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.metadata.impl;
import com.google.common.collect.MapMaker;
import java.util.ArrayList;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -45,9 +44,10 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@CustomLog
@@ -190,7 +190,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
@Override
public CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options) {
+ Set<Option> opts) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
@@ -198,14 +198,15 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
- if (options.contains(CreateOption.Sequential)) {
+ if (OptionsHelper.isSequential(opts)) {
path += Long.toString(sequentialIdGenerator.getAndIncrement());
}
+ boolean ephemeral = OptionsHelper.isEphemeral(opts);
long now = System.currentTimeMillis();
if (hasVersion && expectedVersion == -1) {
- Value newValue = new Value(0, data, now, now,
options.contains(CreateOption.Ephemeral));
+ Value newValue = new Value(0, data, now, now, ephemeral);
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
@@ -222,8 +223,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
} else {
long newVersion = existingValue != null ?
existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ?
existingValue.createdTimestamp : now;
- Value newValue = new Value(newVersion, data,
createdTimestamp, now,
- options.contains(CreateOption.Ephemeral));
+ Value newValue = new Value(newVersion, data,
createdTimestamp, now, ephemeral);
map.put(path, newValue);
NotificationType type =
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 3a7d7abe5d2..17836a7d383 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -29,7 +29,6 @@ import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -53,9 +52,10 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ConfigOptions;
@@ -586,7 +586,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
@Override
protected CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> expectedVersion,
- EnumSet<CreateOption> options) {
+ Set<Option> opts) {
log.debug().attr("path", path).attr("instanceId",
instanceId).log("storePut");
try {
dbStateLock.readLock().lock();
@@ -613,8 +613,8 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
metaValue = new MetaValue();
metaValue.version = 0;
metaValue.createdTimestamp = timestamp;
- metaValue.ephemeral =
options.contains(CreateOption.Ephemeral);
- if (options.contains(CreateOption.Sequential)) {
+ metaValue.ephemeral = OptionsHelper.isEphemeral(opts);
+ if (OptionsHelper.isSequential(opts)) {
path += sequentialIdGenerator.getAndIncrement();
pathBytes = toBytes(path);
transaction.put(SEQUENTIAL_ID_KEY,
toBytes(sequentialIdGenerator.get()));
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4f609f034bb..5e7c57fac77 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.metadata.impl;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -44,8 +44,9 @@ import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
@@ -573,18 +574,13 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
}
}
- private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
- if (options.contains(CreateOption.Ephemeral)) {
- if (options.contains(CreateOption.Sequential)) {
- return CreateMode.EPHEMERAL_SEQUENTIAL;
- } else {
- return CreateMode.EPHEMERAL;
- }
- } else if (options.contains(CreateOption.Sequential)) {
- return CreateMode.PERSISTENT_SEQUENTIAL;
- } else {
- return CreateMode.PERSISTENT;
+ private static CreateMode getCreateMode(Set<Option> opts) {
+ boolean ephemeral = OptionsHelper.isEphemeral(opts);
+ boolean sequential = OptionsHelper.isSequential(opts);
+ if (ephemeral) {
+ return sequential ? CreateMode.EPHEMERAL_SEQUENTIAL :
CreateMode.EPHEMERAL;
}
+ return sequential ? CreateMode.PERSISTENT_SEQUENTIAL :
CreateMode.PERSISTENT;
}
public long getZkSessionId() {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index ec6ed23e61d..2d7b56ad650 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.metadata.impl.batching;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,8 +36,8 @@ import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.jctools.queues.MessagePassingQueue;
@@ -149,8 +149,8 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
@Override
protected CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options) {
- OpPut op = new OpPut(path, data, optExpectedVersion, options);
+ Set<Option> opts) {
+ OpPut op = new OpPut(path, data, optExpectedVersion, opts);
enqueue(writeOps, op);
return op.getFuture();
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
index dc29fea9559..5cd08419052 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
@@ -18,13 +18,14 @@
*/
package org.apache.pulsar.metadata.impl.batching;
-import java.util.EnumSet;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
@Data
@AllArgsConstructor
@@ -32,13 +33,13 @@ public class OpPut implements MetadataOp {
private final String path;
private final byte[] data;
private final Optional<Long> optExpectedVersion;
- private final EnumSet<CreateOption> options;
+ private final Set<Option> options;
public final long created = System.currentTimeMillis();
private final CompletableFuture<Stat> future = new CompletableFuture<>();
public boolean isEphemeral() {
- return options.contains(CreateOption.Ephemeral);
+ return OptionsHelper.isEphemeral(options);
}
@Override
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 21bbba615d4..3b89a99bb67 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -31,7 +31,6 @@ import io.oxia.client.api.options.ListOption;
import io.oxia.client.api.options.PutOption;
import java.time.Duration;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -51,9 +50,10 @@ import
org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@CustomLog
@@ -206,15 +206,8 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
@Override
protected CompletableFuture<Stat> storePut(
- String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
- return doStorePut(path, data, optExpectedVersion, options,
Collections.emptyMap());
- }
-
- @Override
- protected CompletableFuture<Stat> storePut(
- String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options,
- Map<String, String> secondaryIndexes) {
- return doStorePut(path, data, optExpectedVersion, options,
secondaryIndexes);
+ String path, byte[] data, Optional<Long> optExpectedVersion,
Set<Option> opts) {
+ return doStorePut(path, data, optExpectedVersion, opts);
}
@Override
@@ -273,21 +266,21 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
private CompletableFuture<Stat> doStorePut(
- String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options,
- Map<String, String> secondaryIndexes) {
+ String path, byte[] data, Optional<Long> optExpectedVersion,
Set<Option> opts) {
+ boolean sequential = OptionsHelper.isSequential(opts);
+ boolean ephemeral = OptionsHelper.isEphemeral(opts);
+ Map<String, String> secondaryIndexes =
OptionsHelper.secondaryIndexes(opts);
CompletableFuture<Void> parentsCreated = createParents(path);
return parentsCreated.thenCompose(
__ -> {
var expectedVersion = optExpectedVersion;
- if (expectedVersion.isPresent()
- && expectedVersion.get() != -1L
- && options.contains(CreateOption.Sequential)) {
+ if (expectedVersion.isPresent() && expectedVersion.get()
!= -1L && sequential) {
return CompletableFuture.failedFuture(
new MetadataStoreException(
"Can't have expectedVersion and
Sequential at the same time"));
}
CompletableFuture<String> actualPath;
- if (options.contains(CreateOption.Sequential)) {
+ if (sequential) {
var parent = parent(path);
var parentPath = parent == null ? "/" : parent;
@@ -311,7 +304,7 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
})
.ifPresent(putOptions::add);
- if (options.contains(CreateOption.Ephemeral)) {
+ if (ephemeral) {
putOptions.add(PutOption.AsEphemeralRecord);
}
var parentPath = parent(path);
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 22c3553bf1c..fdb636d7a7e 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.metadata.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import io.opentelemetry.api.OpenTelemetry;
-import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.GetResult;
@@ -31,8 +31,8 @@ import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+import org.apache.pulsar.metadata.api.Option;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -121,7 +121,7 @@ public class MetadataStoreFactoryImplTest {
@Override
protected CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
- EnumSet<CreateOption>
options) {
+ Set<Option> opts) {
return null;
}
}