This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3650b1e  PIP-45: Converted bookies REST endpoint to use metadata store 
(#11210)
3650b1e is described below

commit 3650b1e08eca47bdf904262af788e17b857d9045
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Jul 3 19:22:48 2021 +0200

    PIP-45: Converted bookies REST endpoint to use metadata store (#11210)
    
    * PIP-45: Converted bookies REST endpoint to use metadata store
    
    * Removed unused imports
---
 .../pulsar/broker/resources/BookieResources.java   |  43 +++++++
 .../pulsar/broker/resources/PulsarResources.java   |   3 +
 .../org/apache/pulsar/broker/admin/v2/Bookies.java | 125 +++++++++++----------
 .../pulsar/broker/web/PulsarWebResource.java       |   5 +
 4 files changed, 116 insertions(+), 60 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BookieResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BookieResources.java
new file mode 100644
index 0000000..2190999
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BookieResources.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
+
+public class BookieResources extends BaseResources<BookiesRackConfiguration> {
+
+    public BookieResources(MetadataStoreExtended store, int 
operationTimeoutSec) {
+        super(store, BookiesRackConfiguration.class, operationTimeoutSec);
+    }
+
+    public CompletableFuture<Optional<BookiesRackConfiguration>> get() {
+        return getAsync(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+    }
+
+    public CompletableFuture<Void> 
update(Function<Optional<BookiesRackConfiguration>,
+            BookiesRackConfiguration> modifyFunction) {
+        return 
getCache().readModifyUpdateOrCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
+                modifyFunction).thenApply(__ -> null);
+    }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fa4853a..1e82e78 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -38,6 +38,8 @@ public class PulsarResources {
     private DynamicConfigurationResources dynamicConfigResources;
     private LocalPoliciesResources localPolicies;
     private LoadManagerReportResources loadReportResources;
+    private BookieResources bookieResources;
+
     private Optional<MetadataStoreExtended> localMetadataStore;
     private Optional<MetadataStoreExtended> configurationMetadataStore;
 
@@ -56,6 +58,7 @@ public class PulsarResources {
             dynamicConfigResources = new 
DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
             localPolicies = new LocalPoliciesResources(localMetadataStore, 
operationTimeoutSec);
             loadReportResources = new 
LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
+            bookieResources = new BookieResources(localMetadataStore, 
operationTimeoutSec);
         }
         this.localMetadataStore = Optional.ofNullable(localMetadataStore);
         this.configurationMetadataStore = 
Optional.ofNullable(configurationMetadataStore);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
index 57e5e47..1af839f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
@@ -24,7 +24,6 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import javax.ws.rs.DELETE;
@@ -34,8 +33,12 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
@@ -46,15 +49,11 @@ import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
 import org.apache.pulsar.common.policies.data.RawBookieInfo;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Path("/bookies")
 @Api(value = "/bookies", description = "Configure bookies rack placement", 
tags = "bookies")
 @Produces(MediaType.APPLICATION_JSON)
+@Slf4j
 public class Bookies extends AdminResource {
 
     @GET
@@ -62,13 +61,16 @@ public class Bookies extends AdminResource {
     @ApiOperation(value = "Gets the rack placement information for all the 
bookies in the cluster",
             response = BookiesRackConfiguration.class)
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission")})
-    public BookiesRackConfiguration getBookiesRackInfo() throws Exception {
+    public void getBookiesRackInfo(@Suspended final AsyncResponse 
asyncResponse) {
         validateSuperUserAccess();
 
-        return 
localZkCache().getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
-                (key, content) ->
-                        
ObjectMapperFactory.getThreadLocal().readValue(content, 
BookiesRackConfiguration.class))
-                .orElse(new BookiesRackConfiguration());
+        getPulsarResources().getBookieResources().get()
+                .thenAccept(b -> {
+                    asyncResponse.resume(b.orElseGet(() -> new 
BookiesRackConfiguration()));
+                }).exceptionally(ex -> {
+            asyncResponse.resume(ex);
+            return null;
+        });
     }
 
     @GET
@@ -97,44 +99,52 @@ public class Bookies extends AdminResource {
     @ApiOperation(value = "Gets the rack placement information for a specific 
bookie in the cluster",
             response = BookieInfo.class)
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission")})
-    public BookieInfo getBookieRackInfo(@PathParam("bookie") String 
bookieAddress) throws Exception {
+    public void getBookieRackInfo(@Suspended final AsyncResponse asyncResponse,
+                                  @PathParam("bookie") String bookieAddress) 
throws Exception {
         validateSuperUserAccess();
 
-        BookiesRackConfiguration racks = localZkCache()
-                .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
(key, content) -> ObjectMapperFactory
-                        .getThreadLocal().readValue(content, 
BookiesRackConfiguration.class))
-                .orElse(new BookiesRackConfiguration());
-
-        return racks.getBookie(bookieAddress)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Bookie 
address not found: " + bookieAddress));
+        getPulsarResources().getBookieResources().get()
+                .thenAccept(b -> {
+                    Optional<BookieInfo> bi = b.orElseGet(() -> new 
BookiesRackConfiguration())
+                            .getBookie(bookieAddress);
+                    if (bi.isPresent()) {
+                        asyncResponse.resume(bi.get());
+                    } else {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "Bookie address not found: " + bookieAddress));
+                    }
+                }).exceptionally(ex -> {
+            asyncResponse.resume(ex);
+            return null;
+        });
     }
 
     @DELETE
     @Path("/racks-info/{bookie}")
     @ApiOperation(value = "Removed the rack placement information for a 
specific bookie in the cluster")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission") })
-    public void deleteBookieRackInfo(@PathParam("bookie") String 
bookieAddress) throws Exception {
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission")})
+    public void deleteBookieRackInfo(@Suspended final AsyncResponse 
asyncResponse,
+                                     @PathParam("bookie") String 
bookieAddress) throws Exception {
         validateSuperUserAccess();
 
-
-        Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
-            .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, (key, 
content) -> ObjectMapperFactory
-                .getThreadLocal().readValue(content, 
BookiesRackConfiguration.class));
-
-        if (entry.isPresent()) {
-            BookiesRackConfiguration racks = entry.get().getKey();
-            if (!racks.removeBookie(bookieAddress)) {
-                throw new RestException(Status.NOT_FOUND, "Bookie address not 
found: " + bookieAddress);
-            } else {
-                
localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
-                    jsonMapper().writeValueAsBytes(racks),
-                    entry.get().getValue().getVersion());
-                log.info("Removed {} from rack mapping info", bookieAddress);
-            }
-        } else {
-            throw new RestException(Status.NOT_FOUND, "Bookie rack placement 
info is not found");
-        }
-
+        getPulsarResources().getBookieResources()
+                .update(optionalBookiesRackConfiguration -> {
+                    BookiesRackConfiguration brc = 
optionalBookiesRackConfiguration
+                            .orElseGet(() -> new BookiesRackConfiguration());
+
+                    if (!brc.removeBookie(bookieAddress)) {
+                        asyncResponse.resume(new 
RestException(Status.NOT_FOUND,
+                                "Bookie address not found: " + bookieAddress));
+                    }
+
+                    return brc;
+                }).thenAccept(__ -> {
+            log.info("Removed {} from rack mapping info", bookieAddress);
+            asyncResponse.resume(Response.noContent().build());
+        }).exceptionally(ex -> {
+            asyncResponse.resume(ex);
+            return null;
+        });
     }
 
     @POST
@@ -142,7 +152,9 @@ public class Bookies extends AdminResource {
     @ApiOperation(value = "Updates the rack placement information for a 
specific bookie in the cluster (note."
             + " bookie address format:`address:port`)")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission")})
-    public void updateBookieRackInfo(@PathParam("bookie") String 
bookieAddress, @QueryParam("group") String group,
+    public void updateBookieRackInfo(@Suspended final AsyncResponse 
asyncResponse,
+                                     @PathParam("bookie") String bookieAddress,
+                                     @QueryParam("group") String group,
             BookieInfo bookieInfo) throws Exception {
         validateSuperUserAccess();
 
@@ -150,27 +162,20 @@ public class Bookies extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED, "Bookie 
'group' parameters is missing");
         }
 
-        Optional<Entry<BookiesRackConfiguration, Stat>> entry = localZkCache()
-                .getEntry(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
(key, content) -> ObjectMapperFactory
-                        .getThreadLocal().readValue(content, 
BookiesRackConfiguration.class));
+        getPulsarResources().getBookieResources()
+                .update(optionalBookiesRackConfiguration -> {
+                    BookiesRackConfiguration brc = 
optionalBookiesRackConfiguration
+                            .orElseGet(() -> new BookiesRackConfiguration());
 
-        if (entry.isPresent()) {
-            // Update the racks info
-            BookiesRackConfiguration racks = entry.get().getKey();
-            racks.updateBookie(group, bookieAddress, bookieInfo);
+                    brc.updateBookie(group, bookieAddress, bookieInfo);
 
-            
localZk().setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper().writeValueAsBytes(racks),
-                    entry.get().getValue().getVersion());
-            
localZkCache().invalidate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH);
+                    return brc;
+                }).thenAccept(__ -> {
             log.info("Updated rack mapping info for {}", bookieAddress);
-        } else {
-            // Creates the z-node with racks info
-            BookiesRackConfiguration racks = new BookiesRackConfiguration();
-            racks.updateBookie(group, bookieAddress, bookieInfo);
-            localZKCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper().writeValueAsBytes(racks));
-            log.info("Created rack mapping info and added {}", bookieAddress);
-        }
+            asyncResponse.resume(Response.noContent().build());
+        }).exceptionally(ex -> {
+            asyncResponse.resume(ex);
+            return null;
+        });
     }
-
-    private static final Logger log = LoggerFactory.getLogger(Bookies.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index cc23438..d001262 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -60,6 +60,7 @@ import 
org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.BaseResources;
+import org.apache.pulsar.broker.resources.BookieResources;
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
@@ -900,6 +901,10 @@ public abstract class PulsarWebResource {
         return pulsar().getPulsarResources().getClusterResources();
     }
 
+    protected BookieResources bookieResources() {
+        return pulsar().getPulsarResources().getBookieResources();
+    }
+
     protected NamespaceResources namespaceResources() {
         return pulsar().getPulsarResources().getNamespaceResources();
     }

Reply via email to