This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09364a95f84 [improve] Upgrade to Oxia client 0.2.0 (#22663)
09364a95f84 is described below
commit 09364a95f8429b12a5951d4d1ff45766b13e92cb
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 7 07:13:45 2024 -0700
[improve] Upgrade to Oxia client 0.2.0 (#22663)
---
distribution/licenses/LICENSE-Reactive-gRPC.txt | 29 ------------
distribution/server/src/assemble/LICENSE.bin.txt | 9 +---
pom.xml | 2 +-
.../metadata/impl/oxia/OxiaMetadataStore.java | 53 ++++++++++++----------
4 files changed, 33 insertions(+), 60 deletions(-)
diff --git a/distribution/licenses/LICENSE-Reactive-gRPC.txt
b/distribution/licenses/LICENSE-Reactive-gRPC.txt
deleted file mode 100644
index bc589401e7b..00000000000
--- a/distribution/licenses/LICENSE-Reactive-gRPC.txt
+++ /dev/null
@@ -1,29 +0,0 @@
-BSD 3-Clause License
-
-Copyright (c) 2019, Salesforce.com, Inc.
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-* Neither the name of the copyright holder nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index aec4df2a93a..818f389be88 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -481,12 +481,10 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- - io.streamnative.oxia-oxia-client-0.1.6.jar
- - io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar
+ - io.streamnative.oxia-oxia-client-api-0.2.0.jar
+ - io.streamnative.oxia-oxia-client-0.2.0.jar
* OpenHFT
- net.openhft-zero-allocation-hashing-0.16.jar
- * Project reactor
- - io.projectreactor-reactor-core-3.5.2.jar
* Java JSON WebTokens
- io.jsonwebtoken-jjwt-api-0.11.1.jar
- io.jsonwebtoken-jjwt-impl-0.11.1.jar
@@ -552,9 +550,6 @@ BSD 3-clause "New" or "Revised" License
* JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar --
../licenses/LICENSE-JSR305.txt
* JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt
* JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt
- * Reactive gRPC
- - com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar --
../licenses/LICENSE-Reactive-gRPC.txt
- - com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar --
../licenses/LICENSE-Reactive-gRPC.txt
BSD 2-Clause License
* HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar --
../licenses/LICENSE-HdrHistogram.txt
diff --git a/pom.xml b/pom.xml
index 8f7ae2ed1fc..92e021d1eaa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,7 +249,7 @@ flexible messaging model and an intuitive client
API.</description>
<apache-http-client.version>4.5.13</apache-http-client.version>
<apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
<jetcd.version>0.7.5</jetcd.version>
- <oxia.version>0.1.6</oxia.version>
+ <oxia.version>0.2.0</oxia.version>
<snakeyaml.version>2.0</snakeyaml.version>
<ant.version>1.10.12</ant.version>
<seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 2ab744e2053..728bc1175b9 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -18,20 +18,23 @@
*/
package org.apache.pulsar.metadata.impl.oxia;
-import io.streamnative.oxia.client.OxiaClientBuilder;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
-import io.streamnative.oxia.client.api.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.Notification;
+import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
-import io.streamnative.oxia.client.api.UnexpectedVersionIdException;
import io.streamnative.oxia.client.api.Version;
+import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
+import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import java.time.Duration;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -69,7 +72,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
this.synchronizer =
Optional.ofNullable(metadataStoreConfig.getSynchronizer());
identity = UUID.randomUUID().toString();
client =
- new OxiaClientBuilder(serviceAddress)
+ OxiaClientBuilder.create(serviceAddress)
.clientIdentifier(identity)
.namespace(namespace)
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
@@ -153,14 +156,14 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
return getChildrenFromStore(path)
.thenCompose(
children -> {
- if (children.size() > 0) {
+ if (!children.isEmpty()) {
return CompletableFuture.failedFuture(
new MetadataStoreException("Key '" +
path + "' has children"));
} else {
- var delOption =
+ Set<DeleteOption> delOption =
expectedVersion
-
.map(DeleteOption::ifVersionIdEquals)
-
.orElse(DeleteOption.Unconditionally);
+ .map(v ->
Collections.singleton(DeleteOption.IfVersionIdEquals(v)))
+
.orElse(Collections.emptySet());
CompletableFuture<Boolean> result =
client.delete(path, delOption);
return result
.thenCompose(
@@ -205,20 +208,20 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
} else {
actualPath = CompletableFuture.completedFuture(path);
}
- var versionCondition =
- expectedVersion
- .map(
- ver -> {
- if (ver == -1) {
- return
PutOption.IfRecordDoesNotExist;
- }
- return
PutOption.ifVersionIdEquals(ver);
- })
- .orElse(PutOption.Unconditionally);
- var putOptions =
- options.contains(CreateOption.Ephemeral)
- ? new PutOption[]
{PutOption.AsEphemeralRecord, versionCondition}
- : new PutOption[] {versionCondition};
+ Set<PutOption> putOptions = new HashSet<>();
+ expectedVersion
+ .map(
+ ver -> {
+ if (ver == -1) {
+ return
PutOption.IfRecordDoesNotExist;
+ }
+ return
PutOption.IfVersionIdEquals(ver);
+ })
+ .ifPresent(putOptions::add);
+
+ if (options.contains(CreateOption.Ephemeral)) {
+ putOptions.add(PutOption.AsEphemeralRecord);
+ }
return actualPath
.thenCompose(
aPath ->
@@ -242,6 +245,10 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
}
+ private static final byte[] EMPTY_VALUE = new byte[0];
+ private static final Set<PutOption> IF_RECORD_DOES_NOT_EXIST =
+ Collections.singleton(PutOption.IfRecordDoesNotExist);
+
private CompletableFuture<Void> createParents(String path) {
var parent = parent(path);
if (parent == null || parent.isEmpty()) {
@@ -254,7 +261,7 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
return CompletableFuture.completedFuture(null);
} else {
return client
- .put(parent, new byte[] {},
PutOption.IfRecordDoesNotExist)
+ .put(parent, EMPTY_VALUE,
IF_RECORD_DOES_NOT_EXIST)
.thenCompose(__ ->
createParents(parent));
}
})