This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3650b1e PIP-45: Converted bookies REST endpoint to use metadata store (#11210)
3650b1e is described below
commit 3650b1e08eca47bdf904262af788e17b857d9045
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Jul 3 19:22:48 2021 +0200
PIP-45: Converted bookies REST endpoint to use metadata store (#11210)
* PIP-45: Converted bookies REST endpoint to use metadata store
* Removed unused imports
---
.../pulsar/broker/resources/BookieResources.java | 43 +++++++
.../pulsar/broker/resources/PulsarResources.java | 3 +
.../org/apache/pulsar/broker/admin/v2/Bookies.java | 125 +++++++++++----------
.../pulsar/broker/web/PulsarWebResource.java | 5 +
4 files changed, 116 insertions(+), 60 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BookieResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BookieResources.java
new file mode 100644
index 0000000..2190999
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BookieResources.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
+
+public class BookieResources extends BaseResources<BookiesRackConfiguration> {
+
+ public BookieResources(MetadataStoreExtended store, int
operationTimeoutSec) {
+ super(store, BookiesRackConfiguration.class, operationTimeoutSec);
+ }
+
+ public CompletableFuture<Optional<BookiesRackConfiguration>> get() {
+ return getAsync(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+ }
+
+ public CompletableFuture<Void>
update(Function<Optional<BookiesRackConfiguration>,
+ BookiesRackConfiguration> modifyFunction) {
+ return
getCache().readModifyUpdateOrCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+ modifyFunction).thenApply(__ -> null);
+ }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fa4853a..1e82e78 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -38,6 +38,8 @@ public class PulsarResources {
private DynamicConfigurationResources dynamicConfigResources;
private LocalPoliciesResources localPolicies;
private LoadManagerReportResources loadReportResources;
+ private BookieResources bookieResources;
+
private Optional<MetadataStoreExtended> localMetadataStore;
private Optional<MetadataStoreExtended> configurationMetadataStore;
@@ -56,6 +58,7 @@ public class PulsarResources {
dynamicConfigResources = new
DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
localPolicies = new LocalPoliciesResources(localMetadataStore,
operationTimeoutSec);
loadReportResources = new
LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
+ bookieResources = new BookieResources(localMetadataStore,
operationTimeoutSec);
}
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
this.configurationMetadataStore =
Optional.ofNullable(configurationMetadataStore);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
index 57e5e47..1af839f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
@@ -24,7 +24,6 @@ import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
@@ -34,8 +33,12 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.meta.MetadataClientDriver;
@@ -46,15 +49,11 @@ import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.RawBookieInfo;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Path("/bookies")
@Api(value = "/bookies", description = "Configure bookies rack placement",
tags = "bookies")
@Produces(MediaType.APPLICATION_JSON)
+@Slf4j
public class Bookies extends AdminResource {
@GET
@@ -62,13 +61,16 @@ public class Bookies extends AdminResource {
@ApiOperation(value = "Gets the rack placement information for all the
bookies in the cluster",
response = BookiesRackConfiguration.class)
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission")})
- public BookiesRackConfiguration getBookiesRackInfo() throws Exception {
+ public void getBookiesRackInfo(@Suspended final AsyncResponse
asyncResponse) {
validateSuperUserAccess();
- return
localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
- (key, content) ->
-
ObjectMapperFactory.getThreadLocal().readValue(content,
BookiesRackConfiguration.class))
- .orElse(new BookiesRackConfiguration());
+ getPulsarResources().getBookieResources().get()
+ .thenAccept(b -> {
+ asyncResponse.resume(b.orElseGet(() -> new
BookiesRackConfiguration()));
+ }).exceptionally(ex -> {
+ asyncResponse.resume(ex);
+ return null;
+ });
}
@GET
@@ -97,44 +99,52 @@ public class Bookies extends AdminResource {
@ApiOperation(value = "Gets the rack placement information for a specific
bookie in the cluster",
response = BookieInfo.class)
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission")})
- public BookieInfo getBookieRackInfo(@PathParam("bookie") String
bookieAddress) throws Exception {
+ public void getBookieRackInfo(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("bookie") String bookieAddress)
throws Exception {
validateSuperUserAccess();
- BookiesRackConfiguration racks = localZkCache()
- .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
(key, content) -> ObjectMapperFactory
- .getThreadLocal().readValue(content,
BookiesRackConfiguration.class))
- .orElse(new BookiesRackConfiguration());
-
- return racks.getBookie(bookieAddress)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Bookie
address not found: " + bookieAddress));
+ getPulsarResources().getBookieResources().get()
+ .thenAccept(b -> {
+ Optional<BookieInfo> bi = b.orElseGet(() -> new
BookiesRackConfiguration())
+ .getBookie(bookieAddress);
+ if (bi.isPresent()) {
+ asyncResponse.resume(bi.get());
+ } else {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Bookie address not found: " + bookieAddress));
+ }
+ }).exceptionally(ex -> {
+ asyncResponse.resume(ex);
+ return null;
+ });
}
@DELETE
@Path("/racks-info/{bookie}")
@ApiOperation(value = "Removed the rack placement information for a
specific bookie in the cluster")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission") })
- public void deleteBookieRackInfo(@PathParam("bookie") String
bookieAddress) throws Exception {
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission")})
+ public void deleteBookieRackInfo(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("bookie") String
bookieAddress) throws Exception {
validateSuperUserAccess();
-
- Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
- .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, (key,
content) -> ObjectMapperFactory
- .getThreadLocal().readValue(content,
BookiesRackConfiguration.class));
-
- if (entry.isPresent()) {
- BookiesRackConfiguration racks = entry.get().getKey();
- if (!racks.removeBookie(bookieAddress)) {
- throw new RestException(Status.NOT_FOUND, "Bookie address not
found: " + bookieAddress);
- } else {
-
localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
- jsonMapper().writeValueAsBytes(racks),
- entry.get().getValue().getVersion());
- log.info("Removed {} from rack mapping info", bookieAddress);
- }
- } else {
- throw new RestException(Status.NOT_FOUND, "Bookie rack placement
info is not found");
- }
-
+ getPulsarResources().getBookieResources()
+ .update(optionalBookiesRackConfiguration -> {
+ BookiesRackConfiguration brc =
optionalBookiesRackConfiguration
+ .orElseGet(() -> new BookiesRackConfiguration());
+
+ if (!brc.removeBookie(bookieAddress)) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND,
+ "Bookie address not found: " + bookieAddress));
+ }
+
+ return brc;
+ }).thenAccept(__ -> {
+ log.info("Removed {} from rack mapping info", bookieAddress);
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ asyncResponse.resume(ex);
+ return null;
+ });
}
@POST
@@ -142,7 +152,9 @@ public class Bookies extends AdminResource {
@ApiOperation(value = "Updates the rack placement information for a
specific bookie in the cluster (note."
+ " bookie address format:`address:port`)")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission")})
- public void updateBookieRackInfo(@PathParam("bookie") String
bookieAddress, @QueryParam("group") String group,
+ public void updateBookieRackInfo(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("bookie") String bookieAddress,
+ @QueryParam("group") String group,
BookieInfo bookieInfo) throws Exception {
validateSuperUserAccess();
@@ -150,27 +162,20 @@ public class Bookies extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED, "Bookie
'group' parameters is missing");
}
- Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
- .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
(key, content) -> ObjectMapperFactory
- .getThreadLocal().readValue(content,
BookiesRackConfiguration.class));
+ getPulsarResources().getBookieResources()
+ .update(optionalBookiesRackConfiguration -> {
+ BookiesRackConfiguration brc =
optionalBookiesRackConfiguration
+ .orElseGet(() -> new BookiesRackConfiguration());
- if (entry.isPresent()) {
- // Update the racks info
- BookiesRackConfiguration racks = entry.get().getKey();
- racks.updateBookie(group, bookieAddress, bookieInfo);
+ brc.updateBookie(group, bookieAddress, bookieInfo);
-
localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper().writeValueAsBytes(racks),
- entry.get().getValue().getVersion());
-
localZkCache().invalidate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+ return brc;
+ }).thenAccept(__ -> {
log.info("Updated rack mapping info for {}", bookieAddress);
- } else {
- // Creates the z-node with racks info
- BookiesRackConfiguration racks = new BookiesRackConfiguration();
- racks.updateBookie(group, bookieAddress, bookieInfo);
- localZKCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper().writeValueAsBytes(racks));
- log.info("Created rack mapping info and added {}", bookieAddress);
- }
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ asyncResponse.resume(ex);
+ return null;
+ });
}
-
- private static final Logger log = LoggerFactory.getLogger(Bookies.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index cc23438..d001262 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BaseResources;
+import org.apache.pulsar.broker.resources.BookieResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
@@ -900,6 +901,10 @@ public abstract class PulsarWebResource {
return pulsar().getPulsarResources().getClusterResources();
}
+ protected BookieResources bookieResources() {
+ return pulsar().getPulsarResources().getBookieResources();
+ }
+
protected NamespaceResources namespaceResources() {
return pulsar().getPulsarResources().getNamespaceResources();
}