This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 48689001111 [improve] Wire Set<Option> through backend hooks + Oxia PartitionKey (#25723)
48689001111 is described below
commit 486890011119f607a3fda6b39f95e443a1b0105c
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 11:12:55 2026 -0700
[improve] Wire Set<Option> through backend hooks + Oxia PartitionKey (#25723)
---
.../apache/pulsar/metadata/api/MetadataStore.java | 18 +++-
.../metadata/impl/AbstractMetadataStore.java | 78 ++++++++------
.../pulsar/metadata/impl/DualMetadataStore.java | 6 +-
.../metadata/impl/FaultInjectionMetadataStore.java | 4 +-
.../metadata/impl/LocalMemoryMetadataStore.java | 10 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 10 +-
.../pulsar/metadata/impl/ZKMetadataStore.java | 2 +-
.../batching/AbstractBatchedMetadataStore.java | 12 +--
.../pulsar/metadata/impl/batching/OpDelete.java | 3 +
.../pulsar/metadata/impl/batching/OpGet.java | 3 +
.../metadata/impl/batching/OpGetChildren.java | 3 +
.../metadata/impl/oxia/OxiaMetadataStore.java | 74 ++++++++++---
.../pulsar/metadata/OxiaPartitionKeyTest.java | 117 +++++++++++++++++++++
.../impl/MetadataStoreFactoryImplTest.java | 8 +-
14 files changed, 270 insertions(+), 78 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 71f69a313c2..d258934bec9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -198,8 +198,8 @@ public interface MetadataStore extends AutoCloseable {
return delete(path, expectedVersion, Set.of());
}
- default CompletableFuture<Void> deleteIfExists(String path, Optional<Long>
expectedVersion) {
- return delete(path, expectedVersion)
+ default CompletableFuture<Void> deleteIfExists(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
+ return delete(path, expectedVersion, opts)
.exceptionally(e -> {
if (e.getCause() instanceof NotFoundException) {
LOG.info().attr("path", path).log("Path not found
while deleting (this is not a problem)");
@@ -216,6 +216,11 @@ public interface MetadataStore extends AutoCloseable {
});
}
+ /** Like {@link #deleteIfExists(String, Optional, Set)} with no options. */
+ default CompletableFuture<Void> deleteIfExists(String path, Optional<Long>
expectedVersion) {
+ return deleteIfExists(path, expectedVersion, Set.of());
+ }
+
/**
* Delete a key-value pair and all the children nodes.
*
@@ -224,9 +229,16 @@ public interface MetadataStore extends AutoCloseable {
*
* @param path
* the path of the key to delete from the store
+ * @param opts
+ * the set of {@link Option options} for this operation
* @return a future to track the async request
*/
- CompletableFuture<Void> deleteRecursive(String path);
+ CompletableFuture<Void> deleteRecursive(String path, Set<Option> opts);
+
+ /** Like {@link #deleteRecursive(String, Set)} with no options. */
+ default CompletableFuture<Void> deleteRecursive(String path) {
+ return deleteRecursive(path, Set.of());
+ }
/**
* Register a listener that will be called on changes in the underlying
store.
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index c7b9f2b582f..72e8a26afb8 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -101,13 +101,23 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
- protected abstract CompletableFuture<Boolean> existsFromStore(String path);
+ /**
+ * Backend hook for {@link #exists}. Implementations consume {@link
Option} entries from {@code opts}
+ * via {@link OptionsHelper} (e.g. {@link OptionsHelper#partitionKey} for
routing on sharded backends).
+ */
+ protected abstract CompletableFuture<Boolean> existsFromStore(String path,
Set<Option> opts);
+
+ /**
+ * Backend hook for {@link MetadataStore#getChildrenFromStore(String,
Set)}. Implementations consume
+ * {@link Option} entries from {@code opts} via {@link OptionsHelper}.
+ */
+ @Override
+ public abstract CompletableFuture<List<String>>
getChildrenFromStore(String path, Set<Option> opts);
- // Re-declare the legacy no-opts public methods as abstract here so that
backends keep providing them
- // as the concrete implementation hooks. The canonical {@code Set<Option>}
forms in this class delegate
- // to these via virtual dispatch.
@Override
- public abstract CompletableFuture<List<String>>
getChildrenFromStore(String path);
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ return getChildrenFromStore(path, Set.of());
+ }
protected MetadataNodeSizeStats nodeSizeStats;
@@ -186,14 +196,14 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
.buildAsync(new AsyncCacheLoader<String, Boolean>() {
@Override
public CompletableFuture<Boolean> asyncLoad(String key,
Executor executor) {
- return existsFromStore(key);
+ return existsFromStore(key, Set.of());
}
@Override
public CompletableFuture<Boolean> asyncReload(String key,
Boolean oldValue,
Executor executor) {
if (isConnected) {
- return existsFromStore(key);
+ return existsFromStore(key, Set.of());
} else {
// Do not refresh if we're not connected
return CompletableFuture.completedFuture(oldValue);
@@ -235,7 +245,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
// else update the event
CompletableFuture<?> updateResult = (event.getType() ==
NotificationType.Deleted)
- ? deleteInternal(event.getPath(), Optional.empty())
+ ? deleteInternal(event.getPath(), Optional.empty(),
Set.of())
: putInternal(event.getPath(), event.getValue(),
Optional.ofNullable(event.getExpectedVersion()),
fromLegacyCreateOptions(options));
updateResult.thenApply(stat -> {
@@ -355,7 +365,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
return FutureUtil
.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
- return storeGet(path)
+ return storeGet(path, opts)
.whenComplete((v, t) -> {
if (t != null) {
v.ifPresent(getResult ->
nodeSizeStats.recordGetRes(path, getResult));
@@ -366,7 +376,11 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
});
}
- protected abstract CompletableFuture<Optional<GetResult>> storeGet(String
path);
+ /**
+ * Backend hook for {@link #get}. Implementations consume {@link Option}
entries from {@code opts}
+ * via {@link OptionsHelper}.
+ */
+ protected abstract CompletableFuture<Optional<GetResult>> storeGet(String
path, Set<Option> opts);
@Override
public final CompletableFuture<List<String>> getChildren(String path,
Set<Option> opts) {
@@ -383,11 +397,6 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
return listFuture;
}
- @Override
- public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
- return getChildrenFromStore(path);
- }
-
@Override
public final CompletableFuture<Boolean> exists(String path, Set<Option>
opts) {
if (isClosed()) {
@@ -448,7 +457,12 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
- protected abstract CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion);
+ /**
+ * Backend hook for {@link #delete}. Implementations consume {@link
Option} entries from {@code opts}
+ * via {@link OptionsHelper}.
+ */
+ protected abstract CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion,
+ Set<Option> opts);
@Override
public final CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
@@ -466,7 +480,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
expectedVersion.orElse(null), Instant.now().toEpochMilli(),
getMetadataEventSynchronizer().get().getClusterName(),
NotificationType.Deleted);
return getMetadataEventSynchronizer().get().notify(event)
- .thenCompose(__ -> deleteInternal(path, expectedVersion))
+ .thenCompose(__ -> deleteInternal(path, expectedVersion,
opts))
.whenComplete((v, t) -> {
if (null != t) {
metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
@@ -475,7 +489,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
});
} else {
- return deleteInternal(path, expectedVersion)
+ return deleteInternal(path, expectedVersion, opts)
.whenComplete((v, t) -> {
if (null != t) {
metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
@@ -486,9 +500,9 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
}
- private CompletableFuture<Void> deleteInternal(String path, Optional<Long>
expectedVersion) {
+ private CompletableFuture<Void> deleteInternal(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
// Ensure caches are invalidated before the operation is confirmed
- return storeDelete(path, expectedVersion).thenRun(() -> {
+ return storeDelete(path, expectedVersion, opts).thenRun(() -> {
existsCache.synchronous().invalidate(path);
childrenCache.synchronous().invalidate(path);
String parent = parent(path);
@@ -502,19 +516,19 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
@Override
- public CompletableFuture<Void> deleteRecursive(String path) {
+ public CompletableFuture<Void> deleteRecursive(String path, Set<Option>
opts) {
log.info().attr("path", path).log("Deleting recursively path");
if (isClosed()) {
return alreadyClosedFailedFuture();
}
- return getChildren(path)
+ return getChildren(path, opts)
.thenCompose(children -> FutureUtil.waitForAll(
children.stream()
- .map(child -> deleteRecursive(path + "/" +
child))
+ .map(child -> deleteRecursive(path + "/" +
child, opts))
.collect(Collectors.toList())))
.thenCompose(__ -> {
log.info().attr("path", path).log("After deleting all
children, now deleting path");
- return deleteIfExists(path, Optional.empty());
+ return deleteIfExists(path, Optional.empty(), opts);
});
}
@@ -533,7 +547,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
if (isClosed()) {
return alreadyClosedFailedFuture();
}
- return storeFindByIndex(scanPathPrefix, indexName, secondaryKey,
fallbackFilter);
+ return storeFindByIndex(scanPathPrefix, indexName, secondaryKey,
fallbackFilter, opts);
}
@Override
@@ -552,7 +566,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
consumer.onError(ex);
return FutureUtil.failedFuture(ex);
}
- return storeScanChildren(parentPath, consumer);
+ return storeScanChildren(parentPath, consumer, opts);
}
/**
@@ -561,13 +575,13 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
* {@link #storeGet}. Backends with a native range-scan primitive (Oxia,
RocksDB,
* in-memory NavigableMap) override this method for a single store-side
scan.
*/
- protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer) {
+ protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
CompletableFuture<Void> result = new CompletableFuture<>();
- getChildrenFromStore(parentPath).thenCompose(children -> {
+ getChildrenFromStore(parentPath, opts).thenCompose(children -> {
CompletableFuture<Void> chain =
CompletableFuture.completedFuture(null);
for (String child : children) {
String childPath = parentPath.equals("/") ? "/" + child :
parentPath + "/" + child;
- chain = chain.thenCompose(__ -> storeGet(childPath))
+ chain = chain.thenCompose(__ -> storeGet(childPath, opts))
.thenAccept(opt -> opt.ifPresent(consumer::onNext));
}
return chain;
@@ -587,12 +601,12 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected CompletableFuture<List<GetResult>> storeFindByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter) {
+ Predicate<GetResult> fallbackFilter, Set<Option> opts) {
// Default fallback: full scan under scanPathPrefix, applying
fallbackFilter to each result.
- return getChildrenFromStore(scanPathPrefix)
+ return getChildrenFromStore(scanPathPrefix, opts)
.thenCompose(children -> {
List<CompletableFuture<Optional<GetResult>>> futures =
children.stream()
- .map(child -> storeGet(scanPathPrefix + "/" +
child))
+ .map(child -> storeGet(scanPathPrefix + "/" +
child, opts))
.toList();
return FutureUtil.waitForAll(futures)
.thenApply(__ -> futures.stream()
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 55ab3f74055..38a3bc6013a 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -371,11 +371,11 @@ public class DualMetadataStore implements
MetadataStoreExtended {
}
@Override
- public CompletableFuture<Void> deleteRecursive(String path) {
+ public CompletableFuture<Void> deleteRecursive(String path, Set<Option>
opts) {
switch (migrationState.getPhase()) {
case NOT_STARTED, FAILED -> {
pendingSourceWrites.incrementAndGet();
- var future = sourceStore.deleteRecursive(path);
+ var future = sourceStore.deleteRecursive(path, opts);
future.whenComplete((result, e) ->
pendingSourceWrites.decrementAndGet());
return future;
}
@@ -383,7 +383,7 @@ public class DualMetadataStore implements
MetadataStoreExtended {
return
CompletableFuture.failedFuture(READ_ONLY_STATE_EXCEPTION);
}
case COMPLETED -> {
- return targetStore.deleteRecursive(path);
+ return targetStore.deleteRecursive(path, opts);
}
default -> throw new IllegalStateException("Invalid phase " +
migrationState.getPhase());
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 6fbd22e3888..552f0e817b5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -137,13 +137,13 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
}
@Override
- public CompletableFuture<Void> deleteRecursive(String path) {
+ public CompletableFuture<Void> deleteRecursive(String path, Set<Option>
opts) {
Optional<MetadataStoreException> ex =
programmedFailure(OperationType.DELETE, path);
if (ex.isPresent()) {
return FutureUtil.failedFuture(ex.get());
}
- return store.deleteRecursive(path);
+ return store.deleteRecursive(path, opts);
}
@Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 130eb6ff0da..c1f00c081c5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -108,7 +108,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
@Override
- public CompletableFuture<Optional<GetResult>> storeGet(String path) {
+ public CompletableFuture<Optional<GetResult>> storeGet(String path,
Set<Option> opts) {
synchronized (map) {
Value v = map.get(path);
if (v != null) {
@@ -123,7 +123,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
@Override
- protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer) {
+ protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
// Snapshot the immediate children under the lock, then dispatch
outside it so a slow
// consumer can't stall other store operations.
List<GetResult> snapshot = new ArrayList<>();
@@ -156,7 +156,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
@Override
- public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
@@ -178,7 +178,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
@Override
- public CompletableFuture<Boolean> existsFromStore(String path) {
+ public CompletableFuture<Boolean> existsFromStore(String path, Set<Option>
opts) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
@@ -242,7 +242,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
@Override
- public CompletableFuture<Void> storeDelete(String path, Optional<Long>
optExpectedVersion) {
+ public CompletableFuture<Void> storeDelete(String path, Optional<Long>
optExpectedVersion, Set<Option> opts) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 17836a7d383..5e9024a3104 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -375,7 +375,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
@Override
- public CompletableFuture<Optional<GetResult>> storeGet(String path) {
+ public CompletableFuture<Optional<GetResult>> storeGet(String path,
Set<Option> opts) {
log.debug().attr("path", path).attr("instanceId",
instanceId).log("getFromStore");
try {
dbStateLock.readLock().lock();
@@ -408,7 +408,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer) {
+ protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
// Native iterator-based scan over the parent's key range, with the
same direct-child
// filter getChildrenFromStore applies. Snapshot under the read lock
then dispatch
// outside it.
@@ -487,7 +487,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
@Override
- public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
log.debug().attr("path", path).attr("instanceId",
instanceId).log("getChildrenFromStore");
try {
dbStateLock.readLock().lock();
@@ -530,7 +530,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<Boolean> existsFromStore(String path) {
+ protected CompletableFuture<Boolean> existsFromStore(String path,
Set<Option> opts) {
log.debug().attr("path", path).attr("instanceId",
instanceId).log("existsFromStore");
try {
dbStateLock.readLock().lock();
@@ -551,7 +551,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<Void> storeDelete(String path, Optional<Long>
expectedVersion) {
+ protected CompletableFuture<Void> storeDelete(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
log.debug().attr("path", path).attr("instanceId",
instanceId).log("storeDelete");
try {
dbStateLock.readLock().lock();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 5e7c57fac77..fc5c9e4f5e4 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -390,7 +390,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
}
@Override
- public CompletableFuture<Boolean> existsFromStore(String path) {
+ public CompletableFuture<Boolean> existsFromStore(String path, Set<Option>
opts) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
try {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 2d7b56ad650..7db4c9146f5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -127,22 +127,22 @@ public abstract class AbstractBatchedMetadataStore
extends AbstractMetadataStore
}
@Override
- public final CompletableFuture<Optional<GetResult>> storeGet(String path) {
- OpGet op = new OpGet(path);
+ public final CompletableFuture<Optional<GetResult>> storeGet(String path,
Set<Option> opts) {
+ OpGet op = new OpGet(path, opts);
enqueue(readOps, op);
return op.getFuture();
}
@Override
- public final CompletableFuture<List<String>> getChildrenFromStore(String
path) {
- OpGetChildren op = new OpGetChildren(path);
+ public final CompletableFuture<List<String>> getChildrenFromStore(String
path, Set<Option> opts) {
+ OpGetChildren op = new OpGetChildren(path, opts);
enqueue(readOps, op);
return op.getFuture();
}
@Override
- protected final CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion) {
- OpDelete op = new OpDelete(path, expectedVersion);
+ protected final CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion, Set<Option> opts) {
+ OpDelete op = new OpDelete(path, expectedVersion, opts);
enqueue(writeOps, op);
return op.getFuture();
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
index f773d40fe08..7e5da62fded 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
@@ -19,15 +19,18 @@
package org.apache.pulsar.metadata.impl.batching;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.pulsar.metadata.api.Option;
@Data
@AllArgsConstructor
public class OpDelete implements MetadataOp {
private final String path;
private final Optional<Long> optExpectedVersion;
+ private final Set<Option> options;
public final long created = System.currentTimeMillis();
private final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
index 8c684b9d661..1726c37fecb 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
@@ -19,16 +19,19 @@
package org.apache.pulsar.metadata.impl.batching;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.Option;
@Data
@AllArgsConstructor
public class OpGet implements MetadataOp {
private final String path;
+ private final Set<Option> options;
public final long created = System.currentTimeMillis();
private final CompletableFuture<Optional<GetResult>> future = new
CompletableFuture<>();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
index 777c8f7d0a3..9f4799a02f2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
@@ -19,15 +19,18 @@
package org.apache.pulsar.metadata.impl.batching;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
+import org.apache.pulsar.metadata.api.Option;
@Data
@AllArgsConstructor
public class OpGetChildren implements MetadataOp {
private final String path;
+ private final Set<Option> options;
public final long created = System.currentTimeMillis();
private final CompletableFuture<List<String>> future = new
CompletableFuture<>();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 3b89a99bb67..105332bf74e 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -27,8 +27,10 @@ import io.oxia.client.api.Version;
import io.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.oxia.client.api.options.DeleteOption;
+import io.oxia.client.api.options.GetOption;
import io.oxia.client.api.options.ListOption;
import io.oxia.client.api.options.PutOption;
+import io.oxia.client.api.options.RangeScanOption;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
@@ -152,11 +154,11 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
@Override
- public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path,
Set<Option> opts) {
var pathWithSlash = path.endsWith("/") ? path : path + "/";
return client
- .list(pathWithSlash, pathWithSlash + "/")
+ .list(pathWithSlash, pathWithSlash + "/", listOptions(opts))
.thenApply(
children ->
children.stream().map(child ->
child.substring(pathWithSlash.length())).toList())
@@ -164,30 +166,27 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<Boolean> existsFromStore(String path) {
- return client.get(path).thenApply(Objects::nonNull)
+ protected CompletableFuture<Boolean> existsFromStore(String path,
Set<Option> opts) {
+ return client.get(path, getOptions(opts)).thenApply(Objects::nonNull)
.exceptionallyCompose(this::convertException);
}
@Override
- protected CompletableFuture<Optional<GetResult>> storeGet(String path) {
- return client.get(path).thenApply(res -> convertGetResult(path, res))
+ protected CompletableFuture<Optional<GetResult>> storeGet(String path,
Set<Option> opts) {
+ return client.get(path, getOptions(opts)).thenApply(res ->
convertGetResult(path, res))
.exceptionallyCompose(this::convertException);
}
@Override
- protected CompletableFuture<Void> storeDelete(String path, Optional<Long>
expectedVersion) {
- return getChildrenFromStore(path)
+ protected CompletableFuture<Void> storeDelete(String path, Optional<Long>
expectedVersion, Set<Option> opts) {
+ return getChildrenFromStore(path, opts)
.thenCompose(
children -> {
if (!children.isEmpty()) {
return CompletableFuture.failedFuture(
new MetadataStoreException("Key '" +
path + "' has children"));
} else {
- Set<DeleteOption> delOption =
- expectedVersion
- .map(v ->
Collections.singleton(DeleteOption.IfVersionIdEquals(v)))
-
.orElse(Collections.emptySet());
+ Set<DeleteOption> delOption =
deleteOptions(opts, expectedVersion);
CompletableFuture<Boolean> result =
client.delete(path, delOption);
return result
.thenCompose(
@@ -211,7 +210,7 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer) {
+ protected CompletableFuture<Void> storeScanChildren(String parentPath,
ScanConsumer consumer, Set<Option> opts) {
// Oxia's hierarchical sort makes [parentPath + "/", parentPath +
"//") the canonical
// range covering exactly the immediate children — same convention
getChildrenFromStore
// uses with `client.list(...)`.
@@ -237,7 +236,7 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
consumer.onCompleted();
done.complete(null);
}
- });
+ }, rangeScanOptions(opts));
} catch (Throwable t) {
consumer.onError(t);
done.completeExceptionally(t);
@@ -248,12 +247,14 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
@Override
protected CompletableFuture<List<GetResult>> storeFindByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter) {
+ Predicate<GetResult> fallbackFilter, Set<Option> opts) {
String scopedKey = scanPathPrefix + "/" + secondaryKey;
- return client.list(scopedKey, scopedKey + "~",
Set.of(ListOption.UseIndex(indexName)))
+ Set<ListOption> listOpts = new HashSet<>(listOptions(opts));
+ listOpts.add(ListOption.UseIndex(indexName));
+ return client.list(scopedKey, scopedKey + "~", listOpts)
.thenCompose(primaryKeys -> {
List<CompletableFuture<Optional<GetResult>>> futures =
primaryKeys.stream()
- .map(this::storeGet)
+ .map(p -> storeGet(p, opts))
.toList();
return FutureUtil.waitForAll(futures)
.thenApply(__ -> futures.stream()
@@ -307,6 +308,10 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
if (ephemeral) {
putOptions.add(PutOption.AsEphemeralRecord);
}
+ String partitionKey = OptionsHelper.partitionKey(opts);
+ if (partitionKey != null) {
+ putOptions.add(PutOption.PartitionKey(partitionKey));
+ }
var parentPath = parent(path);
var parentPrefix = parentPath == null ? "" : parentPath;
secondaryIndexes.forEach((indexName, secondaryKey) ->
@@ -324,6 +329,41 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
});
}
+ /** Build the Oxia {@link GetOption} set from {@code opts}, currently
routing the partition key. */
+ private static Set<GetOption> getOptions(Set<Option> opts) {
+ String partitionKey = OptionsHelper.partitionKey(opts);
+ return partitionKey == null ? Set.of() :
Set.of(GetOption.PartitionKey(partitionKey));
+ }
+
+ /** Build the Oxia {@link ListOption} set from {@code opts}, currently
routing the partition key. */
+ private static Set<ListOption> listOptions(Set<Option> opts) {
+ String partitionKey = OptionsHelper.partitionKey(opts);
+ return partitionKey == null ? Set.of() :
Set.of(ListOption.PartitionKey(partitionKey));
+ }
+
+ /**
+ * Build the Oxia {@link DeleteOption} set from {@code opts} together with
the optional expected
+ * version.
+ */
+ private static Set<DeleteOption> deleteOptions(Set<Option> opts,
Optional<Long> expectedVersion) {
+ String partitionKey = OptionsHelper.partitionKey(opts);
+ if (partitionKey == null && expectedVersion.isEmpty()) {
+ return Set.of();
+ }
+ Set<DeleteOption> result = new HashSet<>();
+ expectedVersion.ifPresent(v ->
result.add(DeleteOption.IfVersionIdEquals(v)));
+ if (partitionKey != null) {
+ result.add(DeleteOption.PartitionKey(partitionKey));
+ }
+ return result;
+ }
+
+ /** Build the Oxia {@link RangeScanOption} set from {@code opts},
currently routing the partition key. */
+ private static Set<RangeScanOption> rangeScanOptions(Set<Option> opts) {
+ String partitionKey = OptionsHelper.partitionKey(opts);
+ return partitionKey == null ? Set.of() :
Set.of(RangeScanOption.PartitionKey(partitionKey));
+ }
+
private <T> CompletionStage<T> convertException(Throwable ex) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof UnexpectedVersionIdException || actEx instanceof
KeyAlreadyExistsException) {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaPartitionKeyTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaPartitionKeyTest.java
new file mode 100644
index 00000000000..6dbe6c363dd
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaPartitionKeyTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.oxia.testcontainers.OxiaContainer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Option;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for {@link Option.PartitionKey} routing on a multi-shard Oxia cluster.
+ *
+ * <p>These tests verify that the {@link Option.PartitionKey} option is plumbed end-to-end through
+ * the {@code MetadataStore} API down into the Oxia client. They run against a 3-shard cluster so
+ * routed operations exercise the cross-shard path, but they do not assert which shard a particular
+ * key lands in (Oxia's hash function is an implementation detail).
+ */
+public class OxiaPartitionKeyTest {
+
+    private static final int SHARDS = 3;
+    private OxiaContainer oxiaServer;
+
+    @BeforeClass
+    public void start() {
+        oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME).withShards(SHARDS);
+        oxiaServer.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void stop() {
+        if (oxiaServer != null) {
+            oxiaServer.close();
+            oxiaServer = null;
+        }
+    }
+
+    /** Open a new store against the shared Oxia container; callers are responsible for closing it. */
+    private MetadataStore newStore() throws Exception {
+        return MetadataStoreFactory.create(
+                "oxia://" + oxiaServer.getServiceAddress(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+    }
+
+    /** A record written with a partition key is readable with the same partition key. */
+    @Test
+    public void putAndGetRoundTripWithPartitionKey() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String path = "/partition-key-roundtrip-" + System.nanoTime();
+        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
+        Set<Option> opts = Set.of(new Option.PartitionKey("group-A"));
+
+        store.put(path, value, Optional.empty(), opts).get();
+
+        var result = store.get(path, opts).get();
+        assertTrue(result.isPresent(), "value should be reachable with the same partition key");
+        assertEquals(result.get().getValue(), value);
+    }
+
+    /** Deleting with the partition key used at write time removes the record. */
+    @Test
+    public void deleteUsesPartitionKey() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String path = "/partition-key-delete-" + System.nanoTime();
+        Set<Option> opts = Set.of(new Option.PartitionKey("group-X"));
+
+        store.put(path, "v".getBytes(StandardCharsets.UTF_8), Optional.empty(), opts).get();
+        assertTrue(store.get(path, opts).get().isPresent());
+
+        store.delete(path, Optional.empty(), opts).get();
+        assertFalse(store.get(path, opts).get().isPresent());
+    }
+
+    /** Listing a parent with the partition key returns exactly the children written under it. */
+    @Test
+    public void getChildrenWithPartitionKey() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String parent = "/partition-key-children-" + System.nanoTime();
+        Set<Option> opts = Set.of(new Option.PartitionKey("group-Y"));
+
+        store.put(parent + "/a", "1".getBytes(StandardCharsets.UTF_8), Optional.empty(), opts).get();
+        store.put(parent + "/b", "2".getBytes(StandardCharsets.UTF_8), Optional.empty(), opts).get();
+
+        // The parent path is unique to this run, so nothing but "a" and "b" may be listed.
+        List<String> children = store.getChildren(parent, opts).get();
+        assertEquals(children.stream().sorted().toList(), List.of("a", "b"),
+                "expected exactly children a and b, got: " + children);
+    }
+}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index fdb636d7a7e..a20d7affce7 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -100,22 +100,22 @@ public class MetadataStoreFactoryImplTest {
}
@Override
- public CompletableFuture<List<String>> getChildrenFromStore(String
path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String
path, Set<Option> opts) {
return null;
}
@Override
- protected CompletableFuture<Boolean> existsFromStore(String path) {
+ protected CompletableFuture<Boolean> existsFromStore(String path,
Set<Option> opts) {
return null;
}
@Override
- protected CompletableFuture<Optional<GetResult>> storeGet(String path)
{
+ protected CompletableFuture<Optional<GetResult>> storeGet(String path,
Set<Option> opts) {
return null;
}
@Override
- protected CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion) {
+ protected CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion, Set<Option> opts) {
return null;
}