srkukarni closed pull request #2426: Added cli commands to get function cluster 
related information
URL: https://github.com/apache/incubator-pulsar/pull/2426
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 315027fd15..def5452dbb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -198,50 +198,6 @@ public Response listFunctions(final @PathParam("tenant") 
String tenant,
 
     }
 
-    @GET
-    @ApiOperation(
-            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions",
-            response = WorkerInfo.class,
-            responseContainer = "List"
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-
-    })
-    @Path("/cluster")
-    @Produces(MediaType.APPLICATION_JSON)
-    public List<WorkerInfo> getCluster() {
-        return functions.getCluster();
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions",
-            response = WorkerInfo.class
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-
-    })
-    @Path("/cluster/leader")
-    public WorkerInfo getClusterLeader() {
-        return functions.getClusterLeader();
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
-            response = Assignment.class,
-            responseContainer = "Map"
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
-    })
-    @Path("/assignments")
-    public Response getAssignments() {
-        return functions.getAssignments();
-    }
-
     @POST
     @ApiOperation(
             value = "Triggers a Pulsar Function with a user-specified value or 
file data",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
new file mode 100644
index 0000000000..46427cbbd7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import java.util.function.Supplier;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.WorkerService;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+@Slf4j
+@Path("/worker")
+public class Worker extends AdminResource implements Supplier<WorkerService> {
+
+    private final WorkerImpl worker;
+
+    public Worker() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about the Pulsar cluster running 
Pulsar Functions"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+
+    })
+    @Path("/cluster")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getCluster() {
+        return worker.getCluster();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches info about the leader node of the Pulsar cluster 
running Pulsar Functions")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+
+    })
+    @Path("/cluster/leader")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getClusterLeader() {
+        return worker.getClusterLeader();
+    }
+
+    @GET
+    @ApiOperation(
+            value = "Fetches information about which Pulsar Functions are 
assigned to which Pulsar clusters",
+            response = Function.Assignment.class,
+            responseContainer = "Map"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
+    })
+    @Path("/assignments")
+    public Response getAssignments() {
+        return worker.getAssignments();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 5a0e4b7b61..1a9c7ad10a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -18,39 +18,51 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
-import java.io.IOException;
-import java.util.Collection;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Supplier;
 
 @Slf4j
 @Path("/worker-stats")
-public class WorkerStats extends FunctionApiResource {
+public class WorkerStats extends AdminResource implements 
Supplier<WorkerService> {
 
-    @GET
-    @Path("/functions")
-    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getStats() throws IOException {
-        return functions.getFunctionsMetrcis(clientAppId());
+    private final WorkerImpl worker;
+
+    public WorkerStats() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService();
     }
-    
+
     @GET
     @Path("/metrics")
     @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
-        return functions.getWorkerMetrcis(clientAppId());
+        return worker.getWorkerMetrcis(clientAppId());
+    }
+
+    @GET
+    @Path("/functionsmetrics")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not 
running") })
+    public Response getStats() throws IOException {
+        return worker.getFunctionsMetrics(clientAppId());
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 427976ff5d..a758c87420 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -209,12 +209,4 @@ private WorkerService 
createPulsarFunctionWorker(ServiceConfiguration config) {
 
         return new WorkerService(workerConfig);
     }
-
-    @Test
-    public void testGetWokersApi() throws Exception {
-        List<WorkerInfo> workers = admin.functions().getCluster();
-        Assert.assertEquals(workers.size(), 1);
-        Assert.assertEquals(workers.get(0).getPort(), workerServicePort);
-    }
-
 }
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 2fc1796492..96e9cbc916 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -320,11 +320,4 @@
      *
      */
     Set<String> getSinks() throws PulsarAdminException;
-
-    /**
-     * Get list of workers present under a cluster
-     * @return
-     * @throws PulsarAdminException
-     */
-    List<WorkerInfo> getCluster() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 7a41fe9809..a1502b0b05 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -43,7 +43,7 @@
 import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
 import org.apache.pulsar.client.admin.internal.SchemasImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
+import org.apache.pulsar.client.admin.internal.WorkerImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
 import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
@@ -85,7 +85,7 @@
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
-    private final WorkerStats workerStats;
+    private final Worker worker;
     private final Schemas schemas;
     protected final WebTarget root;
     protected final Authentication auth;
@@ -189,7 +189,7 @@ public PulsarAdmin(String serviceUrl, 
ClientConfigurationData clientConfigData)
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
-        this.workerStats = new WorkerStatsImpl(root, auth);
+        this.worker = new WorkerImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
     }
@@ -361,8 +361,8 @@ public Functions functions() {
     *
     * @return the Worker stats
     */
-   public WorkerStats workerStats() {
-       return workerStats;
+   public Worker worker() {
+       return worker;
    }
     
     /**
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
 b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
similarity index 66%
rename from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
index 4fc242f1ed..709b713840 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
@@ -19,13 +19,16 @@
 package org.apache.pulsar.client.admin;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerInfo;
 
 /**
  * Admin interface for worker stats management.
  */
-public interface WorkerStats {
+public interface Worker {
     
     
     /**
@@ -41,4 +44,25 @@
      * @throws PulsarAdminException
      */
     Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws 
PulsarAdminException;
+
+    /**
+     * Get List of all workers belonging to this cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    List<WorkerInfo> getCluster() throws PulsarAdminException;
+
+    /**
+     * Get the worker who is the leader of the cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    WorkerInfo getClusterLeader() throws PulsarAdminException;
+
+    /**
+     * Get the function assignment among the cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    Map<String, Collection<String>> getAssignments() throws 
PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
similarity index 56%
rename from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index f492d31125..4ca46ccf3f 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -20,35 +20,42 @@
 
 import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
 
+import java.lang.reflect.Type;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.WorkerStats;
+import org.apache.pulsar.client.admin.Worker;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerInfo;
 
 @Slf4j
-public class WorkerStatsImpl extends BaseResource implements WorkerStats {
+public class WorkerImpl extends BaseResource implements Worker {
 
     private final WebTarget workerStats;
+    private final WebTarget worker;
 
-    public WorkerStatsImpl(WebTarget web, Authentication auth) {
+    public WorkerImpl(WebTarget web, Authentication auth) {
         super(auth);
-        this.workerStats = web.path("/admin/worker-stats");
+        this.worker = web.path("/admin/v2/worker");
+        this.workerStats = web.path("/admin/v2/worker-stats");
     }
 
     @Override
     public Metrics getFunctionsStats() throws PulsarAdminException {
         try {
-            Response response = request(workerStats.path("functions")).get();
+            Response response = 
request(workerStats.path("functionsmetrics")).get();
            if (!response.getStatusInfo().equals(Response.Status.OK)) {
                throw new ClientErrorException(response);
            }
@@ -71,4 +78,41 @@ public Metrics getFunctionsStats() throws 
PulsarAdminException {
             throw getApiException(e);
         }
     }
+
+    @Override
+    public List<WorkerInfo> getCluster() throws PulsarAdminException {
+        try {
+            return request(worker.path("cluster"))
+                    .get(new GenericType<List<WorkerInfo>>() {
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public WorkerInfo getClusterLeader() throws PulsarAdminException {
+        try {
+            return request(worker.path("cluster").path("leader"))
+                    .get(new GenericType<WorkerInfo>(){});
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public Map<String, Collection<String>> getAssignments() throws 
PulsarAdminException {
+        try {
+            Response response = request(worker.path("assignments")).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            String jsonResponse = response.readEntity(String.class);
+            Type type = new TypeToken<Map<String, 
Collection<String>>>(){}.getType();
+            Map<String, Collection<String>> assignments = new 
Gson().fromJson(jsonResponse, type);
+            return assignments;
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
similarity index 51%
rename from 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
rename to 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
index 6bd22ae2c7..9122375889 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
@@ -30,10 +30,15 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerInfo;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 @Slf4j
 @Parameters(commandDescription = "Operations to collect function-worker 
statistics")
-public class CmdFunctionWorkerStats extends CmdBase {
+public class CmdFunctionWorker extends CmdBase {
 
     /**
      * Base command
@@ -60,7 +65,7 @@ void processArguments() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            String json = 
Utils.printJson(admin.workerStats().getFunctionsStats());
+            String json = Utils.printJson(admin.worker().getFunctionsStats());
             GsonBuilder gsonBuilder = new GsonBuilder();
             if (indent) {
                 gsonBuilder.setPrettyPrinting();
@@ -77,7 +82,7 @@ void runCmd() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            String json = new Gson().toJson(admin.workerStats().getMetrics());
+            String json = new Gson().toJson(admin.worker().getMetrics());
             GsonBuilder gsonBuilder = new GsonBuilder();
             if (indent) {
                 gsonBuilder.setPrettyPrinting();
@@ -86,10 +91,64 @@ void runCmd() throws Exception {
         }
     }
 
-    public CmdFunctionWorkerStats(PulsarAdmin admin) throws 
PulsarClientException {
-        super("functions", admin);
-        jcommander.addCommand("functions", new FunctionsStats());
+    @Parameters(commandDescription = "Get all workers belonging to this 
cluster")
+    class GetCluster extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            List<WorkerInfo> workers = admin.worker().getCluster();
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(workers));
+        }
+    }
+
+    @Parameters(commandDescription = "Get the leader of the worker cluster")
+    class GetClusterLeader extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            WorkerInfo leader = admin.worker().getClusterLeader();
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(leader));
+        }
+    }
+
+    @Parameters(commandDescription = "Get the assignments of the functions 
accross the worker cluster")
+    class GetFunctionAssignments extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            Map<String, Collection<String>> assignments = 
admin.worker().getAssignments();
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(assignments));
+        }
+    }
+
+    public CmdFunctionWorker(PulsarAdmin admin) throws PulsarClientException {
+        super("functions-worker", admin);
+        jcommander.addCommand("function-stats", new FunctionsStats());
         jcommander.addCommand("monitoring-metrics", new 
CmdMonitoringMetrics());
+        jcommander.addCommand("get-cluster", new GetCluster());
+        jcommander.addCommand("get-cluster-leader", new GetClusterLeader());
+        jcommander.addCommand("get-function-assignments", new 
GetFunctionAssignments());
     }
 
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index f31fd82468..88412a6014 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -112,7 +112,6 @@
     private final TriggerFunction triggerer;
     private final UploadFunction uploader;
     private final DownloadFunction downloader;
-    private final GetCluster cluster;
 
     /**
      * Base command
@@ -1107,16 +1106,6 @@ void runCmd() throws Exception {
         }
     }
 
-    @Parameters(commandDescription = "Get list of workers registered in 
cluster")
-    class GetCluster extends BaseCommand {
-        @Override
-        void runCmd() throws Exception {
-            String json = (new Gson()).toJson(admin.functions().getCluster());
-            Gson gson = new GsonBuilder().setPrettyPrinting().create();
-            System.out.println(gson.toJson(new JsonParser().parse(json)));
-        }
-    }
-
     public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
         super("functions", admin);
         localRunner = new LocalRunner();
@@ -1130,7 +1119,6 @@ public CmdFunctions(PulsarAdmin admin) throws 
PulsarClientException {
         triggerer = new TriggerFunction();
         uploader = new UploadFunction();
         downloader = new DownloadFunction();
-        cluster = new GetCluster();
         restart = new RestartFunction();
         stop = new StopFunction();
         jcommander.addCommand("localrun", getLocalRunner());
@@ -1146,7 +1134,6 @@ public CmdFunctions(PulsarAdmin admin) throws 
PulsarClientException {
         jcommander.addCommand("trigger", getTriggerer());
         jcommander.addCommand("upload", getUploader());
         jcommander.addCommand("download", getDownloader());
-        jcommander.addCommand("cluster", cluster);
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index f30b8874f6..b82172bf6e 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -99,7 +99,7 @@
 
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
-        commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class);
+        commandMap.put("functions-worker", CmdFunctionWorker.class);
         commandMap.put("source", CmdSources.class);
         commandMap.put("sink", CmdSinks.class);
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index be979864b2..2456a0d2b7 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
-import java.util.Optional;
 import java.util.function.Supplier;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 8b73c13106..26c71272c6 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,12 +20,11 @@
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 public final class Resources {
@@ -37,7 +36,7 @@ private Resources() {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
-                        WorkerStats.class,
+                        WorkerApiV2Resource.class,
                         MultiPartFeature.class
                 ));
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 1c26c7d078..a0914723a3 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -31,6 +31,8 @@
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
 import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -141,6 +143,8 @@ public static ServletContextHandler 
newServletContextHandler(String contextPath,
                 new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
 
         
contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER, 
workerService);
+        
contextHandler.setAttribute(WorkerApiV2Resource.ATTRIBUTE_WORKER_SERVICE, 
workerService);
+        
contextHandler.setAttribute(WorkerStatsApiV2Resource.ATTRIBUTE_WORKERSTATS_SERVICE,
 workerService);
         contextHandler.setContextPath(contextPath);
 
         final ServletHolder apiServlet =
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d5f3f57ec5..bbb4f8f44d 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -27,11 +27,7 @@
 
 import com.google.gson.Gson;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.*;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,10 +35,8 @@
 import java.net.URLClassLoader;
 import java.util.Base64;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -71,7 +65,6 @@
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
@@ -79,17 +72,10 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import 
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.MembershipManager;
 import org.apache.pulsar.functions.worker.Utils;
-import org.apache.pulsar.functions.worker.WorkerInfo;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.io.core.Sink;
@@ -583,48 +569,6 @@ private Response updateRequest(FunctionMetaData 
functionMetaData) {
         return this.worker().getConnectorsManager().getConnectors();
     }
 
-    public List<WorkerInfo> getCluster() {
-        if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
-        }
-        return worker().getMembershipManager().getCurrentMembership();
-    }
-
-    public WorkerInfo getClusterLeader() {
-        if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
-        }
-
-        MembershipManager membershipManager = worker().getMembershipManager();
-        WorkerInfo leader = membershipManager.getLeader();
-
-        if (leader == null) {
-            throw new 
WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR)
-                    .type(MediaType.APPLICATION_JSON).entity(new 
ErrorData("Leader cannot be determined")).build());
-        }
-
-        return leader;
-    }
-
-    public Response getAssignments() {
-
-        if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
-        Map<String, Map<String, Function.Assignment>> assignments = 
functionRuntimeManager.getCurrentAssignments();
-        Map<String, Collection<String>> ret = new HashMap<>();
-        for (Map.Entry<String, Map<String, Function.Assignment>> entry : 
assignments.entrySet()) {
-            ret.put(entry.getKey(), entry.getValue().keySet());
-        }
-        return Response.status(Status.OK).entity(new 
Gson().toJson(ret)).build();
-    }
-
     public Response triggerFunction(final String tenant, final String 
namespace, final String functionName,
             final String input, final InputStream uploadedInputStream, final 
String topic) {
 
@@ -1069,80 +1013,4 @@ public boolean isSuperUser(String clientRole) {
         return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
-    public List<org.apache.pulsar.common.stats.Metrics> 
getWorkerMetrcis(String clientRole) throws IOException {
-        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
-            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
-            throw new 
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(clientRole + " is not authorize to 
get metrics")).build());
-        }
-        return getWorkerMetrcis();
-    }
-
-    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
-        if (!isWorkerServiceAvailable()) {
-            throw new WebApplicationException(
-                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
-        }
-        return worker().getMetricsGenerator().generate();
-    }
-
-    public Response getFunctionsMetrcis(String clientRole) throws IOException {
-        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
-            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
-            return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("client is not authorize to perform 
operation")).build();
-        }
-        return getFunctionsMetrcis();
-    }
-
-    private Response getFunctionsMetrcis() throws IOException {
-        if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
-        }
-
-        WorkerService workerService = worker();
-        Map<String, FunctionRuntimeInfo> functionRuntimes = 
workerService.getFunctionRuntimeManager()
-                .getFunctionRuntimeInfos();
-
-        Metrics.Builder metricsBuilder = Metrics.newBuilder();
-        for (Map.Entry<String, FunctionRuntimeInfo> entry : 
functionRuntimes.entrySet()) {
-            String fullyQualifiedInstanceName = entry.getKey();
-            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
-            RuntimeSpawner functionRuntimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
-
-            if (functionRuntimeSpawner != null) {
-                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
-                if (functionRuntime != null) {
-                    try {
-                        InstanceCommunication.MetricsData metricsData = 
workerService.getWorkerConfig()
-                                .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
-                                        : 
functionRuntime.getAndResetMetrics().get();
-
-                        String tenant = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getTenant();
-                        String namespace = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getNamespace();
-                        String name = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                                .getFunctionDetails().getName();
-                        int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
-                        String qualifiedFunctionName = 
String.format("%s/%s/%s", tenant, namespace, name);
-
-                        InstanceMetrics.Builder instanceBuilder = 
InstanceMetrics.newBuilder();
-                        instanceBuilder.setName(qualifiedFunctionName);
-                        instanceBuilder.setInstanceId(instanceId);
-                        if (metricsData != null) {
-                            instanceBuilder.setMetricsData(metricsData);
-                        }
-                        metricsBuilder.addMetrics(instanceBuilder.build());
-                    } catch (InterruptedException | ExecutionException e) {
-                        log.warn("Failed to collect metrics for function 
instance {}", fullyQualifiedInstanceName, e);
-                    }
-                }
-            }
-        }
-        String jsonResponse = 
org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
-        return Response.status(Status.OK).entity(jsonResponse).build();
-    }
-
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
new file mode 100644
index 0000000000..87e9bc225f
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.worker.*;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Slf4j
+public class WorkerImpl {
+
+    private final Supplier<WorkerService> workerServiceSupplier;
+
+    public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
+        this.workerServiceSupplier = workerServiceSupplier;
+    }
+
+    private WorkerService worker() {
+        try {
+            return checkNotNull(workerServiceSupplier.get());
+        } catch (Throwable t) {
+            log.info("Failed to get worker service", t);
+            throw t;
+        }
+    }
+
+    private boolean isWorkerServiceAvailable() {
+        WorkerService workerService = workerServiceSupplier.get();
+        if (workerService == null) {
+            return false;
+        }
+        if (!workerService.isInitialized()) {
+            return false;
+        }
+        return true;
+    }
+
+    public Response getCluster() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+        List<WorkerInfo> workers = 
worker().getMembershipManager().getCurrentMembership();
+        String jsonString = new Gson().toJson(workers);
+        return 
Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+    }
+
+    public Response getClusterLeader() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        MembershipManager membershipManager = worker().getMembershipManager();
+        WorkerInfo leader = membershipManager.getLeader();
+
+        if (leader == null) {
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Leader cannot be 
determined")).build();
+        }
+
+        String jsonString = new Gson().toJson(leader);
+        return 
Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonString).build();
+    }
+
+    public Response getAssignments() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
+        Map<String, Map<String, Function.Assignment>> assignments = 
functionRuntimeManager.getCurrentAssignments();
+        Map<String, Collection<String>> ret = new HashMap<>();
+        for (Map.Entry<String, Map<String, Function.Assignment>> entry : 
assignments.entrySet()) {
+            ret.put(entry.getKey(), entry.getValue().keySet());
+        }
+        return 
Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(new 
Gson().toJson(ret)).build();
+    }
+
+    private Response getUnavailableResponse() {
+        return 
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData(
+                        "Function worker service is not done initializing. " + 
"Please try again in a little while."))
+                .build();
+    }
+
+    public boolean isSuperUser(String clientRole) {
+        return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+    }
+
+    public List<org.apache.pulsar.common.stats.Metrics> 
getWorkerMetrcis(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
+            throw new 
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(clientRole + " is not authorize to 
get metrics")).build());
+        }
+        return getWorkerMetrcis();
+    }
+
+    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
+        if (!isWorkerServiceAvailable()) {
+            throw new WebApplicationException(
+                    
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Function worker service is 
not avaialable")).build());
+        }
+        return worker().getMetricsGenerator().generate();
+    }
+
+    public Response getFunctionsMetrics(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
+            return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("client is not authorize to perform 
operation")).build();
+        }
+        return getFunctionsMetrics();
+    }
+
+    private Response getFunctionsMetrics() throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        WorkerService workerService = worker();
+        Map<String, FunctionRuntimeInfo> functionRuntimes = 
workerService.getFunctionRuntimeManager()
+                .getFunctionRuntimeInfos();
+
+        Metrics.Builder metricsBuilder = Metrics.newBuilder();
+        for (Map.Entry<String, FunctionRuntimeInfo> entry : 
functionRuntimes.entrySet()) {
+            String fullyQualifiedInstanceName = entry.getKey();
+            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+            RuntimeSpawner functionRuntimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
+
+            if (functionRuntimeSpawner != null) {
+                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                if (functionRuntime != null) {
+                    try {
+                        InstanceCommunication.MetricsData metricsData = 
workerService.getWorkerConfig()
+                                .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
+                                : functionRuntime.getAndResetMetrics().get();
+
+                        String tenant = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getTenant();
+                        String namespace = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getNamespace();
+                        String name = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getName();
+                        int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                        String qualifiedFunctionName = 
String.format("%s/%s/%s", tenant, namespace, name);
+
+                        InstanceMetrics.Builder instanceBuilder = 
InstanceMetrics.newBuilder();
+                        instanceBuilder.setName(qualifiedFunctionName);
+                        instanceBuilder.setInstanceId(instanceId);
+                        if (metricsData != null) {
+                            instanceBuilder.setMetricsData(metricsData);
+                        }
+                        metricsBuilder.addMetrics(instanceBuilder.build());
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.warn("Failed to collect metrics for function 
instance {}", fullyQualifiedInstanceName, e);
+                    }
+                }
+            }
+        }
+        String jsonResponse = 
org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+        return Response.status(Status.OK).entity(jsonResponse).build();
+    }
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index e13f69cb5a..95fe687804 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -35,17 +35,13 @@
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-import org.apache.pulsar.functions.worker.WorkerInfo;
-
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 @Path("/functions")
@@ -129,27 +125,6 @@ public Response listFunctions(final @PathParam("tenant") 
String tenant,
 
     }
 
-    @GET
-    @Path("/cluster")
-    @Produces(MediaType.APPLICATION_JSON)
-    @ApiOperation(value = "Fetches information about the Pulsar cluster 
running Pulsar Functions")
-    public List<WorkerInfo> getCluster() {
-        return functions.getCluster();
-    }
-
-    @GET
-    @Path("/cluster/leader")
-    @Produces(MediaType.APPLICATION_JSON)
-    public WorkerInfo getClusterLeader() {
-        return functions.getClusterLeader();
-    }
-
-    @GET
-    @Path("/assignments")
-    public Response getAssignments() {
-        return functions.getAssignments();
-    }
-
     @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
new file mode 100644
index 0000000000..8cf0797411
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import java.util.function.Supplier;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.WorkerService;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+@Slf4j
+@Path("/worker")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/worker", description = "Workers admin api", tags = "workers")
+public class WorkerApiV2Resource implements Supplier<WorkerService> {
+
+    public static final String ATTRIBUTE_WORKER_SERVICE = "worker";
+
+    protected final WorkerImpl worker;
+    private WorkerService workerService;
+    @Context
+    protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
+
+    public WorkerApiV2Resource() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public synchronized WorkerService get() {
+        if (this.workerService == null) {
+            this.workerService = (WorkerService) 
servletContext.getAttribute(ATTRIBUTE_WORKER_SERVICE);
+        }
+        return this.workerService;
+    }
+
+    public String clientAppId() {
+        return httpRequest != null
+                ? (String) 
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+                : null;
+    }
+
+    @GET
+    @Path("/cluster")
+    @ApiOperation(value = "Fetches information about the Pulsar cluster 
running Pulsar Functions")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is 
not running") })
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getCluster() {
+        return worker.getCluster();
+    }
+
+    @GET
+    @Path("/cluster/leader")
+    @ApiOperation(value = "Fetches info about the leader node of the Pulsar 
cluster running Pulsar Functions")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is 
not running") })
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getClusterLeader() {
+        return worker.getClusterLeader();
+    }
+
+    @GET
+    @Path("/assignments")
+    @ApiOperation(value = "Fetches information about which Pulsar Functions 
are assigned to which Pulsar clusters",
+            response = Function.Assignment.class,
+            responseContainer = "Map")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "WorkerApiV2Resource service is 
not running") })
+    public Response getAssignments() {
+        return worker.getAssignments();
+    }
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
similarity index 53%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
rename to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
index 146bb21694..c802da7c6f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
@@ -18,46 +18,77 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
-import java.io.IOException;
-import java.util.Collection;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
 
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Supplier;
 
 @Slf4j
 @Path("/worker-stats")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-@Api(value = "/worker-stats", description = "Workers admin api", tags = 
"workers")
-public class WorkerStats extends FunctionApiResource {
+@Api(value = "/worker-stats", description = "Workers stats api", tags = 
"workers-stats")
+public class WorkerStatsApiV2Resource implements Supplier<WorkerService> {
 
-    @GET
-    @Path("/functions")
-    @ApiOperation(value = "Get stats for all functions owned by worker", notes 
= "Request should be executed by Monitoring agent on each worker to fetch the 
function-metrics", response = Metrics.class)
-    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission"),
-            @ApiResponse(code = 503, message = "Worker service is not 
running") })
-    public Response getStats() throws IOException {
-        return functions.getFunctionsMetrcis(clientAppId());
+    public static final String ATTRIBUTE_WORKERSTATS_SERVICE = "worker-stats";
+
+    protected final WorkerImpl worker;
+    private WorkerService workerService;
+    @Context
+    protected ServletContext servletContext;
+    @Context
+    protected HttpServletRequest httpRequest;
+
+    public WorkerStatsApiV2Resource() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    @Override
+    public synchronized WorkerService get() {
+        if (this.workerService == null) {
+            this.workerService = (WorkerService) 
servletContext.getAttribute(ATTRIBUTE_WORKERSTATS_SERVICE);
+        }
+        return this.workerService;
     }
-    
+
+    public String clientAppId() {
+        return httpRequest != null
+                ? (String) 
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
+                : null;
+    }
+
     @GET
     @Path("/metrics")
     @ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request 
should be executed by Monitoring agent on each worker to fetch the 
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, 
responseContainer = "List")
     @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have 
admin permission") })
     public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() 
throws Exception {
-        return functions.getWorkerMetrcis(clientAppId());
+        return worker.getWorkerMetrcis(clientAppId());
+    }
+
+    @GET
+    @Path("/functionsmetrics")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not 
running") })
+    public Response getFunctionsMetrics() throws IOException {
+        return worker.getFunctionsMetrics(clientAppId());
     }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
similarity index 98%
rename from 
pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
rename to 
pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index b830034fa7..4c6d24912f 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -26,7 +26,7 @@
 /**
  * Unit test of {@link WorkerConfig}.
  */
-public class WorkerConfigTest {
+public class WorkerApiV2ResourceConfigTest {
 
     private static final String TEST_NAME = "test-worker-config";
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
index dcc0999bac..947acb7b25 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
@@ -33,6 +33,6 @@ public WorkerContainer(String clusterName, String hostname) {
             "bin/run-functions-worker.sh",
             -1,
             BROKER_HTTP_PORT,
-            "/admin/v2/functions/cluster");
+            "/admin/v2/worker/cluster");
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to