This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f44d367 Added cli commands to get function cluster related
information (#2426)
f44d367 is described below
commit f44d367f1bd0de30b4702c174c4697d591f072fc
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Fri Aug 24 23:28:34 2018 -0700
Added cli commands to get function cluster related information (#2426)
* Added command line to get cluster/cluster leader/function assignment
information.
Also refactored these kinds of meta requests to a separate endpoint
* Removed left-over debug statements
* Added back /functionsmetrics
* Added /functionsmetrics back to the broker worker
* Removed leftover references of getcluster
* Fix integration tests
* Fixed instantiation of service in worker only mode
* Removed log statement
* Separated stats calls to a separate endpoint to mimic broker
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 44 -----
.../org/apache/pulsar/broker/admin/v2/Worker.java | 94 ++++++++++
.../apache/pulsar/broker/admin/v2/WorkerStats.java | 52 +++---
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 8 -
.../org/apache/pulsar/client/admin/Functions.java | 7 -
.../apache/pulsar/client/admin/PulsarAdmin.java | 10 +-
.../client/admin/{WorkerStats.java => Worker.java} | 26 ++-
.../{WorkerStatsImpl.java => WorkerImpl.java} | 54 +++++-
...tionWorkerStats.java => CmdFunctionWorker.java} | 71 +++++++-
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 13 --
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 2 +-
.../functions/worker/rest/FunctionApiResource.java | 1 -
.../pulsar/functions/worker/rest/Resources.java | 5 +-
.../pulsar/functions/worker/rest/WorkerServer.java | 4 +
.../functions/worker/rest/api/FunctionsImpl.java | 134 +-------------
.../functions/worker/rest/api/WorkerImpl.java | 199 +++++++++++++++++++++
.../worker/rest/api/v2/FunctionApiV2Resource.java | 25 ---
.../worker/rest/api/v2/WorkerApiV2Resource.java | 108 +++++++++++
...kerStats.java => WorkerStatsApiV2Resource.java} | 75 +++++---
...est.java => WorkerApiV2ResourceConfigTest.java} | 2 +-
.../integration/containers/WorkerContainer.java | 2 +-
21 files changed, 640 insertions(+), 296 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 315027f..def5452 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -198,50 +198,6 @@ public class FunctionsBase extends AdminResource
implements Supplier<WorkerServi
}
- @GET
- @ApiOperation(
- value = "Fetches information about the Pulsar cluster running
Pulsar Functions",
- response = WorkerInfo.class,
- responseContainer = "List"
- )
- @ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
-
- })
- @Path("/cluster")
- @Produces(MediaType.APPLICATION_JSON)
- public List<WorkerInfo> getCluster() {
- return functions.getCluster();
- }
-
- @GET
- @ApiOperation(
- value = "Fetches info about the leader node of the Pulsar cluster
running Pulsar Functions",
- response = WorkerInfo.class
- )
- @ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
-
- })
- @Path("/cluster/leader")
- public WorkerInfo getClusterLeader() {
- return functions.getClusterLeader();
- }
-
- @GET
- @ApiOperation(
- value = "Fetches information about which Pulsar Functions are
assigned to which Pulsar clusters",
- response = Assignment.class,
- responseContainer = "Map"
- )
- @ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
- })
- @Path("/assignments")
- public Response getAssignments() {
- return functions.getAssignments();
- }
-
@POST
@ApiOperation(
value = "Triggers a Pulsar Function with a user-specified value or
file data",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
new file mode 100644
index 0000000..46427cb
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import java.util.function.Supplier;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.WorkerService;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+@Slf4j
+@Path("/worker")
+public class Worker extends AdminResource implements Supplier<WorkerService> {
+
+ private final WorkerImpl worker;
+
+ public Worker() {
+ this.worker = new WorkerImpl(this);
+ }
+
+ @Override
+ public WorkerService get() {
+ return pulsar().getWorkerService();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about the Pulsar cluster running
Pulsar Functions"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
+
+ })
+ @Path("/cluster")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getCluster() {
+ return worker.getCluster();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches info about the leader node of the Pulsar cluster
running Pulsar Functions")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
+
+ })
+ @Path("/cluster/leader")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getClusterLeader() {
+ return worker.getClusterLeader();
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about which Pulsar Functions are
assigned to which Pulsar clusters",
+ response = Function.Assignment.class,
+ responseContainer = "Map"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have
admin permissions")
+ })
+ @Path("/assignments")
+ public Response getAssignments() {
+ return worker.getAssignments();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 5a0e4b7..1a9c7ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -18,39 +18,51 @@
*/
package org.apache.pulsar.broker.admin.v2;
-import java.io.IOException;
-import java.util.Collection;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Supplier;
@Slf4j
@Path("/worker-stats")
-public class WorkerStats extends FunctionApiResource {
+public class WorkerStats extends AdminResource implements
Supplier<WorkerService> {
- @GET
- @Path("/functions")
- @ApiOperation(value = "Get metrics for all functions owned by worker",
notes = "Requested should be executed by Monitoring agent on each worker to
fetch the metrics", response = Metrics.class)
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
- @ApiResponse(code = 503, message = "Worker service is not
running") })
- public Response getStats() throws IOException {
- return functions.getFunctionsMetrcis(clientAppId());
+ private final WorkerImpl worker;
+
+ public WorkerStats() {
+ this.worker = new WorkerImpl(this);
+ }
+
+ @Override
+ public WorkerService get() {
+ return pulsar().getWorkerService();
}
-
+
@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request
should be executed by Monitoring agent on each worker to fetch the
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class,
responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have
admin permission") })
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics()
throws Exception {
- return functions.getWorkerMetrcis(clientAppId());
+ return worker.getWorkerMetrcis(clientAppId());
+ }
+
+ @GET
+ @Path("/functionsmetrics")
+ @ApiOperation(value = "Get metrics for all functions owned by worker",
notes = "Requested should be executed by Monitoring agent on each worker to
fetch the metrics", response = Metrics.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not
running") })
+ public Response getStats() throws IOException {
+ return worker.getFunctionsMetrics(clientAppId());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index 427976f..a758c87 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -209,12 +209,4 @@ public class PulsarFunctionAdminTest {
return new WorkerService(workerConfig);
}
-
- @Test
- public void testGetWokersApi() throws Exception {
- List<WorkerInfo> workers = admin.functions().getCluster();
- Assert.assertEquals(workers.size(), 1);
- Assert.assertEquals(workers.get(0).getPort(), workerServicePort);
- }
-
}
\ No newline at end of file
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 2fc1796..96e9cbc 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -320,11 +320,4 @@ public interface Functions {
*
*/
Set<String> getSinks() throws PulsarAdminException;
-
- /**
- * Get list of workers present under a cluster
- * @return
- * @throws PulsarAdminException
- */
- List<WorkerInfo> getCluster() throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 7a41fe9..a1502b0 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
+import org.apache.pulsar.client.admin.internal.WorkerImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
@@ -85,7 +85,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
- private final WorkerStats workerStats;
+ private final Worker worker;
private final Schemas schemas;
protected final WebTarget root;
protected final Authentication auth;
@@ -189,7 +189,7 @@ public class PulsarAdmin implements Closeable {
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
- this.workerStats = new WorkerStatsImpl(root, auth);
+ this.worker = new WorkerImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
}
@@ -361,8 +361,8 @@ public class PulsarAdmin implements Closeable {
*
* @return the Worker stats
*/
- public WorkerStats workerStats() {
- return workerStats;
+ public Worker worker() {
+ return worker;
}
/**
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
similarity index 66%
rename from
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
rename to
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
index 4fc242f..709b713 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Worker.java
@@ -19,13 +19,16 @@
package org.apache.pulsar.client.admin;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerInfo;
/**
* Admin interface for worker stats management.
*/
-public interface WorkerStats {
+public interface Worker {
/**
@@ -41,4 +44,25 @@ public interface WorkerStats {
* @throws PulsarAdminException
*/
Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws
PulsarAdminException;
+
+ /**
+ * Get List of all workers belonging to this cluster
+ * @return
+ * @throws PulsarAdminException
+ */
+ List<WorkerInfo> getCluster() throws PulsarAdminException;
+
+ /**
+ * Get the worker who is the leader of the cluster
+ * @return
+ * @throws PulsarAdminException
+ */
+ WorkerInfo getClusterLeader() throws PulsarAdminException;
+
+ /**
+ * Get the function assignment among the cluster
+ * @return
+ * @throws PulsarAdminException
+ */
+ Map<String, Collection<String>> getAssignments() throws
PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
similarity index 56%
rename from
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
rename to
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index f492d31..4ca46cc 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -20,35 +20,42 @@ package org.apache.pulsar.client.admin.internal;
import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
+import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.WorkerStats;
+import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerInfo;
@Slf4j
-public class WorkerStatsImpl extends BaseResource implements WorkerStats {
+public class WorkerImpl extends BaseResource implements Worker {
private final WebTarget workerStats;
+ private final WebTarget worker;
- public WorkerStatsImpl(WebTarget web, Authentication auth) {
+ public WorkerImpl(WebTarget web, Authentication auth) {
super(auth);
- this.workerStats = web.path("/admin/worker-stats");
+ this.worker = web.path("/admin/v2/worker");
+ this.workerStats = web.path("/admin/v2/worker-stats");
}
@Override
public Metrics getFunctionsStats() throws PulsarAdminException {
try {
- Response response = request(workerStats.path("functions")).get();
+ Response response =
request(workerStats.path("functionsmetrics")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
@@ -71,4 +78,41 @@ public class WorkerStatsImpl extends BaseResource implements
WorkerStats {
throw getApiException(e);
}
}
+
+ @Override
+ public List<WorkerInfo> getCluster() throws PulsarAdminException {
+ try {
+ return request(worker.path("cluster"))
+ .get(new GenericType<List<WorkerInfo>>() {
+ });
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public WorkerInfo getClusterLeader() throws PulsarAdminException {
+ try {
+ return request(worker.path("cluster").path("leader"))
+ .get(new GenericType<WorkerInfo>(){});
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public Map<String, Collection<String>> getAssignments() throws
PulsarAdminException {
+ try {
+ Response response = request(worker.path("assignments")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ Type type = new TypeToken<Map<String,
Collection<String>>>(){}.getType();
+ Map<String, Collection<String>> assignments = new
Gson().fromJson(jsonResponse, type);
+ return assignments;
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
}
\ No newline at end of file
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
similarity index 51%
rename from
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
rename to
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
index 6bd22ae..9122375 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorker.java
@@ -30,10 +30,15 @@ import com.google.gson.JsonParser;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.WorkerInfo;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
@Slf4j
@Parameters(commandDescription = "Operations to collect function-worker
statistics")
-public class CmdFunctionWorkerStats extends CmdBase {
+public class CmdFunctionWorker extends CmdBase {
/**
* Base command
@@ -60,7 +65,7 @@ public class CmdFunctionWorkerStats extends CmdBase {
@Override
void runCmd() throws Exception {
- String json =
Utils.printJson(admin.workerStats().getFunctionsStats());
+ String json = Utils.printJson(admin.worker().getFunctionsStats());
GsonBuilder gsonBuilder = new GsonBuilder();
if (indent) {
gsonBuilder.setPrettyPrinting();
@@ -77,7 +82,7 @@ public class CmdFunctionWorkerStats extends CmdBase {
@Override
void runCmd() throws Exception {
- String json = new Gson().toJson(admin.workerStats().getMetrics());
+ String json = new Gson().toJson(admin.worker().getMetrics());
GsonBuilder gsonBuilder = new GsonBuilder();
if (indent) {
gsonBuilder.setPrettyPrinting();
@@ -86,10 +91,64 @@ public class CmdFunctionWorkerStats extends CmdBase {
}
}
- public CmdFunctionWorkerStats(PulsarAdmin admin) throws
PulsarClientException {
- super("functions", admin);
- jcommander.addCommand("functions", new FunctionsStats());
+ @Parameters(commandDescription = "Get all workers belonging to this
cluster")
+ class GetCluster extends BaseCommand {
+
+ @Parameter(names = { "-i", "--indent" }, description = "Indent JSON
output", required = false)
+ boolean indent = false;
+
+ @Override
+ void runCmd() throws Exception {
+ List<WorkerInfo> workers = admin.worker().getCluster();
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ if (indent) {
+ gsonBuilder.setPrettyPrinting();
+ }
+ System.out.println(gsonBuilder.create().toJson(workers));
+ }
+ }
+
+ @Parameters(commandDescription = "Get the leader of the worker cluster")
+ class GetClusterLeader extends BaseCommand {
+
+ @Parameter(names = { "-i", "--indent" }, description = "Indent JSON
output", required = false)
+ boolean indent = false;
+
+ @Override
+ void runCmd() throws Exception {
+ WorkerInfo leader = admin.worker().getClusterLeader();
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ if (indent) {
+ gsonBuilder.setPrettyPrinting();
+ }
+ System.out.println(gsonBuilder.create().toJson(leader));
+ }
+ }
+
+ @Parameters(commandDescription = "Get the assignments of the functions
accross the worker cluster")
+ class GetFunctionAssignments extends BaseCommand {
+
+ @Parameter(names = { "-i", "--indent" }, description = "Indent JSON
output", required = false)
+ boolean indent = false;
+
+ @Override
+ void runCmd() throws Exception {
+ Map<String, Collection<String>> assignments =
admin.worker().getAssignments();
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ if (indent) {
+ gsonBuilder.setPrettyPrinting();
+ }
+ System.out.println(gsonBuilder.create().toJson(assignments));
+ }
+ }
+
+ public CmdFunctionWorker(PulsarAdmin admin) throws PulsarClientException {
+ super("functions-worker", admin);
+ jcommander.addCommand("function-stats", new FunctionsStats());
jcommander.addCommand("monitoring-metrics", new
CmdMonitoringMetrics());
+ jcommander.addCommand("get-cluster", new GetCluster());
+ jcommander.addCommand("get-cluster-leader", new GetClusterLeader());
+ jcommander.addCommand("get-function-assignments", new
GetFunctionAssignments());
}
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index f31fd82..88412a6 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -112,7 +112,6 @@ public class CmdFunctions extends CmdBase {
private final TriggerFunction triggerer;
private final UploadFunction uploader;
private final DownloadFunction downloader;
- private final GetCluster cluster;
/**
* Base command
@@ -1107,16 +1106,6 @@ public class CmdFunctions extends CmdBase {
}
}
- @Parameters(commandDescription = "Get list of workers registered in
cluster")
- class GetCluster extends BaseCommand {
- @Override
- void runCmd() throws Exception {
- String json = (new Gson()).toJson(admin.functions().getCluster());
- Gson gson = new GsonBuilder().setPrettyPrinting().create();
- System.out.println(gson.toJson(new JsonParser().parse(json)));
- }
- }
-
public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
super("functions", admin);
localRunner = new LocalRunner();
@@ -1130,7 +1119,6 @@ public class CmdFunctions extends CmdBase {
triggerer = new TriggerFunction();
uploader = new UploadFunction();
downloader = new DownloadFunction();
- cluster = new GetCluster();
restart = new RestartFunction();
stop = new StopFunction();
jcommander.addCommand("localrun", getLocalRunner());
@@ -1146,7 +1134,6 @@ public class CmdFunctions extends CmdBase {
jcommander.addCommand("trigger", getTriggerer());
jcommander.addCommand("upload", getUploader());
jcommander.addCommand("download", getDownloader());
- jcommander.addCommand("cluster", cluster);
}
@VisibleForTesting
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index f30b887..b82172b 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -99,7 +99,7 @@ public class PulsarAdminTool {
commandMap.put("resource-quotas", CmdResourceQuotas.class);
commandMap.put("functions", CmdFunctions.class);
- commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class);
+ commandMap.put("functions-worker", CmdFunctionWorker.class);
commandMap.put("source", CmdSources.class);
commandMap.put("sink", CmdSinks.class);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index be97986..2456a0d 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest;
-import java.util.Optional;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 8b73c13..26c7127 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,12 +20,11 @@ package org.apache.pulsar.functions.worker.rest;
import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import java.util.Arrays;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
public final class Resources {
@@ -37,7 +36,7 @@ public final class Resources {
return new HashSet<>(
Arrays.asList(
FunctionApiV2Resource.class,
- WorkerStats.class,
+ WorkerApiV2Resource.class,
MultiPartFeature.class
));
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 1c26c7d..a091472 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -31,6 +31,8 @@ import java.util.concurrent.Executors;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@@ -141,6 +143,8 @@ public class WorkerServer {
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
contextHandler.setAttribute(FunctionApiResource.ATTRIBUTE_FUNCTION_WORKER,
workerService);
+
contextHandler.setAttribute(WorkerApiV2Resource.ATTRIBUTE_WORKER_SERVICE,
workerService);
+
contextHandler.setAttribute(WorkerStatsApiV2Resource.ATTRIBUTE_WORKERSTATS_SERVICE,
workerService);
contextHandler.setContextPath(contextPath);
final ServletHolder apiServlet =
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d5f3f57..bbb4f8f 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -27,11 +27,7 @@ import static
org.apache.pulsar.functions.utils.functioncache.FunctionClassLoade
import com.google.gson.Gson;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.*;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -39,10 +35,8 @@ import java.net.URL;
import java.net.URLClassLoader;
import java.util.Base64;
import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -71,7 +65,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
@@ -79,17 +72,10 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
-import org.apache.pulsar.functions.worker.WorkerInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.io.core.Sink;
@@ -583,48 +569,6 @@ public class FunctionsImpl {
return this.worker().getConnectorsManager().getConnectors();
}
- public List<WorkerInfo> getCluster() {
- if (!isWorkerServiceAvailable()) {
- throw new WebApplicationException(
-
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Function worker service is
not avaialable")).build());
- }
- return worker().getMembershipManager().getCurrentMembership();
- }
-
- public WorkerInfo getClusterLeader() {
- if (!isWorkerServiceAvailable()) {
- throw new WebApplicationException(
-
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Function worker service is
not avaialable")).build());
- }
-
- MembershipManager membershipManager = worker().getMembershipManager();
- WorkerInfo leader = membershipManager.getLeader();
-
- if (leader == null) {
- throw new
WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR)
- .type(MediaType.APPLICATION_JSON).entity(new
ErrorData("Leader cannot be determined")).build());
- }
-
- return leader;
- }
-
- public Response getAssignments() {
-
- if (!isWorkerServiceAvailable()) {
- return getUnavailableResponse();
- }
-
- FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
- Map<String, Map<String, Function.Assignment>> assignments =
functionRuntimeManager.getCurrentAssignments();
- Map<String, Collection<String>> ret = new HashMap<>();
- for (Map.Entry<String, Map<String, Function.Assignment>> entry :
assignments.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().keySet());
- }
- return Response.status(Status.OK).entity(new
Gson().toJson(ret)).build();
- }
-
public Response triggerFunction(final String tenant, final String
namespace, final String functionName,
final String input, final InputStream uploadedInputStream, final
String topic) {
@@ -1069,80 +1013,4 @@ public class FunctionsImpl {
return clientRole != null &&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
}
- public List<org.apache.pulsar.common.stats.Metrics>
getWorkerMetrcis(String clientRole) throws IOException {
- if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
- log.error("Client [{}] is not admin and authorized to get
function-stats", clientRole);
- throw new
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(clientRole + " is not authorize to
get metrics")).build());
- }
- return getWorkerMetrcis();
- }
-
- private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
- if (!isWorkerServiceAvailable()) {
- throw new WebApplicationException(
-
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Function worker service is
not avaialable")).build());
- }
- return worker().getMetricsGenerator().generate();
- }
-
- public Response getFunctionsMetrcis(String clientRole) throws IOException {
- if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
- log.error("Client [{}] is not admin and authorized to get
function-stats", clientRole);
- return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("client is not authorize to perform
operation")).build();
- }
- return getFunctionsMetrcis();
- }
-
- private Response getFunctionsMetrcis() throws IOException {
- if (!isWorkerServiceAvailable()) {
- return getUnavailableResponse();
- }
-
- WorkerService workerService = worker();
- Map<String, FunctionRuntimeInfo> functionRuntimes =
workerService.getFunctionRuntimeManager()
- .getFunctionRuntimeInfos();
-
- Metrics.Builder metricsBuilder = Metrics.newBuilder();
- for (Map.Entry<String, FunctionRuntimeInfo> entry :
functionRuntimes.entrySet()) {
- String fullyQualifiedInstanceName = entry.getKey();
- FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
- RuntimeSpawner functionRuntimeSpawner =
functionRuntimeInfo.getRuntimeSpawner();
-
- if (functionRuntimeSpawner != null) {
- Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
- if (functionRuntime != null) {
- try {
- InstanceCommunication.MetricsData metricsData =
workerService.getWorkerConfig()
- .getMetricsSamplingPeriodSec() > 0 ?
functionRuntime.getMetrics().get()
- :
functionRuntime.getAndResetMetrics().get();
-
- String tenant =
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
- .getFunctionDetails().getTenant();
- String namespace =
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
- .getFunctionDetails().getNamespace();
- String name =
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
- .getFunctionDetails().getName();
- int instanceId =
functionRuntimeInfo.getFunctionInstance().getInstanceId();
- String qualifiedFunctionName =
String.format("%s/%s/%s", tenant, namespace, name);
-
- InstanceMetrics.Builder instanceBuilder =
InstanceMetrics.newBuilder();
- instanceBuilder.setName(qualifiedFunctionName);
- instanceBuilder.setInstanceId(instanceId);
- if (metricsData != null) {
- instanceBuilder.setMetricsData(metricsData);
- }
- metricsBuilder.addMetrics(instanceBuilder.build());
- } catch (InterruptedException | ExecutionException e) {
- log.warn("Failed to collect metrics for function
instance {}", fullyQualifiedInstanceName, e);
- }
- }
- }
- }
- String jsonResponse =
org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
- return Response.status(Status.OK).entity(jsonResponse).build();
- }
-
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
new file mode 100644
index 0000000..87e9bc2
--- /dev/null
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.worker.*;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@Slf4j
+public class WorkerImpl {
+
+ private final Supplier<WorkerService> workerServiceSupplier;
+
+    /**
+     * Creates the helper backing the worker REST resources.
+     *
+     * @param workerServiceSupplier source of the {@link WorkerService}; it is stored and
+     *        queried again by each call that needs the service
+     */
+    public WorkerImpl(Supplier<WorkerService> workerServiceSupplier) {
+        this.workerServiceSupplier = workerServiceSupplier;
+    }
+
+ private WorkerService worker() {
+ try {
+ return checkNotNull(workerServiceSupplier.get());
+ } catch (Throwable t) {
+ log.info("Failed to get worker service", t);
+ throw t;
+ }
+ }
+
+ private boolean isWorkerServiceAvailable() {
+ WorkerService workerService = workerServiceSupplier.get();
+ if (workerService == null) {
+ return false;
+ }
+ if (!workerService.isInitialized()) {
+ return false;
+ }
+ return true;
+ }
+
+    /**
+     * Returns the current worker membership serialized as JSON.
+     *
+     * @return 200 with the Gson-serialized membership list, or the shared 503 response
+     *         while the worker service is unavailable
+     */
+    public Response getCluster() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+        final List<WorkerInfo> membership = worker().getMembershipManager().getCurrentMembership();
+        final String membershipJson = new Gson().toJson(membership);
+        return Response.status(Status.OK)
+                .type(MediaType.APPLICATION_JSON)
+                .entity(membershipJson)
+                .build();
+    }
+
+    /**
+     * Returns the worker currently reported as leader by the membership manager.
+     *
+     * @return 200 with the Gson-serialized leader, 500 with an {@link ErrorData} body when
+     *         no leader is reported, or the shared 503 response while the worker service
+     *         is unavailable
+     */
+    public Response getClusterLeader() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        final WorkerInfo currentLeader = worker().getMembershipManager().getLeader();
+        final Response.ResponseBuilder reply;
+        if (currentLeader != null) {
+            reply = Response.status(Status.OK).entity(new Gson().toJson(currentLeader));
+        } else {
+            reply = Response.status(Status.INTERNAL_SERVER_ERROR)
+                    .entity(new ErrorData("Leader cannot be determined"));
+        }
+        return reply.type(MediaType.APPLICATION_JSON).build();
+    }
+
+    /**
+     * Returns the current assignments, reduced to the inner key set for each outer key.
+     *
+     * <p>The runtime manager exposes a two-level map; only the keys of each inner map are
+     * kept, so the {@code Function.Assignment} values themselves are not serialized.
+     *
+     * @return 200 with the Gson-serialized map, or the shared 503 response while the
+     *         worker service is unavailable
+     */
+    public Response getAssignments() {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        final Map<String, Map<String, Function.Assignment>> current =
+                worker().getFunctionRuntimeManager().getCurrentAssignments();
+        final Map<String, Collection<String>> namesByKey = new HashMap<>();
+        current.forEach((key, assigned) -> namesByKey.put(key, assigned.keySet()));
+
+        final String body = new Gson().toJson(namesByKey);
+        return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(body).build();
+    }
+
+ private Response getUnavailableResponse() {
+ return
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(
+ "Function worker service is not done initializing. " +
"Please try again in a little while."))
+ .build();
+ }
+
+    /**
+     * Checks a role against the super-user roles in the worker configuration.
+     *
+     * @param clientRole role of the caller; may be {@code null}
+     * @return {@code true} if the role is non-null and listed as a super-user role
+     */
+    public boolean isSuperUser(String clientRole) {
+        if (clientRole == null) {
+            return false;
+        }
+        return worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+    }
+
+ public List<org.apache.pulsar.common.stats.Metrics>
getWorkerMetrcis(String clientRole) throws IOException {
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ log.error("Client [{}] is not admin and authorized to get
function-stats", clientRole);
+ throw new
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData(clientRole + " is not authorize to
get metrics")).build());
+ }
+ return getWorkerMetrcis();
+ }
+
+    /**
+     * Returns the worker metrics without any authorization check.
+     *
+     * @return the metrics produced by the worker's metrics generator
+     * @throws WebApplicationException with status 503 while the worker service is
+     *         unavailable
+     */
+    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrcis() {
+        if (!isWorkerServiceAvailable()) {
+            // Fixed the misspelled "avaialable" in the message returned to clients.
+            throw new WebApplicationException(
+                    Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData("Function worker service is not available")).build());
+        }
+        return worker().getMetricsGenerator().generate();
+    }
+
+ public Response getFunctionsMetrics(String clientRole) throws IOException {
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ log.error("Client [{}] is not admin and authorized to get
function-stats", clientRole);
+ return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to perform
operation")).build();
+ }
+ return getFunctionsMetrics();
+ }
+
+    /**
+     * Collects metrics from every function runtime known to this worker.
+     *
+     * <p>Entries without a runtime spawner or runtime are skipped. A failure to fetch
+     * metrics for one instance is logged and does not fail the request. If the calling
+     * thread is interrupted while waiting, the interrupt flag is restored and collection
+     * stops, returning whatever was gathered so far.
+     *
+     * @return 200 with the JSON-printed metrics, or the shared 503 response while the
+     *         worker service is unavailable
+     */
+    private Response getFunctionsMetrics() throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        WorkerService workerService = worker();
+        Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
+                .getFunctionRuntimeInfos();
+
+        Metrics.Builder metricsBuilder = Metrics.newBuilder();
+        for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+            String fullyQualifiedInstanceName = entry.getKey();
+            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+            RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+            if (functionRuntimeSpawner == null) {
+                continue;
+            }
+            Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+            if (functionRuntime == null) {
+                continue;
+            }
+
+            try {
+                // With a sampling period configured the metrics are only read;
+                // otherwise this call reads and resets them.
+                InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig()
+                        .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
+                                : functionRuntime.getAndResetMetrics().get();
+
+                String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                        .getFunctionDetails().getTenant();
+                String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                        .getFunctionDetails().getNamespace();
+                String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                        .getFunctionDetails().getName();
+                int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name);
+
+                InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder();
+                instanceBuilder.setName(qualifiedFunctionName);
+                instanceBuilder.setInstanceId(instanceId);
+                if (metricsData != null) {
+                    instanceBuilder.setMetricsData(metricsData);
+                }
+                metricsBuilder.addMetrics(instanceBuilder.build());
+            } catch (InterruptedException e) {
+                log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
+                // Restore the flag so the caller can observe the interruption, and stop:
+                // every further blocking get() would fail immediately anyway.
+                Thread.currentThread().interrupt();
+                break;
+            } catch (ExecutionException e) {
+                log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
+            }
+        }
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+        // Set the media type explicitly, as the other endpoints of this class do.
+        return Response.status(Status.OK).type(MediaType.APPLICATION_JSON).entity(jsonResponse).build();
+    }
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index e13f69c..95fe687 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -35,17 +35,13 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.pulsar.functions.worker.WorkerInfo;
-
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
@Slf4j
@Path("/functions")
@@ -129,27 +125,6 @@ public class FunctionApiV2Resource extends
FunctionApiResource {
}
- @GET
- @Path("/cluster")
- @Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Fetches information about the Pulsar cluster
running Pulsar Functions")
- public List<WorkerInfo> getCluster() {
- return functions.getCluster();
- }
-
- @GET
- @Path("/cluster/leader")
- @Produces(MediaType.APPLICATION_JSON)
- public WorkerInfo getClusterLeader() {
- return functions.getClusterLeader();
- }
-
- @GET
- @Path("/assignments")
- public Response getAssignments() {
- return functions.getAssignments();
- }
-
@POST
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
new file mode 100644
index 0000000..8cf0797
--- /dev/null
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import java.util.function.Supplier;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.worker.WorkerService;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+
+@Slf4j
+@Path("/worker")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/worker", description = "Workers admin api", tags = "workers")
+public class WorkerApiV2Resource implements Supplier<WorkerService> {
+
+ public static final String ATTRIBUTE_WORKER_SERVICE = "worker";
+
+ protected final WorkerImpl worker;
+ private WorkerService workerService;
+ @Context
+ protected ServletContext servletContext;
+ @Context
+ protected HttpServletRequest httpRequest;
+
+    /**
+     * Creates the resource and hands itself to {@link WorkerImpl} as the supplier of the
+     * worker service.
+     */
+    public WorkerApiV2Resource() {
+        this.worker = new WorkerImpl(this);
+    }
+
+    /**
+     * Looks the worker service up in the servlet context on first use and caches it.
+     *
+     * @return the cached service, or {@code null} if the context attribute is not set
+     */
+    @Override
+    public synchronized WorkerService get() {
+        if (this.workerService != null) {
+            return this.workerService;
+        }
+        this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_WORKER_SERVICE);
+        return this.workerService;
+    }
+
+    /**
+     * Reads the authenticated role stored on the current request.
+     *
+     * @return the request attribute named by
+     *         {@code AuthenticationFilter.AuthenticatedRoleAttributeName}, or {@code null}
+     *         when no request is injected
+     */
+    public String clientAppId() {
+        if (httpRequest == null) {
+            return null;
+        }
+        return (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
+    }
+
+    /**
+     * REST endpoint {@code GET /worker/cluster}; delegates to {@link WorkerImpl#getCluster()}.
+     *
+     * @return the response built by the delegate
+     */
+    @GET
+    @Path("/cluster")
+    @ApiOperation(value = "Fetches information about the Pulsar cluster running Pulsar Functions")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running") })
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getCluster() {
+        return worker.getCluster();
+    }
+
+    /**
+     * REST endpoint {@code GET /worker/cluster/leader}; delegates to
+     * {@link WorkerImpl#getClusterLeader()}.
+     *
+     * @return the response built by the delegate
+     */
+    @GET
+    @Path("/cluster/leader")
+    @ApiOperation(value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running") })
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getClusterLeader() {
+        return worker.getClusterLeader();
+    }
+
+ @GET
+ @Path("/assignments")
+ @ApiOperation(value = "Fetches information about which Pulsar Functions
are assigned to which Pulsar clusters",
+ response = Function.Assignment.class,
+ responseContainer = "Map")
+ @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have
admin permission"),
+ @ApiResponse(code = 503, message = "WorkerApiV2Resource service is
not running") })
+ public Response getAssignments() {
+ return worker.getAssignments();
+ }
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
similarity index 53%
rename from
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
rename to
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
index 146bb21..c802da7 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStatsApiV2Resource.java
@@ -18,46 +18,77 @@
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
-import java.io.IOException;
-import java.util.Collection;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.web.AuthenticationFilter;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.function.Supplier;
@Slf4j
@Path("/worker-stats")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-@Api(value = "/worker-stats", description = "Workers admin api", tags =
"workers")
-public class WorkerStats extends FunctionApiResource {
+@Api(value = "/worker-stats", description = "Workers stats api", tags =
"workers-stats")
+public class WorkerStatsApiV2Resource implements Supplier<WorkerService> {
- @GET
- @Path("/functions")
- @ApiOperation(value = "Get stats for all functions owned by worker", notes
= "Request should be executed by Monitoring agent on each worker to fetch the
function-metrics", response = Metrics.class)
- @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have
admin permission"),
- @ApiResponse(code = 503, message = "Worker service is not
running") })
- public Response getStats() throws IOException {
- return functions.getFunctionsMetrcis(clientAppId());
+ public static final String ATTRIBUTE_WORKERSTATS_SERVICE = "worker-stats";
+
+ protected final WorkerImpl worker;
+ private WorkerService workerService;
+ @Context
+ protected ServletContext servletContext;
+ @Context
+ protected HttpServletRequest httpRequest;
+
+    /**
+     * Creates the resource and hands itself to {@link WorkerImpl} as the supplier of the
+     * worker service.
+     */
+    public WorkerStatsApiV2Resource() {
+        this.worker = new WorkerImpl(this);
+    }
+
+ @Override
+ public synchronized WorkerService get() {
+ if (this.workerService == null) {
+ this.workerService = (WorkerService)
servletContext.getAttribute(ATTRIBUTE_WORKERSTATS_SERVICE);
+ }
+ return this.workerService;
}
-
+
+    /**
+     * Reads the authenticated role stored on the current request.
+     *
+     * @return the request attribute named by
+     *         {@code AuthenticationFilter.AuthenticatedRoleAttributeName}, or {@code null}
+     *         when no request is injected
+     */
+    public String clientAppId() {
+        if (httpRequest == null) {
+            return null;
+        }
+        return (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
+    }
+
@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request
should be executed by Monitoring agent on each worker to fetch the
worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class,
responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have
admin permission") })
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics()
throws Exception {
- return functions.getWorkerMetrcis(clientAppId());
+ return worker.getWorkerMetrcis(clientAppId());
+ }
+
+    /**
+     * REST endpoint {@code GET /worker-stats/functionsmetrics}; delegates to
+     * {@link WorkerImpl#getFunctionsMetrics(String)} with the caller's authenticated role.
+     *
+     * <p>The documented rejection code is 401 because the delegate answers unauthorized
+     * callers with {@code Status.UNAUTHORIZED}.
+     *
+     * @return the response built by the delegate
+     */
+    @GET
+    @Path("/functionsmetrics")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Request should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not running") })
+    public Response getFunctionsMetrics() throws IOException {
+        return worker.getFunctionsMetrics(clientAppId());
+    }
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
similarity index 98%
rename from
pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
rename to
pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
index b830034..4c6d249 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerConfigTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerApiV2ResourceConfigTest.java
@@ -26,7 +26,7 @@ import org.testng.annotations.Test;
/**
* Unit test of {@link WorkerConfig}.
*/
-public class WorkerConfigTest {
+public class WorkerApiV2ResourceConfigTest {
private static final String TEST_NAME = "test-worker-config";
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
index dcc0999..947acb7 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/WorkerContainer.java
@@ -33,6 +33,6 @@ public class WorkerContainer extends
PulsarContainer<WorkerContainer> {
"bin/run-functions-worker.sh",
-1,
BROKER_HTTP_PORT,
- "/admin/v2/functions/cluster");
+ "/admin/v2/worker/cluster");
}
}