This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8a823e7b8f [improve][metadata] Add Option-set API to MetadataStore 
(#25710)
d8a823e7b8f is described below

commit d8a823e7b8ff645c582bbe682da85219e5baa9a9
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 10:07:26 2026 -0700

    [improve][metadata] Add Option-set API to MetadataStore (#25710)
---
 .../apache/pulsar/metadata/api/MetadataCache.java  |  36 +++++--
 .../apache/pulsar/metadata/api/MetadataStore.java  | 100 +++++++++++++++++---
 .../org/apache/pulsar/metadata/api/Option.java     |  67 +++++++++++++
 .../apache/pulsar/metadata/api/OptionsHelper.java  |  76 +++++++++++++++
 .../api/extended/MetadataStoreExtended.java        |  64 +++++++++----
 .../metadata/cache/impl/MetadataCacheImpl.java     |  46 ++++-----
 .../metadata/impl/AbstractMetadataStore.java       | 104 +++++++++++++--------
 .../pulsar/metadata/impl/DualMetadataCache.java    |   8 +-
 .../pulsar/metadata/impl/DualMetadataStore.java    |  90 +++++++-----------
 .../metadata/impl/FaultInjectionMetadataStore.java |  43 ++++-----
 .../metadata/impl/LocalMemoryMetadataStore.java    |  14 +--
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |  10 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  22 ++---
 .../batching/AbstractBatchedMetadataStore.java     |   8 +-
 .../pulsar/metadata/impl/batching/OpPut.java       |   9 +-
 .../metadata/impl/oxia/OxiaMetadataStore.java      |  29 +++---
 .../impl/MetadataStoreFactoryImplTest.java         |   6 +-
 17 files changed, 486 insertions(+), 246 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
index 641889b3c18..4a6cd25769e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCache.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.metadata.api;
 
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
@@ -172,21 +174,37 @@ public interface MetadataCache<T> {
 
     /**
      * Create or update the value of the given path in the metadata store 
without version comparison.
-     * <p>
-     * This method is equivalent to
-     * {@link 
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended#put(String, 
byte[], Optional, EnumSet)} or
-     * {@link MetadataStore#put(String, byte[], Optional)} if the metadata 
store does not support this extended API,
-     * with `Optional.empty()` as the 3rd argument. It means if the path does 
not exist, it will be created. If the path
-     * already exists, the new value will override the old value.
-     * </p>
+     *
+     * <p>If the path does not exist it will be created; if it exists, the new 
value will override the old
+     * value. Recognized options include {@link Option.Ephemeral}, {@link 
Option.Sequential},
+     * {@link Option.SecondaryIndex}, and {@link Option.PartitionKey}. Pass 
{@link Set#of()} for none.
+     *
      * @param path the path of the object in the metadata store
      * @param value the object to put in the metadata store
-     * @param options the create options if the path does not in the metadata 
store
+     * @param opts the set of {@link Option options} for this operation
      * @return the future that indicates if this operation failed, it could 
fail with
      *   {@link java.io.IOException} if the value failed to be serialized
      *   {@link MetadataStoreException} if the metadata store operation failed
      */
-    CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> 
options);
+    CompletableFuture<Void> put(String path, T value, Set<Option> opts);
+
+    /**
+     * Legacy {@code EnumSet<CreateOption>} form of {@link #put(String, 
Object, Set)}. Translates the
+     * {@link CreateOption} set into the canonical {@code Set<Option>} form 
and forwards to the canonical method.
+     */
+    default CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
+        if (options == null || options.isEmpty()) {
+            return put(path, value, Set.of());
+        }
+        Set<Option> opts = new HashSet<>();
+        if (options.contains(CreateOption.Ephemeral)) {
+            opts.add(Option.Ephemeral.INSTANCE);
+        }
+        if (options.contains(CreateOption.Sequential)) {
+            opts.add(Option.Sequential.INSTANCE);
+        }
+        return put(path, value, opts);
+    }
 
     /**
      * Delete an object from the metadata store.
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index b2d90800943..71f69a313c2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.Beta;
 import io.github.merlimat.slog.Logger;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
@@ -42,7 +43,8 @@ public interface MetadataStore extends AutoCloseable {
     Logger LOG = Logger.get(MetadataStore.class);
 
     /**
-     * Read the value of one key, identified by the path
+     * Read the value of one key, identified by the path, with a set of {@link 
Option options}, e.g.
+     * {@link Option.PartitionKey} for routing on sharded backends. Pass 
{@link Set#of()} for no options.
      *
      * The async call will return a future that yields a {@link GetResult} 
that will contain the value and the
      * associated {@link Stat} object.
@@ -51,9 +53,18 @@ public interface MetadataStore extends AutoCloseable {
      *
      * @param path
      *            the path of the key to get from the store
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @return a future to track the async request
      */
-    CompletableFuture<Optional<GetResult>> get(String path);
+    CompletableFuture<Optional<GetResult>> get(String path, Set<Option> opts);
+
+    /**
+     * Like {@link #get(String, Set)} with no options.
+     */
+    default CompletableFuture<Optional<GetResult>> get(String path) {
+        return get(path, Set.of());
+    }
 
 
     /**
@@ -73,9 +84,18 @@ public interface MetadataStore extends AutoCloseable {
      *
      * @param path
      *            the path of the key to get from the store
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @return a future to track the async request
      */
-    CompletableFuture<List<String>> getChildren(String path);
+    CompletableFuture<List<String>> getChildren(String path, Set<Option> opts);
+
+    /**
+     * Like {@link #getChildren(String, Set)} with no options.
+     */
+    default CompletableFuture<List<String>> getChildren(String path) {
+        return getChildren(path, Set.of());
+    }
 
 
     /**
@@ -83,14 +103,24 @@ public interface MetadataStore extends AutoCloseable {
      *
      * If the path itself does not exist, it will return an empty list.
      *
-     * This method is similar to {@link #getChildren(String)}, but it attempts 
to read directly from
+     * This method is similar to {@link #getChildren(String, Set)}, but it 
attempts to read directly from
      *  the underlying store.
      *
      * @param path
      *            the path of the key to get from the store
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @return a future to track the async request
      */
-    CompletableFuture<List<String>> getChildrenFromStore(String path);
+    CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts);
+
+    /**
+     * Like {@link #getChildrenFromStore(String, Set)} with no options.
+     */
+    default CompletableFuture<List<String>> getChildrenFromStore(String path) {
+        return getChildrenFromStore(path, Set.of());
+    }
+
     /**
      * Read whether a specific path exists.
      *
@@ -99,19 +129,29 @@ public interface MetadataStore extends AutoCloseable {
      *
      * @param path
      *            the path of the key to check on the store
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @return a future to track the async request
      */
-    CompletableFuture<Boolean> exists(String path);
+    CompletableFuture<Boolean> exists(String path, Set<Option> opts);
+
+    /**
+     * Like {@link #exists(String, Set)} with no options.
+     */
+    default CompletableFuture<Boolean> exists(String path) {
+        return exists(path, Set.of());
+    }
 
     /**
-     * Put a new value for a given key.
+     * Put a new value for a given key with a set of {@link Option options}, 
e.g.
+     * {@link Option.Ephemeral}, {@link Option.Sequential}, {@link 
Option.SecondaryIndex}, or
+     * {@link Option.PartitionKey}. Pass {@link Set#of()} for no options.
      *
      * The caller can specify an expected version to be atomically checked 
against the current version of the stored
      * data.
      *
      * The future will return the {@link Stat} object associated with the 
newly inserted value.
      *
-     *
      * @param path
      *            the path of the key to delete from the store
      * @param value
@@ -119,11 +159,20 @@ public interface MetadataStore extends AutoCloseable {
      * @param expectedVersion
      *            if present, the version will have to match with the 
currently stored value for the operation to
      *            succeed. Use -1 to enforce a non-existing value.
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @throws BadVersionException
      *             if the expected version doesn't match the actual version of 
the data
      * @return a future to track the async request
      */
-    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> 
expectedVersion);
+    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> 
expectedVersion, Set<Option> opts);
+
+    /**
+     * Like {@link #put(String, byte[], Optional, Set)} with no options.
+     */
+    default CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion) {
+        return put(path, value, expectedVersion, Set.of());
+    }
 
     /**
      *
@@ -132,13 +181,22 @@ public interface MetadataStore extends AutoCloseable {
      * @param expectedVersion
      *            if present, the version will have to match with the 
currently stored value for the operation to
      *            succeed
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @throws NotFoundException
      *             if the path is not found
      * @throws BadVersionException
      *             if the expected version doesn't match the actual version of 
the data
      * @return a future to track the async request
      */
-    CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion);
+    CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion, Set<Option> opts);
+
+    /**
+     * Like {@link #delete(String, Optional, Set)} with no options.
+     */
+    default CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion) {
+        return delete(path, expectedVersion, Set.of());
+    }
 
     default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> 
expectedVersion) {
         return delete(path, expectedVersion)
@@ -278,15 +336,25 @@ public interface MetadataStore extends AutoCloseable {
      * @param indexName      the secondary index name
      * @param secondaryKey   the secondary key to look up
      * @param fallbackFilter predicate to filter results during fallback scan; 
ignored by native implementations
+     * @param opts           the set of {@link Option options} for this 
operation
      * @return list of matching {@link GetResult} entries
      */
     default CompletableFuture<List<GetResult>> findByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter) {
+            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
         return CompletableFuture.failedFuture(
                 new MetadataStoreException("Secondary index queries not 
supported by this store"));
     }
 
+    /**
+     * Like {@link #findByIndex(String, String, String, Predicate, Set)} with 
no options.
+     */
+    default CompletableFuture<List<GetResult>> findByIndex(
+            String scanPathPrefix, String indexName, String secondaryKey,
+            Predicate<GetResult> fallbackFilter) {
+        return findByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter, Set.of());
+    }
+
     /**
      * Stream all direct children of {@code parentPath} together with their 
values.
      *
@@ -307,13 +375,21 @@ public interface MetadataStore extends AutoCloseable {
      *
      * @param parentPath path whose direct children should be streamed
      * @param consumer   callback that receives records, completion, or an 
error
+     * @param opts       the set of {@link Option options} for this operation
      * @return a future that completes when the scan terminates
      */
-    default CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer) {
+    default CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         return CompletableFuture.failedFuture(
                 new MetadataStoreException("scanChildren not supported by this 
store"));
     }
 
+    /**
+     * Like {@link #scanChildren(String, ScanConsumer, Set)} with no options.
+     */
+    default CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer) {
+        return scanChildren(parentPath, consumer, Set.of());
+    }
+
     /**
      * Returns the default metadata cache config.
      *
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
new file mode 100644
index 00000000000..faee2a3a1e4
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+/**
+ * An option attached to a {@link MetadataStore} operation.
+ *
+ * <p>Methods on the store that accept options take {@code Set<Option>}. The 
set of options
+ * actually consulted depends on the operation — for example, {@link 
Ephemeral} only affects
+ * {@code put}, while {@link PartitionKey} applies to every operation. Options 
that aren't
+ * relevant to an operation are silently ignored, so callers can safely thread 
the same option
+ * set through chains of operations.
+ *
+ * <p>New options are added as new subtypes of this sealed interface; existing 
method signatures
+ * don't change.
+ */
+public sealed interface Option {
+
+    /**
+     * Mark a record as ephemeral — it disappears when the session that 
created it ends.
+     * Only consulted by {@code put}. Equivalent to the legacy
+     * {@link org.apache.pulsar.metadata.api.extended.CreateOption#Ephemeral}.
+     */
+    enum Ephemeral implements Option { INSTANCE }
+
+    /**
+     * Use a server-assigned monotonically increasing sequence in the key. 
Only consulted by
+     * {@code put}. Equivalent to the legacy
+     * {@link org.apache.pulsar.metadata.api.extended.CreateOption#Sequential}.
+     */
+    enum Sequential implements Option { INSTANCE }
+
+    /**
+     * Add a secondary-index entry on the record being written. Multiple 
{@code SecondaryIndex}
+     * options can be supplied to the same {@code put}. Backends without 
native secondary-index
+     * support ignore these.
+     *
+     * @param indexName    the secondary-index name
+     * @param secondaryKey the value to index this record under
+     */
+    record SecondaryIndex(String indexName, String secondaryKey) implements 
Option {}
+
+    /**
+     * Routing hint for sharded backends. Records sharing the same {@code 
partitionKey} are
+     * guaranteed to be co-located in the same shard. Backends without 
sharding ignore the hint.
+     * Pass to any operation.
+     *
+     * @param key the partition key (treated opaquely; equality-routed)
+     */
+    record PartitionKey(String key) implements Option {}
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
new file mode 100644
index 00000000000..13c9ff57b8e
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helpers for extracting typed values from a {@code Set<Option>}.
+ *
+ * <p>{@link MetadataStore} implementations and wrappers use these to consult 
only the options
+ * they care about, ignoring the rest.
+ */
+public final class OptionsHelper {
+
+    private OptionsHelper() {}
+
+    public static boolean isEphemeral(Set<Option> opts) {
+        return opts != null && opts.contains(Option.Ephemeral.INSTANCE);
+    }
+
+    public static boolean isSequential(Set<Option> opts) {
+        return opts != null && opts.contains(Option.Sequential.INSTANCE);
+    }
+
+    /** @return the partition-key value, or {@code null} if no {@link 
Option.PartitionKey} is present. */
+    public static String partitionKey(Set<Option> opts) {
+        if (opts == null) {
+            return null;
+        }
+        for (Option o : opts) {
+            if (o instanceof Option.PartitionKey pk) {
+                return pk.key();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Build the {@code indexName -> secondaryKey} map from any {@link 
Option.SecondaryIndex} entries.
+     * Returns an empty map when no entries are present.
+     */
+    public static Map<String, String> secondaryIndexes(Set<Option> opts) {
+        if (opts == null || opts.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        Map<String, String> map = null;
+        for (Option o : opts) {
+            if (o instanceof Option.SecondaryIndex si) {
+                if (map == null) {
+                    map = new HashMap<>();
+                }
+                map.put(si.indexName(), si.secondaryKey());
+            }
+        }
+        return map == null ? Collections.emptyMap() : map;
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index ad5c7a25300..22838154098 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.metadata.api.extended;
 
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import org.apache.pulsar.metadata.api.MetadataEvent;
@@ -29,6 +31,7 @@ import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.Option;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 
@@ -44,35 +47,35 @@ public interface MetadataStoreExtended extends 
MetadataStore {
     }
 
     /**
-     * Put a new value for a given key.
-     *
-     * The caller can specify an expected version to be atomically checked 
against the current version of the stored
-     * data.
-     *
-     * The future will return the {@link Stat} object associated with the 
newly inserted value.
+     * Legacy {@code EnumSet<CreateOption>} form of {@link 
MetadataStore#put(String, byte[], Optional, Set)}.
      *
+     * <p>Translates the {@link CreateOption} set into the canonical {@code 
Set<Option>} form and
+     * forwards to {@link MetadataStore#put(String, byte[], Optional, Set)}.
      *
      * @param path
-     *            the path of the key to delete from the store
+     *            the path of the key
      * @param value
-     *            the value to
+     *            the value to store
      * @param expectedVersion
-     *            if present, the version will have to match with the 
currently stored value for the operation to
-     *            succeed. Use -1 to enforce a non-existing value.
+     *            if present, the version will have to match for the operation 
to succeed. Use -1 to enforce a
+     *            non-existing value.
      * @param options
-     *            a set of {@link CreateOption} to use if the the key-value 
pair is being created
+     *            a set of {@link CreateOption} to use if the key-value pair 
is being created
      * @throws BadVersionException
      *             if the expected version doesn't match the actual version of 
the data
      * @return a future to track the async request
      */
-    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> 
expectedVersion,
-            EnumSet<CreateOption> options);
+    default CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
+            EnumSet<CreateOption> options) {
+        return put(path, value, expectedVersion, toOptions(options, null));
+    }
 
     /**
-     * Put a new value for a given key with secondary index associations.
+     * Legacy {@code EnumSet<CreateOption>} + {@code Map<String,String>} form 
of
+     * {@link MetadataStore#put(String, byte[], Optional, Set)}.
      *
-     * <p>Secondary indexes are hints: stores that don't support them simply 
ignore them
-     * and delegate to the regular {@link #put(String, byte[], Optional, 
EnumSet)} method.
+     * <p>Translates the inputs into the canonical {@code Set<Option>} form 
and forwards to
+     * {@link MetadataStore#put(String, byte[], Optional, Set)}.
      *
      * @param path              the path of the key
      * @param value             the value to store
@@ -84,7 +87,34 @@ public interface MetadataStoreExtended extends MetadataStore 
{
      */
     default CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
             EnumSet<CreateOption> options, Map<String, String> 
secondaryIndexes) {
-        return put(path, value, expectedVersion, options);
+        return put(path, value, expectedVersion, toOptions(options, 
secondaryIndexes));
+    }
+
+    /**
+     * Translate the legacy {@code EnumSet<CreateOption>} + {@code 
Map<String,String>} form into the
+     * canonical {@code Set<Option>} form.
+     */
+    private static Set<Option> toOptions(EnumSet<CreateOption> options, 
Map<String, String> secondaryIndexes) {
+        boolean hasOptions = options != null && !options.isEmpty();
+        boolean hasIndexes = secondaryIndexes != null && 
!secondaryIndexes.isEmpty();
+        if (!hasOptions && !hasIndexes) {
+            return Set.of();
+        }
+        Set<Option> result = new HashSet<>();
+        if (hasOptions) {
+            if (options.contains(CreateOption.Ephemeral)) {
+                result.add(Option.Ephemeral.INSTANCE);
+            }
+            if (options.contains(CreateOption.Sequential)) {
+                result.add(Option.Sequential.INSTANCE);
+            }
+        }
+        if (hasIndexes) {
+            for (Map.Entry<String, String> e : secondaryIndexes.entrySet()) {
+                result.add(new Option.SecondaryIndex(e.getKey(), 
e.getValue()));
+            }
+        }
+        return result;
     }
 
     /**
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index cab2c3a2e0a..a0a23dbd10e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -28,10 +28,11 @@ import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
-import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -56,7 +57,7 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.Option;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 
@@ -263,9 +264,8 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     }
 
                     final T finalNewValue = newValueObj;
-                    return putWithIndexes(path, newValue, 
Optional.of(expectedVersion),
-                                    EnumSet.noneOf(CreateOption.class),
-                                    indexExtractor.apply(finalNewValue))
+                    return store.put(path, newValue, 
Optional.of(expectedVersion),
+                                    
toOptions(indexExtractor.apply(finalNewValue)))
                             .thenAccept(__ -> refresh(path))
                             .thenApply(__ -> finalNewValue);
                 }, executor), path);
@@ -292,9 +292,8 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
     public CompletableFuture<Void> create(String path, T value,
                                           Function<T, Map<String, String>> 
indexExtractor) {
         final var future = new CompletableFuture<Void>();
-        serialize(path, value).thenCompose(content -> putWithIndexes(
-                        path, content, Optional.of(-1L), 
EnumSet.noneOf(CreateOption.class),
-                        indexExtractor.apply(value)))
+        serialize(path, value).thenCompose(content -> store.put(
+                        path, content, Optional.of(-1L), 
toOptions(indexExtractor.apply(value))))
             // Make sure we have the value cached before the operation is 
completed
             // In addition to caching the value, we need to add a watch on the 
path,
             // so when/if it changes on any other node, we are notified and we 
can
@@ -313,28 +312,23 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         return future;
     }
 
-    /**
-     * Route writes through the extended store API when secondary indexes are 
present and
-     * the underlying store supports them. Falls back to the base {@code put} 
otherwise.
-     */
-    private CompletableFuture<?> putWithIndexes(String path, byte[] content, 
Optional<Long> version,
-                                                EnumSet<CreateOption> options,
-                                                Map<String, String> indexes) {
-        if (storeExtended != null && indexes != null && !indexes.isEmpty()) {
-            return storeExtended.put(path, content, version, options, indexes);
+    /** Build a {@link Set} of {@link Option.SecondaryIndex} entries from an 
{@code indexName -> key} map. */
+    private static Set<Option> toOptions(Map<String, String> indexes) {
+        if (indexes == null || indexes.isEmpty()) {
+            return Set.of();
         }
-        return store.put(path, content, version);
+        Set<Option> opts = new HashSet<>();
+        for (Map.Entry<String, String> e : indexes.entrySet()) {
+            opts.add(new Option.SecondaryIndex(e.getKey(), e.getValue()));
+        }
+        return opts;
     }
 
     @Override
-    public CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
-        return serialize(path, value).thenCompose(bytes -> {
-            if (storeExtended != null) {
-                return storeExtended.put(path, bytes, Optional.empty(), 
options);
-            } else {
-                return store.put(path, bytes, Optional.empty());
-            }
-        }).thenAccept(__ -> {
+    public CompletableFuture<Void> put(String path, T value, Set<Option> opts) 
{
+        return serialize(path, value).thenCompose(bytes ->
+                store.put(path, bytes, Optional.empty(), opts)
+        ).thenAccept(__ -> {
             log.debug().attr("path", path).log("Refreshing path after put 
operation");
             refresh(path);
         });
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 02086898e9f..c7b9f2b582f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -34,10 +34,8 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -70,6 +68,8 @@ import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -103,6 +103,12 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
+    // Re-declare the legacy no-opts public getChildrenFromStore(String) as 
abstract here so that backends keep
+    // providing it as the concrete implementation hook. The canonical {@code 
Set<Option>} overload in this class
+    // delegates to it via virtual dispatch.
+    @Override
+    public abstract CompletableFuture<List<String>> 
getChildrenFromStore(String path);
+
     protected MetadataNodeSizeStats nodeSizeStats;
 
     protected AbstractMetadataStore(
@@ -231,7 +237,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             CompletableFuture<?> updateResult = (event.getType() == 
NotificationType.Deleted)
                     ? deleteInternal(event.getPath(), Optional.empty())
                     : putInternal(event.getPath(), event.getValue(),
-                    Optional.ofNullable(event.getExpectedVersion()), options);
+                    Optional.ofNullable(event.getExpectedVersion()), 
fromLegacyCreateOptions(options));
             updateResult.thenApply(stat -> {
                 log.debug().attr("path", event.getPath()).log("successfully 
updated");
                 return result.complete(null);
@@ -339,7 +345,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     }
 
     @Override
-    public CompletableFuture<Optional<GetResult>> get(String path) {
+    public CompletableFuture<Optional<GetResult>> get(String path, Set<Option> 
opts) {
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -363,12 +369,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     protected abstract CompletableFuture<Optional<GetResult>> storeGet(String 
path);
 
     @Override
-    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion) {
-        return put(path, value, expectedVersion, 
EnumSet.noneOf(CreateOption.class));
-    }
-
-    @Override
-    public final CompletableFuture<List<String>> getChildren(String path) {
+    public final CompletableFuture<List<String>> getChildren(String path, 
Set<Option> opts) {
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -383,7 +384,12 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     }
 
     @Override
-    public final CompletableFuture<Boolean> exists(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
+        return getChildrenFromStore(path);
+    }
+
+    @Override
+    public final CompletableFuture<Boolean> exists(String path, Set<Option> 
opts) {
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -445,7 +451,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     protected abstract CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion);
 
     @Override
-    public final CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion) {
+    public final CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
         log.info().attr("path", path).attr("expectedVersion", 
expectedVersion).log("Deleting path");
         if (isClosed()) {
             return alreadyClosedFailedFuture();
@@ -512,19 +518,18 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 });
     }
 
+    /**
+     * Backend hook for writing a value. Implementations consume {@link 
Option} entries from {@code opts}
+     * via {@link OptionsHelper} (e.g. {@link OptionsHelper#isEphemeral}, 
{@link OptionsHelper#isSequential},
+     * {@link OptionsHelper#secondaryIndexes}, {@link 
OptionsHelper#partitionKey}).
+     */
     protected abstract CompletableFuture<Stat> storePut(String path, byte[] 
data, Optional<Long> optExpectedVersion,
-                                                        EnumSet<CreateOption> 
options);
-
-    protected CompletableFuture<Stat> storePut(String path, byte[] data, 
Optional<Long> optExpectedVersion,
-                                               EnumSet<CreateOption> options,
-                                               Map<String, String> 
secondaryIndexes) {
-        return storePut(path, data, optExpectedVersion, options);
-    }
+                                                        Set<Option> opts);
 
     @Override
     public CompletableFuture<List<GetResult>> findByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter) {
+            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -532,7 +537,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     }
 
     @Override
-    public CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer) {
+    public CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         if (isClosed()) {
             CompletableFuture<Void> failed = alreadyClosedFailedFuture();
             failed.whenComplete((__, ex) -> {
@@ -601,13 +606,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     @Override
     public final CompletableFuture<Stat> put(String path, byte[] data, 
Optional<Long> optExpectedVersion,
-            EnumSet<CreateOption> options) {
-        return put(path, data, optExpectedVersion, options, 
Collections.emptyMap());
-    }
-
-    @Override
-    public final CompletableFuture<Stat> put(String path, byte[] data, 
Optional<Long> optExpectedVersion,
-            EnumSet<CreateOption> options, Map<String, String> 
secondaryIndexes) {
+            Set<Option> opts) {
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
@@ -616,15 +615,14 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - 
start);
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
-        HashSet<CreateOption> ops = new HashSet<>(options);
         if (getMetadataEventSynchronizer().isPresent()) {
             Long version = optExpectedVersion.isPresent() && 
optExpectedVersion.get() < 0 ? null
                     : optExpectedVersion.orElse(null);
-            MetadataEvent event = new MetadataEvent(path, data, ops, version,
+            MetadataEvent event = new MetadataEvent(path, data, 
toLegacyCreateOptions(opts), version,
                     Instant.now().toEpochMilli(), 
getMetadataEventSynchronizer().get().getClusterName(),
                     NotificationType.Modified);
             return getMetadataEventSynchronizer().get().notify(event)
-                    .thenCompose(__ -> putInternal(path, data, 
optExpectedVersion, options, secondaryIndexes))
+                    .thenCompose(__ -> putInternal(path, data, 
optExpectedVersion, opts))
                     .whenComplete((v, t) -> {
                         if (t != null) {
                             
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -634,7 +632,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         }
                     });
         } else {
-            return putInternal(path, data, optExpectedVersion, options, 
secondaryIndexes)
+            return putInternal(path, data, optExpectedVersion, opts)
                     .whenComplete((v, t) -> {
                         if (t != null) {
                             
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
@@ -647,17 +645,45 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     }
 
-    public final CompletableFuture<Stat> putInternal(String path, byte[] data, 
Optional<Long> optExpectedVersion,
-            Set<CreateOption> options) {
-        return putInternal(path, data, optExpectedVersion, options, 
Collections.emptyMap());
+    /**
+     * Translate {@link Option.Ephemeral}/{@link Option.Sequential} entries 
from {@code opts} into the
+     * legacy {@link CreateOption} set carried by {@link MetadataEvent} for 
sync replication. Other
+     * {@link Option} kinds (e.g. {@link Option.SecondaryIndex}, {@link 
Option.PartitionKey}) are not
+     * propagated through the legacy event payload.
+     */
+    private static HashSet<CreateOption> toLegacyCreateOptions(Set<Option> 
opts) {
+        HashSet<CreateOption> result = new HashSet<>();
+        if (OptionsHelper.isEphemeral(opts)) {
+            result.add(CreateOption.Ephemeral);
+        }
+        if (OptionsHelper.isSequential(opts)) {
+            result.add(CreateOption.Sequential);
+        }
+        return result;
+    }
+
+    /**
+     * Translate a legacy {@link CreateOption} set (carried by replicated 
{@link MetadataEvent} payloads)
+     * into the canonical {@code Set<Option>} form consumed by the {@code 
storePut} hook.
+     */
+    private static Set<Option> fromLegacyCreateOptions(Set<CreateOption> 
options) {
+        if (options == null || options.isEmpty()) {
+            return Set.of();
+        }
+        HashSet<Option> result = new HashSet<>();
+        if (options.contains(CreateOption.Ephemeral)) {
+            result.add(Option.Ephemeral.INSTANCE);
+        }
+        if (options.contains(CreateOption.Sequential)) {
+            result.add(Option.Sequential.INSTANCE);
+        }
+        return result;
     }
 
     public final CompletableFuture<Stat> putInternal(String path, byte[] data, 
Optional<Long> optExpectedVersion,
-            Set<CreateOption> options, Map<String, String> secondaryIndexes) {
-        var enumOptions =
-                (options != null && !options.isEmpty()) ? 
EnumSet.copyOf(options) : EnumSet.noneOf(CreateOption.class);
+            Set<Option> opts) {
         // Ensure caches are invalidated before the operation is confirmed
-        return storePut(path, data, optExpectedVersion, enumOptions, 
secondaryIndexes)
+        return storePut(path, data, optExpectedVersion, opts == null ? 
Set.of() : opts)
                 .thenApply(stat -> {
                     NotificationType type = stat.isFirstVersion() ? 
NotificationType.Created
                             : NotificationType.Modified;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
index 47cc47448b4..45ad7fc51f7 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataCache.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.metadata.impl;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -30,7 +30,7 @@ import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.Option;
 
 public class DualMetadataCache<T> implements MetadataCache<T> {
     private final DualMetadataStore dualMetadataStore;
@@ -106,8 +106,8 @@ public class DualMetadataCache<T> implements 
MetadataCache<T> {
     }
 
     @Override
-    public CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
-        return metadataCache.get().put(path, value, options);
+    public CompletableFuture<Void> put(String path, T value, Set<Option> opts) 
{
+        return metadataCache.get().put(path, value, opts);
     }
 
     @Override
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index bcc6b7226da..55ab3f74055 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -23,7 +23,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.time.Duration;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -51,6 +50,8 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
@@ -262,124 +263,97 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public CompletableFuture<Optional<GetResult>> get(String path) {
+    public CompletableFuture<Optional<GetResult>> get(String path, Set<Option> 
opts) {
         return switch (migrationState.getPhase()) {
-            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.get(path);
-            case COMPLETED -> targetStore.get(path);
+            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.get(path, opts);
+            case COMPLETED -> targetStore.get(path, opts);
         };
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildren(String path) {
+    public CompletableFuture<List<String>> getChildren(String path, 
Set<Option> opts) {
         return switch (migrationState.getPhase()) {
-            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.getChildren(path);
-            case COMPLETED -> targetStore.getChildren(path);
+            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.getChildren(path, opts);
+            case COMPLETED -> targetStore.getChildren(path, opts);
         };
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
         return switch (migrationState.getPhase()) {
-            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.getChildrenFromStore(path);
-            case COMPLETED -> targetStore.getChildrenFromStore(path);
+            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.getChildrenFromStore(path, opts);
+            case COMPLETED -> targetStore.getChildrenFromStore(path, opts);
         };
     }
 
     @Override
-    public CompletableFuture<Boolean> exists(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
         return switch (migrationState.getPhase()) {
-            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.exists(path);
-            case COMPLETED -> targetStore.exists(path);
+            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.getChildrenFromStore(path);
+            case COMPLETED -> targetStore.getChildrenFromStore(path);
         };
     }
 
     @Override
-    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion) {
-        return put(path, value, expectedVersion, 
EnumSet.noneOf(CreateOption.class));
+    public CompletableFuture<Boolean> exists(String path, Set<Option> opts) {
+        return switch (migrationState.getPhase()) {
+            case NOT_STARTED, PREPARATION, COPYING, FAILED -> 
sourceStore.exists(path, opts);
+            case COMPLETED -> targetStore.exists(path, opts);
+        };
     }
 
     @Override
     public CompletableFuture<List<GetResult>> findByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter) {
+            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
         return switch (migrationState.getPhase()) {
             case NOT_STARTED, PREPARATION, COPYING, FAILED ->
-                    sourceStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter);
+                    sourceStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter, opts);
             case COMPLETED ->
-                    targetStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter);
+                    targetStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter, opts);
         };
     }
 
     @Override
-    public CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer) {
+    public CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         return switch (migrationState.getPhase()) {
             case NOT_STARTED, PREPARATION, COPYING, FAILED ->
-                    sourceStore.scanChildren(parentPath, consumer);
+                    sourceStore.scanChildren(parentPath, consumer, opts);
             case COMPLETED ->
-                    targetStore.scanChildren(parentPath, consumer);
+                    targetStore.scanChildren(parentPath, consumer, opts);
         };
     }
 
     @Override
-    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
-                                       EnumSet<CreateOption> options, 
Map<String, String> secondaryIndexes) {
-        switch (migrationState.getPhase()) {
-            case NOT_STARTED, FAILED -> {
-                if (options.contains(CreateOption.Ephemeral)) {
-                    localEphemeralPaths.add(path);
-                }
-                pendingSourceWrites.incrementAndGet();
-                var future = sourceStore.put(path, value, expectedVersion, 
options, secondaryIndexes);
-                future.whenComplete((result, e) -> 
pendingSourceWrites.decrementAndGet());
-                return future;
-            }
-            case PREPARATION, COPYING -> {
-                return 
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
-            }
-            case COMPLETED -> {
-                return targetStore.put(path, value, expectedVersion, options, 
secondaryIndexes);
-            }
-            default -> throw new IllegalStateException("Invalid phase " + 
migrationState.getPhase());
-        }
-    }
-
-    @Override
-    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
-                                       EnumSet<CreateOption> options) {
+    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion, Set<Option> opts) {
         switch (migrationState.getPhase()) {
             case NOT_STARTED, FAILED -> {
-                // Track ephemeral nodes
-                if (options.contains(CreateOption.Ephemeral)) {
+                if (OptionsHelper.isEphemeral(opts)) {
                     localEphemeralPaths.add(path);
                 }
-
-                // Track pending writes
                 pendingSourceWrites.incrementAndGet();
-                var future = sourceStore.put(path, value, expectedVersion, 
options);
+                var future = sourceStore.put(path, value, expectedVersion, 
opts);
                 future.whenComplete((result, e) -> 
pendingSourceWrites.decrementAndGet());
                 return future;
             }
-
             case PREPARATION, COPYING -> {
                 return 
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
             }
-
             case COMPLETED -> {
-                return targetStore.put(path, value, expectedVersion, options);
+                return targetStore.put(path, value, expectedVersion, opts);
             }
-
             default -> throw new IllegalStateException("Invalid phase " + 
migrationState.getPhase());
         }
     }
 
     @Override
-    public CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion) {
+    public CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
         switch (migrationState.getPhase()) {
             case NOT_STARTED, FAILED -> {
                 localEphemeralPaths.remove(path);
 
                 pendingSourceWrites.incrementAndGet();
-                var future = sourceStore.delete(path, expectedVersion);
+                var future = sourceStore.delete(path, expectedVersion, opts);
                 future.whenComplete((result, e) -> 
pendingSourceWrites.decrementAndGet());
                 return future;
             }
@@ -389,7 +363,7 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
             }
 
             case COMPLETED -> {
-                return targetStore.delete(path, expectedVersion);
+                return targetStore.delete(path, expectedVersion, opts);
             }
 
             default -> throw new IllegalStateException("Invalid phase " + 
migrationState.getPhase());
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index de988b6b1c8..6fbd22e3888 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.metadata.impl;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
@@ -38,9 +38,9 @@ import org.apache.pulsar.metadata.api.MetadataEvent;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Option;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
@@ -77,74 +77,63 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public CompletableFuture<Optional<GetResult>> get(String path) {
+    public CompletableFuture<Optional<GetResult>> get(String path, Set<Option> 
opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.GET, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.get(path);
+        return store.get(path, opts);
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildren(String path) {
+    public CompletableFuture<List<String>> getChildren(String path, 
Set<Option> opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.GET_CHILDREN, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.getChildren(path);
+        return store.getChildren(path, opts);
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.GET_CHILDREN, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.getChildrenFromStore(path);
+        return store.getChildrenFromStore(path, opts);
     }
 
     @Override
-    public CompletableFuture<Boolean> exists(String path) {
+    public CompletableFuture<Boolean> exists(String path, Set<Option> opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.EXISTS, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.exists(path);
+        return store.exists(path, opts);
     }
 
     @Override
-    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion) {
+    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion, Set<Option> opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.PUT, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.put(path, value, expectedVersion);
+        return store.put(path, value, expectedVersion, opts);
     }
 
     @Override
-    public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
-                                       EnumSet<CreateOption> options) {
-        Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.PUT, path);
-        if (ex.isPresent()) {
-            return FutureUtil.failedFuture(ex.get());
-        }
-
-        return store.put(path, value, expectedVersion, options);
-    }
-
-    @Override
-    public CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion) {
+    public CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.DELETE, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.delete(path, expectedVersion);
+        return store.delete(path, expectedVersion, opts);
     }
 
     @Override
@@ -158,14 +147,14 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer) {
+    public CompletableFuture<Void> scanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.SCAN_CHILDREN, parentPath);
         if (ex.isPresent()) {
             consumer.onError(ex.get());
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.scanChildren(parentPath, consumer);
+        return store.scanChildren(parentPath, consumer, opts);
     }
 
     @Override
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 7c7bd31e29b..130eb6ff0da 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.metadata.impl;
 
 import com.google.common.collect.MapMaker;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -45,9 +44,10 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 @CustomLog
@@ -190,7 +190,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
 
     @Override
     public CompletableFuture<Stat> storePut(String path, byte[] data, 
Optional<Long> optExpectedVersion,
-                                            EnumSet<CreateOption> options) {
+                                            Set<Option> opts) {
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
@@ -198,14 +198,15 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
             boolean hasVersion = optExpectedVersion.isPresent();
             int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
 
-            if (options.contains(CreateOption.Sequential)) {
+            if (OptionsHelper.isSequential(opts)) {
                 path += Long.toString(sequentialIdGenerator.getAndIncrement());
             }
+            boolean ephemeral = OptionsHelper.isEphemeral(opts);
 
             long now = System.currentTimeMillis();
 
             if (hasVersion && expectedVersion == -1) {
-                Value newValue = new Value(0, data, now, now, 
options.contains(CreateOption.Ephemeral));
+                Value newValue = new Value(0, data, now, now, ephemeral);
                 Value existingValue = map.putIfAbsent(path, newValue);
                 if (existingValue != null) {
                     return FutureUtils.exception(new BadVersionException(""));
@@ -222,8 +223,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
                 } else {
                     long newVersion = existingValue != null ? 
existingValue.version + 1 : 0;
                     long createdTimestamp = existingValue != null ? 
existingValue.createdTimestamp : now;
-                    Value newValue = new Value(newVersion, data, 
createdTimestamp, now,
-                            options.contains(CreateOption.Ephemeral));
+                    Value newValue = new Value(newVersion, data, 
createdTimestamp, now, ephemeral);
                     map.put(path, newValue);
 
                     NotificationType type =
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 3a7d7abe5d2..17836a7d383 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -29,7 +29,6 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -53,9 +52,10 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ConfigOptions;
@@ -586,7 +586,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
 
     @Override
     protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> expectedVersion,
-                                               EnumSet<CreateOption> options) {
+                                               Set<Option> opts) {
         log.debug().attr("path", path).attr("instanceId", instanceId).log("storePut");
         try {
             dbStateLock.readLock().lock();
@@ -613,8 +613,8 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
                     metaValue = new MetaValue();
                     metaValue.version = 0;
                     metaValue.createdTimestamp = timestamp;
-                    metaValue.ephemeral = options.contains(CreateOption.Ephemeral);
-                    if (options.contains(CreateOption.Sequential)) {
+                    metaValue.ephemeral = OptionsHelper.isEphemeral(opts);
+                    if (OptionsHelper.isSequential(opts)) {
                         path += sequentialIdGenerator.getAndIncrement();
                         pathBytes = toBytes(path);
                         transaction.put(SEQUENTIAL_ID_KEY, toBytes(sequentialIdGenerator.get()));
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4f609f034bb..5e7c57fac77 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.metadata.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -44,8 +44,9 @@ import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
@@ -573,18 +574,13 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
         }
     }
 
-    private static CreateMode getCreateMode(EnumSet<CreateOption> options) {
-        if (options.contains(CreateOption.Ephemeral)) {
-            if (options.contains(CreateOption.Sequential)) {
-                return CreateMode.EPHEMERAL_SEQUENTIAL;
-            } else {
-                return CreateMode.EPHEMERAL;
-            }
-        } else if (options.contains(CreateOption.Sequential)) {
-            return CreateMode.PERSISTENT_SEQUENTIAL;
-        } else {
-            return CreateMode.PERSISTENT;
+    private static CreateMode getCreateMode(Set<Option> opts) {
+        boolean ephemeral = OptionsHelper.isEphemeral(opts);
+        boolean sequential = OptionsHelper.isSequential(opts);
+        if (ephemeral) {
+            return sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
         }
+        return sequential ? CreateMode.PERSISTENT_SEQUENTIAL : CreateMode.PERSISTENT;
     }
 
     public long getZkSessionId() {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index ec6ed23e61d..2d7b56ad650 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.metadata.impl.batching;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,8 +36,8 @@ import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.jctools.queues.MessagePassingQueue;
@@ -149,8 +149,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
 
     @Override
     protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
-                                               EnumSet<CreateOption> options) {
-        OpPut op = new OpPut(path, data, optExpectedVersion, options);
+                                               Set<Option> opts) {
+        OpPut op = new OpPut(path, data, optExpectedVersion, opts);
         enqueue(writeOps, op);
         return op.getFuture();
     }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
index dc29fea9559..5cd08419052 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pulsar.metadata.impl.batching;
 
-import java.util.EnumSet;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 
 @Data
 @AllArgsConstructor
@@ -32,13 +33,13 @@ public class OpPut implements MetadataOp {
     private final String path;
     private final byte[] data;
     private final Optional<Long> optExpectedVersion;
-    private final EnumSet<CreateOption> options;
+    private final Set<Option> options;
 
     public final long created = System.currentTimeMillis();
     private final CompletableFuture<Stat> future = new CompletableFuture<>();
 
     public boolean isEphemeral() {
-        return options.contains(CreateOption.Ephemeral);
+        return OptionsHelper.isEphemeral(options);
     }
 
     @Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 21bbba615d4..3b89a99bb67 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -31,7 +31,6 @@ import io.oxia.client.api.options.ListOption;
 import io.oxia.client.api.options.PutOption;
 import java.time.Duration;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -51,9 +50,10 @@ import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.OptionsHelper;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 
 @CustomLog
@@ -206,15 +206,8 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
 
     @Override
     protected CompletableFuture<Stat> storePut(
-            String path, byte[] data, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options) {
-        return doStorePut(path, data, optExpectedVersion, options, Collections.emptyMap());
-    }
-
-    @Override
-    protected CompletableFuture<Stat> storePut(
-            String path, byte[] data, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options,
-            Map<String, String> secondaryIndexes) {
-        return doStorePut(path, data, optExpectedVersion, options, secondaryIndexes);
+            String path, byte[] data, Optional<Long> optExpectedVersion, Set<Option> opts) {
+        return doStorePut(path, data, optExpectedVersion, opts);
     }
 
     @Override
@@ -273,21 +266,21 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     }
 
     private CompletableFuture<Stat> doStorePut(
-            String path, byte[] data, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options,
-            Map<String, String> secondaryIndexes) {
+            String path, byte[] data, Optional<Long> optExpectedVersion, Set<Option> opts) {
+        boolean sequential = OptionsHelper.isSequential(opts);
+        boolean ephemeral = OptionsHelper.isEphemeral(opts);
+        Map<String, String> secondaryIndexes = OptionsHelper.secondaryIndexes(opts);
         CompletableFuture<Void> parentsCreated = createParents(path);
         return parentsCreated.thenCompose(
                 __ -> {
                     var expectedVersion = optExpectedVersion;
-                    if (expectedVersion.isPresent()
-                            && expectedVersion.get() != -1L
-                            && options.contains(CreateOption.Sequential)) {
+                    if (expectedVersion.isPresent() && expectedVersion.get() != -1L && sequential) {
                         return CompletableFuture.failedFuture(
                                 new MetadataStoreException(
                                         "Can't have expectedVersion and Sequential at the same time"));
                     }
                     CompletableFuture<String> actualPath;
-                    if (options.contains(CreateOption.Sequential)) {
+                    if (sequential) {
                         var parent = parent(path);
                         var parentPath = parent == null ? "/" : parent;
 
@@ -311,7 +304,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
                                     })
                             .ifPresent(putOptions::add);
 
-                    if (options.contains(CreateOption.Ephemeral)) {
+                    if (ephemeral) {
                         putOptions.add(PutOption.AsEphemeralRecord);
                     }
                     var parentPath = parent(path);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 22c3553bf1c..fdb636d7a7e 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.metadata.impl;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import io.opentelemetry.api.OpenTelemetry;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.GetResult;
@@ -31,8 +31,8 @@ import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+import org.apache.pulsar.metadata.api.Option;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -121,7 +121,7 @@ public class MetadataStoreFactoryImplTest {
 
         @Override
         protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
-                                                   EnumSet<CreateOption> options) {
+                                                   Set<Option> opts) {
             return null;
         }
     }

Reply via email to