This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 48689001111 [improve] Wire Set<Option> through backend hooks + Oxia 
PartitionKey (#25723)
48689001111 is described below

commit 486890011119f607a3fda6b39f95e443a1b0105c
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 11:12:55 2026 -0700

    [improve] Wire Set<Option> through backend hooks + Oxia PartitionKey 
(#25723)
---
 .../apache/pulsar/metadata/api/MetadataStore.java  |  18 +++-
 .../metadata/impl/AbstractMetadataStore.java       |  78 ++++++++------
 .../pulsar/metadata/impl/DualMetadataStore.java    |   6 +-
 .../metadata/impl/FaultInjectionMetadataStore.java |   4 +-
 .../metadata/impl/LocalMemoryMetadataStore.java    |  10 +-
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |  10 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   2 +-
 .../batching/AbstractBatchedMetadataStore.java     |  12 +--
 .../pulsar/metadata/impl/batching/OpDelete.java    |   3 +
 .../pulsar/metadata/impl/batching/OpGet.java       |   3 +
 .../metadata/impl/batching/OpGetChildren.java      |   3 +
 .../metadata/impl/oxia/OxiaMetadataStore.java      |  74 ++++++++++---
 .../pulsar/metadata/OxiaPartitionKeyTest.java      | 117 +++++++++++++++++++++
 .../impl/MetadataStoreFactoryImplTest.java         |   8 +-
 14 files changed, 270 insertions(+), 78 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 71f69a313c2..d258934bec9 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -198,8 +198,8 @@ public interface MetadataStore extends AutoCloseable {
         return delete(path, expectedVersion, Set.of());
     }
 
-    default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> 
expectedVersion) {
-        return delete(path, expectedVersion)
+    default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
+        return delete(path, expectedVersion, opts)
                 .exceptionally(e -> {
                     if (e.getCause() instanceof NotFoundException) {
                         LOG.info().attr("path", path).log("Path not found 
while deleting (this is not a problem)");
@@ -216,6 +216,11 @@ public interface MetadataStore extends AutoCloseable {
                 });
     }
 
+    /** Like {@link #deleteIfExists(String, Optional, Set)} with no options. */
+    default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> 
expectedVersion) {
+        return deleteIfExists(path, expectedVersion, Set.of());
+    }
+
     /**
      * Delete a key-value pair and all the children nodes.
      *
@@ -224,9 +229,16 @@ public interface MetadataStore extends AutoCloseable {
      *
      * @param path
      *            the path of the key to delete from the store
+     * @param opts
+     *            the set of {@link Option options} for this operation
      * @return a future to track the async request
      */
-    CompletableFuture<Void> deleteRecursive(String path);
+    CompletableFuture<Void> deleteRecursive(String path, Set<Option> opts);
+
+    /** Like {@link #deleteRecursive(String, Set)} with no options. */
+    default CompletableFuture<Void> deleteRecursive(String path) {
+        return deleteRecursive(path, Set.of());
+    }
 
     /**
      * Register a listener that will be called on changes in the underlying 
store.
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index c7b9f2b582f..72e8a26afb8 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -101,13 +101,23 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-    protected abstract CompletableFuture<Boolean> existsFromStore(String path);
+    /**
+     * Backend hook for {@link #exists}. Implementations consume {@link 
Option} entries from {@code opts}
+     * via {@link OptionsHelper} (e.g. {@link OptionsHelper#partitionKey} for 
routing on sharded backends).
+     */
+    protected abstract CompletableFuture<Boolean> existsFromStore(String path, 
Set<Option> opts);
+
+    /**
+     * Backend hook for {@link MetadataStore#getChildrenFromStore(String, 
Set)}. Implementations consume
+     * {@link Option} entries from {@code opts} via {@link OptionsHelper}.
+     */
+    @Override
+    public abstract CompletableFuture<List<String>> 
getChildrenFromStore(String path, Set<Option> opts);
 
-    // Re-declare the legacy no-opts public methods as abstract here so that 
backends keep providing them
-    // as the concrete implementation hooks. The canonical {@code Set<Option>} 
forms in this class delegate
-    // to these via virtual dispatch.
     @Override
-    public abstract CompletableFuture<List<String>> 
getChildrenFromStore(String path);
+    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+        return getChildrenFromStore(path, Set.of());
+    }
 
     protected MetadataNodeSizeStats nodeSizeStats;
 
@@ -186,14 +196,14 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 .buildAsync(new AsyncCacheLoader<String, Boolean>() {
                     @Override
                     public CompletableFuture<Boolean> asyncLoad(String key, 
Executor executor) {
-                        return existsFromStore(key);
+                        return existsFromStore(key, Set.of());
                     }
 
                     @Override
                     public CompletableFuture<Boolean> asyncReload(String key, 
Boolean oldValue,
                             Executor executor) {
                         if (isConnected) {
-                            return existsFromStore(key);
+                            return existsFromStore(key, Set.of());
                         } else {
                             // Do not refresh if we're not connected
                             return CompletableFuture.completedFuture(oldValue);
@@ -235,7 +245,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             }
             // else update the event
             CompletableFuture<?> updateResult = (event.getType() == 
NotificationType.Deleted)
-                    ? deleteInternal(event.getPath(), Optional.empty())
+                    ? deleteInternal(event.getPath(), Optional.empty(), 
Set.of())
                     : putInternal(event.getPath(), event.getValue(),
                     Optional.ofNullable(event.getExpectedVersion()), 
fromLegacyCreateOptions(options));
             updateResult.thenApply(stat -> {
@@ -355,7 +365,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             return FutureUtil
                     .failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
-        return storeGet(path)
+        return storeGet(path, opts)
                 .whenComplete((v, t) -> {
                     if (t != null) {
                         v.ifPresent(getResult -> 
nodeSizeStats.recordGetRes(path, getResult));
@@ -366,7 +376,11 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 });
     }
 
-    protected abstract CompletableFuture<Optional<GetResult>> storeGet(String 
path);
+    /**
+     * Backend hook for {@link #get}. Implementations consume {@link Option} 
entries from {@code opts}
+     * via {@link OptionsHelper}.
+     */
+    protected abstract CompletableFuture<Optional<GetResult>> storeGet(String 
path, Set<Option> opts);
 
     @Override
     public final CompletableFuture<List<String>> getChildren(String path, 
Set<Option> opts) {
@@ -383,11 +397,6 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         return listFuture;
     }
 
-    @Override
-    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
-        return getChildrenFromStore(path);
-    }
-
     @Override
     public final CompletableFuture<Boolean> exists(String path, Set<Option> 
opts) {
         if (isClosed()) {
@@ -448,7 +457,12 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         }
     }
 
-    protected abstract CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion);
+    /**
+     * Backend hook for {@link #delete}. Implementations consume {@link 
Option} entries from {@code opts}
+     * via {@link OptionsHelper}.
+     */
+    protected abstract CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion,
+                                                            Set<Option> opts);
 
     @Override
     public final CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
@@ -466,7 +480,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                     expectedVersion.orElse(null), Instant.now().toEpochMilli(),
                     getMetadataEventSynchronizer().get().getClusterName(), 
NotificationType.Deleted);
             return getMetadataEventSynchronizer().get().notify(event)
-                    .thenCompose(__ -> deleteInternal(path, expectedVersion))
+                    .thenCompose(__ -> deleteInternal(path, expectedVersion, 
opts))
                     .whenComplete((v, t) -> {
                         if (null != t) {
                             
metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
@@ -475,7 +489,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         }
                     });
         } else {
-            return deleteInternal(path, expectedVersion)
+            return deleteInternal(path, expectedVersion, opts)
                     .whenComplete((v, t) -> {
                         if (null != t) {
                             
metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
@@ -486,9 +500,9 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         }
     }
 
-    private CompletableFuture<Void> deleteInternal(String path, Optional<Long> 
expectedVersion) {
+    private CompletableFuture<Void> deleteInternal(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
         // Ensure caches are invalidated before the operation is confirmed
-        return storeDelete(path, expectedVersion).thenRun(() -> {
+        return storeDelete(path, expectedVersion, opts).thenRun(() -> {
             existsCache.synchronous().invalidate(path);
             childrenCache.synchronous().invalidate(path);
             String parent = parent(path);
@@ -502,19 +516,19 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     }
 
     @Override
-    public CompletableFuture<Void> deleteRecursive(String path) {
+    public CompletableFuture<Void> deleteRecursive(String path, Set<Option> 
opts) {
         log.info().attr("path", path).log("Deleting recursively path");
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
-        return getChildren(path)
+        return getChildren(path, opts)
                 .thenCompose(children -> FutureUtil.waitForAll(
                         children.stream()
-                                .map(child -> deleteRecursive(path + "/" + 
child))
+                                .map(child -> deleteRecursive(path + "/" + 
child, opts))
                                 .collect(Collectors.toList())))
                 .thenCompose(__ -> {
                     log.info().attr("path", path).log("After deleting all 
children, now deleting path");
-                    return deleteIfExists(path, Optional.empty());
+                    return deleteIfExists(path, Optional.empty(), opts);
                 });
     }
 
@@ -533,7 +547,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         if (isClosed()) {
             return alreadyClosedFailedFuture();
         }
-        return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter);
+        return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter, opts);
     }
 
     @Override
@@ -552,7 +566,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             consumer.onError(ex);
             return FutureUtil.failedFuture(ex);
         }
-        return storeScanChildren(parentPath, consumer);
+        return storeScanChildren(parentPath, consumer, opts);
     }
 
     /**
@@ -561,13 +575,13 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
      * {@link #storeGet}. Backends with a native range-scan primitive (Oxia, 
RocksDB,
      * in-memory NavigableMap) override this method for a single store-side 
scan.
      */
-    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer) {
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        getChildrenFromStore(parentPath).thenCompose(children -> {
+        getChildrenFromStore(parentPath, opts).thenCompose(children -> {
             CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
             for (String child : children) {
                 String childPath = parentPath.equals("/") ? "/" + child : 
parentPath + "/" + child;
-                chain = chain.thenCompose(__ -> storeGet(childPath))
+                chain = chain.thenCompose(__ -> storeGet(childPath, opts))
                         .thenAccept(opt -> opt.ifPresent(consumer::onNext));
             }
             return chain;
@@ -587,12 +601,12 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected CompletableFuture<List<GetResult>> storeFindByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter) {
+            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
         // Default fallback: full scan under scanPathPrefix, applying 
fallbackFilter to each result.
-        return getChildrenFromStore(scanPathPrefix)
+        return getChildrenFromStore(scanPathPrefix, opts)
                 .thenCompose(children -> {
                     List<CompletableFuture<Optional<GetResult>>> futures = 
children.stream()
-                            .map(child -> storeGet(scanPathPrefix + "/" + 
child))
+                            .map(child -> storeGet(scanPathPrefix + "/" + 
child, opts))
                             .toList();
                     return FutureUtil.waitForAll(futures)
                             .thenApply(__ -> futures.stream()
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 55ab3f74055..38a3bc6013a 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -371,11 +371,11 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public CompletableFuture<Void> deleteRecursive(String path) {
+    public CompletableFuture<Void> deleteRecursive(String path, Set<Option> 
opts) {
         switch (migrationState.getPhase()) {
             case NOT_STARTED, FAILED -> {
                 pendingSourceWrites.incrementAndGet();
-                var future = sourceStore.deleteRecursive(path);
+                var future = sourceStore.deleteRecursive(path, opts);
                 future.whenComplete((result, e) -> 
pendingSourceWrites.decrementAndGet());
                 return future;
             }
@@ -383,7 +383,7 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
                 return 
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
             }
             case COMPLETED -> {
-                return targetStore.deleteRecursive(path);
+                return targetStore.deleteRecursive(path, opts);
             }
 
             default -> throw new IllegalStateException("Invalid phase " + 
migrationState.getPhase());
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 6fbd22e3888..552f0e817b5 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -137,13 +137,13 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public CompletableFuture<Void> deleteRecursive(String path) {
+    public CompletableFuture<Void> deleteRecursive(String path, Set<Option> 
opts) {
         Optional<MetadataStoreException> ex = 
programmedFailure(OperationType.DELETE, path);
         if (ex.isPresent()) {
             return FutureUtil.failedFuture(ex.get());
         }
 
-        return store.deleteRecursive(path);
+        return store.deleteRecursive(path, opts);
     }
 
     @Override
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 130eb6ff0da..c1f00c081c5 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -108,7 +108,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     }
 
     @Override
-    public CompletableFuture<Optional<GetResult>> storeGet(String path) {
+    public CompletableFuture<Optional<GetResult>> storeGet(String path, 
Set<Option> opts) {
         synchronized (map) {
             Value v = map.get(path);
             if (v != null) {
@@ -123,7 +123,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     }
 
     @Override
-    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer) {
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         // Snapshot the immediate children under the lock, then dispatch 
outside it so a slow
         // consumer can't stall other store operations.
         List<GetResult> snapshot = new ArrayList<>();
@@ -156,7 +156,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
@@ -178,7 +178,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     }
 
     @Override
-    public CompletableFuture<Boolean> existsFromStore(String path) {
+    public CompletableFuture<Boolean> existsFromStore(String path, Set<Option> 
opts) {
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
@@ -242,7 +242,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     }
 
     @Override
-    public CompletableFuture<Void> storeDelete(String path, Optional<Long> 
optExpectedVersion) {
+    public CompletableFuture<Void> storeDelete(String path, Optional<Long> 
optExpectedVersion, Set<Option> opts) {
         if (!isValidPath(path)) {
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 17836a7d383..5e9024a3104 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -375,7 +375,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    public CompletableFuture<Optional<GetResult>> storeGet(String path) {
+    public CompletableFuture<Optional<GetResult>> storeGet(String path, 
Set<Option> opts) {
         log.debug().attr("path", path).attr("instanceId", 
instanceId).log("getFromStore");
         try {
             dbStateLock.readLock().lock();
@@ -408,7 +408,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer) {
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         // Native iterator-based scan over the parent's key range, with the 
same direct-child
         // filter getChildrenFromStore applies. Snapshot under the read lock 
then dispatch
         // outside it.
@@ -487,7 +487,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
         log.debug().attr("path", path).attr("instanceId", 
instanceId).log("getChildrenFromStore");
         try {
             dbStateLock.readLock().lock();
@@ -530,7 +530,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<Boolean> existsFromStore(String path) {
+    protected CompletableFuture<Boolean> existsFromStore(String path, 
Set<Option> opts) {
         log.debug().attr("path", path).attr("instanceId", 
instanceId).log("existsFromStore");
         try {
             dbStateLock.readLock().lock();
@@ -551,7 +551,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<Void> storeDelete(String path, Optional<Long> 
expectedVersion) {
+    protected CompletableFuture<Void> storeDelete(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
         log.debug().attr("path", path).attr("instanceId", 
instanceId).log("storeDelete");
         try {
             dbStateLock.readLock().lock();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 5e7c57fac77..fc5c9e4f5e4 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -390,7 +390,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
     }
 
     @Override
-    public CompletableFuture<Boolean> existsFromStore(String path) {
+    public CompletableFuture<Boolean> existsFromStore(String path, Set<Option> 
opts) {
         CompletableFuture<Boolean> future = new CompletableFuture<>();
 
         try {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 2d7b56ad650..7db4c9146f5 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -127,22 +127,22 @@ public abstract class AbstractBatchedMetadataStore 
extends AbstractMetadataStore
     }
 
     @Override
-    public final CompletableFuture<Optional<GetResult>> storeGet(String path) {
-        OpGet op = new OpGet(path);
+    public final CompletableFuture<Optional<GetResult>> storeGet(String path, 
Set<Option> opts) {
+        OpGet op = new OpGet(path, opts);
         enqueue(readOps, op);
         return op.getFuture();
     }
 
     @Override
-    public final CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
-        OpGetChildren op = new OpGetChildren(path);
+    public final CompletableFuture<List<String>> getChildrenFromStore(String 
path, Set<Option> opts) {
+        OpGetChildren op = new OpGetChildren(path, opts);
         enqueue(readOps, op);
         return op.getFuture();
     }
 
     @Override
-    protected final CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion) {
-        OpDelete op = new OpDelete(path, expectedVersion);
+    protected final CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion, Set<Option> opts) {
+        OpDelete op = new OpDelete(path, expectedVersion, opts);
         enqueue(writeOps, op);
         return op.getFuture();
     }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
index f773d40fe08..7e5da62fded 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
@@ -19,15 +19,18 @@
 package org.apache.pulsar.metadata.impl.batching;
 
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.apache.pulsar.metadata.api.Option;
 
 @Data
 @AllArgsConstructor
 public class OpDelete implements MetadataOp {
     private final String path;
     private final Optional<Long> optExpectedVersion;
+    private final Set<Option> options;
     public final long created = System.currentTimeMillis();
 
     private final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
index 8c684b9d661..1726c37fecb 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
@@ -19,16 +19,19 @@
 package org.apache.pulsar.metadata.impl.batching;
 
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.Option;
 
 @Data
 @AllArgsConstructor
 public class OpGet implements MetadataOp {
 
     private final String path;
+    private final Set<Option> options;
     public final long created = System.currentTimeMillis();
     private final CompletableFuture<Optional<GetResult>> future = new 
CompletableFuture<>();
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
index 777c8f7d0a3..9f4799a02f2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
@@ -19,15 +19,18 @@
 package org.apache.pulsar.metadata.impl.batching;
 
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.apache.pulsar.metadata.api.Option;
 
 @Data
 @AllArgsConstructor
 public class OpGetChildren implements MetadataOp {
 
     private final String path;
+    private final Set<Option> options;
     public final long created = System.currentTimeMillis();
     private final CompletableFuture<List<String>> future = new 
CompletableFuture<>();
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 3b89a99bb67..105332bf74e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -27,8 +27,10 @@ import io.oxia.client.api.Version;
 import io.oxia.client.api.exceptions.KeyAlreadyExistsException;
 import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
 import io.oxia.client.api.options.DeleteOption;
+import io.oxia.client.api.options.GetOption;
 import io.oxia.client.api.options.ListOption;
 import io.oxia.client.api.options.PutOption;
+import io.oxia.client.api.options.RangeScanOption;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
@@ -152,11 +154,11 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path, 
Set<Option> opts) {
         var pathWithSlash = path.endsWith("/") ? path : path + "/";
 
         return client
-                .list(pathWithSlash, pathWithSlash + "/")
+                .list(pathWithSlash, pathWithSlash + "/", listOptions(opts))
                 .thenApply(
                         children ->
                                 children.stream().map(child -> 
child.substring(pathWithSlash.length())).toList())
@@ -164,30 +166,27 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<Boolean> existsFromStore(String path) {
-        return client.get(path).thenApply(Objects::nonNull)
+    protected CompletableFuture<Boolean> existsFromStore(String path, 
Set<Option> opts) {
+        return client.get(path, getOptions(opts)).thenApply(Objects::nonNull)
                 .exceptionallyCompose(this::convertException);
     }
 
     @Override
-    protected CompletableFuture<Optional<GetResult>> storeGet(String path) {
-        return client.get(path).thenApply(res -> convertGetResult(path, res))
+    protected CompletableFuture<Optional<GetResult>> storeGet(String path, 
Set<Option> opts) {
+        return client.get(path, getOptions(opts)).thenApply(res -> 
convertGetResult(path, res))
                 .exceptionallyCompose(this::convertException);
     }
 
     @Override
-    protected CompletableFuture<Void> storeDelete(String path, Optional<Long> 
expectedVersion) {
-        return getChildrenFromStore(path)
+    protected CompletableFuture<Void> storeDelete(String path, Optional<Long> 
expectedVersion, Set<Option> opts) {
+        return getChildrenFromStore(path, opts)
                 .thenCompose(
                         children -> {
                             if (!children.isEmpty()) {
                                 return CompletableFuture.failedFuture(
                                         new MetadataStoreException("Key '" + 
path + "' has children"));
                             } else {
-                                Set<DeleteOption> delOption =
-                                        expectedVersion
-                                                .map(v -> 
Collections.singleton(DeleteOption.IfVersionIdEquals(v)))
-                                                
.orElse(Collections.emptySet());
+                                Set<DeleteOption> delOption = 
deleteOptions(opts, expectedVersion);
                                 CompletableFuture<Boolean> result = 
client.delete(path, delOption);
                                 return result
                                         .thenCompose(
@@ -211,7 +210,7 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer) {
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, 
ScanConsumer consumer, Set<Option> opts) {
         // Oxia's hierarchical sort makes [parentPath + "/", parentPath + 
"//") the canonical
         // range covering exactly the immediate children — same convention 
getChildrenFromStore
         // uses with `client.list(...)`.
@@ -237,7 +236,7 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                     consumer.onCompleted();
                     done.complete(null);
                 }
-            });
+            }, rangeScanOptions(opts));
         } catch (Throwable t) {
             consumer.onError(t);
             done.completeExceptionally(t);
@@ -248,12 +247,14 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
     @Override
     protected CompletableFuture<List<GetResult>> storeFindByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter) {
+            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
         String scopedKey = scanPathPrefix + "/" + secondaryKey;
-        return client.list(scopedKey, scopedKey + "~", 
Set.of(ListOption.UseIndex(indexName)))
+        Set<ListOption> listOpts = new HashSet<>(listOptions(opts));
+        listOpts.add(ListOption.UseIndex(indexName));
+        return client.list(scopedKey, scopedKey + "~", listOpts)
                 .thenCompose(primaryKeys -> {
                     List<CompletableFuture<Optional<GetResult>>> futures = 
primaryKeys.stream()
-                            .map(this::storeGet)
+                            .map(p -> storeGet(p, opts))
                             .toList();
                     return FutureUtil.waitForAll(futures)
                             .thenApply(__ -> futures.stream()
@@ -307,6 +308,10 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                     if (ephemeral) {
                         putOptions.add(PutOption.AsEphemeralRecord);
                     }
+                    String partitionKey = OptionsHelper.partitionKey(opts);
+                    if (partitionKey != null) {
+                        putOptions.add(PutOption.PartitionKey(partitionKey));
+                    }
                     var parentPath = parent(path);
                     var parentPrefix = parentPath == null ? "" : parentPath;
                     secondaryIndexes.forEach((indexName, secondaryKey) ->
@@ -324,6 +329,41 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                 });
     }
 
+    /** Build the Oxia {@link GetOption} set from {@code opts}, currently 
routing the partition key. */
+    private static Set<GetOption> getOptions(Set<Option> opts) {
+        String partitionKey = OptionsHelper.partitionKey(opts);
+        return partitionKey == null ? Set.of() : 
Set.of(GetOption.PartitionKey(partitionKey));
+    }
+
+    /** Build the Oxia {@link ListOption} set from {@code opts}, currently 
routing the partition key. */
+    private static Set<ListOption> listOptions(Set<Option> opts) {
+        String partitionKey = OptionsHelper.partitionKey(opts);
+        return partitionKey == null ? Set.of() : 
Set.of(ListOption.PartitionKey(partitionKey));
+    }
+
+    /**
+     * Build the Oxia {@link DeleteOption} set from {@code opts} together with 
the optional expected
+     * version.
+     */
+    private static Set<DeleteOption> deleteOptions(Set<Option> opts, 
Optional<Long> expectedVersion) {
+        String partitionKey = OptionsHelper.partitionKey(opts);
+        if (partitionKey == null && expectedVersion.isEmpty()) {
+            return Set.of();
+        }
+        Set<DeleteOption> result = new HashSet<>();
+        expectedVersion.ifPresent(v -> 
result.add(DeleteOption.IfVersionIdEquals(v)));
+        if (partitionKey != null) {
+            result.add(DeleteOption.PartitionKey(partitionKey));
+        }
+        return result;
+    }
+
+    /** Build the Oxia {@link RangeScanOption} set from {@code opts}, 
currently routing the partition key. */
+    private static Set<RangeScanOption> rangeScanOptions(Set<Option> opts) {
+        String partitionKey = OptionsHelper.partitionKey(opts);
+        return partitionKey == null ? Set.of() : 
Set.of(RangeScanOption.PartitionKey(partitionKey));
+    }
+
     private <T> CompletionStage<T> convertException(Throwable ex) {
         Throwable actEx = FutureUtil.unwrapCompletionException(ex);
         if (actEx instanceof UnexpectedVersionIdException || actEx instanceof 
KeyAlreadyExistsException) {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaPartitionKeyTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaPartitionKeyTest.java
new file mode 100644
index 00000000000..6dbe6c363dd
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaPartitionKeyTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.oxia.testcontainers.OxiaContainer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Option;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for {@link Option.PartitionKey} routing on a multi-shard 
Oxia cluster.
+ *
+ * <p>These tests verify that the {@link Option.PartitionKey} option is 
plumbed end-to-end through
+ * the {@code MetadataStore} API down into the Oxia client. They run against a 
3-shard cluster so
+ * routed operations exercise the cross-shard path, but they do not assert 
which shard a particular
+ * key lands in (Oxia's hash function is an implementation detail).
+ */
+public class OxiaPartitionKeyTest {
+
+    private static final int SHARDS = 3;
+    private OxiaContainer oxiaServer;
+
+    @BeforeClass
+    public void start() {
+        oxiaServer = new 
OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME).withShards(SHARDS);
+        oxiaServer.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void stop() {
+        if (oxiaServer != null) {
+            oxiaServer.close();
+            oxiaServer = null;
+        }
+    }
+
+    private MetadataStore newStore() throws Exception {
+        return MetadataStoreFactory.create(
+                "oxia://" + oxiaServer.getServiceAddress(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+    }
+
+    @Test
+    public void putAndGetRoundTripWithPartitionKey() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String path = "/partition-key-roundtrip-" + System.nanoTime();
+        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
+        Set<Option> opts = Set.of(new Option.PartitionKey("group-A"));
+
+        store.put(path, value, Optional.empty(), opts).get();
+
+        var result = store.get(path, opts).get();
+        assertTrue(result.isPresent(), "value should be reachable with the 
same partition key");
+        assertEquals(result.get().getValue(), value);
+    }
+
+    @Test
+    public void deleteUsesPartitionKey() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String path = "/partition-key-delete-" + System.nanoTime();
+        Set<Option> opts = Set.of(new Option.PartitionKey("group-X"));
+
+        store.put(path, "v".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), opts).get();
+        assertTrue(store.get(path, opts).get().isPresent());
+
+        store.delete(path, Optional.empty(), opts).get();
+        assertFalse(store.get(path, opts).get().isPresent());
+    }
+
+    @Test
+    public void getChildrenWithPartitionKey() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String parent = "/partition-key-children-" + System.nanoTime();
+        Set<Option> opts = Set.of(new Option.PartitionKey("group-Y"));
+
+        store.put(parent + "/a", "1".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), opts).get();
+        store.put(parent + "/b", "2".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), opts).get();
+
+        List<String> children = store.getChildren(parent, opts).get();
+        assertTrue(children.containsAll(List.of("a", "b")),
+                "expected children a and b, got: " + children);
+    }
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index fdb636d7a7e..a20d7affce7 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -100,22 +100,22 @@ public class MetadataStoreFactoryImplTest {
         }
 
         @Override
-        public CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
+        public CompletableFuture<List<String>> getChildrenFromStore(String 
path, Set<Option> opts) {
             return null;
         }
 
         @Override
-        protected CompletableFuture<Boolean> existsFromStore(String path) {
+        protected CompletableFuture<Boolean> existsFromStore(String path, 
Set<Option> opts) {
             return null;
         }
 
         @Override
-        protected CompletableFuture<Optional<GetResult>> storeGet(String path) 
{
+        protected CompletableFuture<Optional<GetResult>> storeGet(String path, 
Set<Option> opts) {
             return null;
         }
 
         @Override
-        protected CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion) {
+        protected CompletableFuture<Void> storeDelete(String path, 
Optional<Long> expectedVersion, Set<Option> opts) {
             return null;
         }
 


Reply via email to