This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 20b41de REST and CLI to get function metrics in json for monitoring (#2296)
20b41de is described below
commit 20b41de73912ac1f82280d70a291684cb30d8151
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Aug 8 16:57:22 2018 -0700
REST and CLI to get function metrics in json for monitoring (#2296)
* REST and CLI to get function metrics in json for monitoring
* add worker-stats end-point
---
.../apache/pulsar/broker/admin/v2/WorkerStats.java | 47 ++++++++++
.../org/apache/pulsar/client/admin/Functions.java | 1 +
.../apache/pulsar/client/admin/PulsarAdmin.java | 11 +++
.../apache/pulsar/client/admin/WorkerStats.java | 35 +++++++
.../client/admin/internal/FunctionsImpl.java | 2 +-
.../client/admin/internal/WorkerStatsImpl.java | 58 ++++++++++++
.../pulsar/admin/cli/CmdFunctionWorkerStats.java | 79 ++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +-
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 1 +
.../src/main/proto/InstanceCommunication.proto | 9 ++
.../pulsar/functions/worker/rest/Resources.java | 2 +
.../functions/worker/rest/api/FunctionsImpl.java | 103 +++++++++++++++++----
.../functions/worker/rest/api/v2/WorkerStats.java | 54 +++++++++++
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 4 +-
site2/website/scripts/replace.js | 3 +-
15 files changed, 386 insertions(+), 25 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
new file mode 100644
index 0000000..962c483
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import java.io.IOException;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Path("/worker-stats")
+public class WorkerStats extends FunctionApiResource {
+
+ @GET
+ @Path("/functions")
+ @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not running") })
+ public Response getMetrics() throws IOException {
+ return functions.getFunctionsMetrcis(clientAppId());
+ }
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 293304f..c04873d 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -27,6 +27,7 @@ import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.WorkerInfo;
/**
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 48aea94..7a41fe9 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
@@ -84,6 +85,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
+ private final WorkerStats workerStats;
private final Schemas schemas;
protected final WebTarget root;
protected final Authentication auth;
@@ -187,6 +189,7 @@ public class PulsarAdmin implements Closeable {
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
+ this.workerStats = new WorkerStatsImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
}
@@ -355,6 +358,14 @@ public class PulsarAdmin implements Closeable {
}
/**
+ *
+ * @return the Worker stats
+ */
+ public WorkerStats workerStats() {
+ return workerStats;
+ }
+
+ /**
* @return the broker statics
*/
public BrokerStats brokerStats() {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
new file mode 100644
index 0000000..069e58f
--- /dev/null
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+
+/**
+ * Admin interface for worker stats management.
+ */
+public interface WorkerStats {
+
+
+ /**
+ * Get all functions stats on a worker
+ * @return
+ * @throws PulsarAdminException
+ */
+ Metrics getFunctionsStats() throws PulsarAdminException;
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index fdb3588..028da3c 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -281,7 +281,7 @@ public class FunctionsImpl extends BaseResource implements
Functions {
throw getApiException(e);
}
}
-
+
public static void mergeJson(String json, Builder builder) throws
IOException {
JsonFormat.parser().merge(json, builder);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
new file mode 100644
index 0000000..5b6762c
--- /dev/null
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.WorkerStats;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class WorkerStatsImpl extends BaseResource implements WorkerStats {
+
+ private final WebTarget workerStats;
+
+ public WorkerStatsImpl(WebTarget web, Authentication auth) {
+ super(auth);
+ this.workerStats = web.path("/admin/worker-stats");
+ }
+
+ @Override
+ public Metrics getFunctionsStats() throws PulsarAdminException {
+ try {
+ Response response = request(workerStats.path("functions")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ Metrics.Builder metricsBuilder = Metrics.newBuilder();
+ mergeJson(jsonResponse, metricsBuilder);
+ return metricsBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
new file mode 100644
index 0000000..640d7d7
--- /dev/null
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.utils.Utils;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Parameters(commandDescription = "Operations to collect function-worker statistics")
+public class CmdFunctionWorkerStats extends CmdBase {
+
+ private final FunctionsStats functionsStats;
+
+ /**
+ * Base command
+ */
+ @Getter
+ abstract class BaseCommand extends CliCommand {
+ @Override
+ void run() throws Exception {
+ processArguments();
+ runCmd();
+ }
+
+ void processArguments() throws Exception {
+ }
+
+ abstract void runCmd() throws Exception;
+ }
+
+ @Parameters(commandDescription = "dump all functions stats")
+ class FunctionsStats extends BaseCommand {
+
+ @Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
+ boolean indent = false;
+
+ @Override
+ void runCmd() throws Exception {
+ String json = Utils.printJson(admin.workerStats().getFunctionsStats());
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ if (indent) {
+ gsonBuilder.setPrettyPrinting();
+ }
+ System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json)));
+ }
+ }
+
+ public CmdFunctionWorkerStats(PulsarAdmin admin) throws PulsarClientException {
+ super("functions", admin);
+ functionsStats = new FunctionsStats();
+ jcommander.addCommand("functions", functionsStats);
+ }
+
+}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a4152a4..b11dabe 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1027,7 +1027,7 @@ public class CmdFunctions extends CmdBase {
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
}
-
+
public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
super("functions", admin);
localRunner = new LocalRunner();
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 60cb84e..f30b887 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -99,6 +99,7 @@ public class PulsarAdminTool {
commandMap.put("resource-quotas", CmdResourceQuotas.class);
commandMap.put("functions", CmdFunctions.class);
+ commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class);
commandMap.put("source", CmdSources.class);
commandMap.put("sink", CmdSinks.class);
}
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 0078bc2..65d1b2f 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -79,3 +79,12 @@ service InstanceControl {
rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {}
rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
}
+
+message Metrics {
+ message InstanceMetrics {
+ string name = 1;
+ int32 instanceId = 2;
+ MetricsData metricsData = 3;
+ }
+ repeated InstanceMetrics metrics = 1;
+}
\ No newline at end of file
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index ff43e25..8b73c13 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest;
import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import java.util.Arrays;
@@ -36,6 +37,7 @@ public final class Resources {
return new HashSet<>(
Arrays.asList(
FunctionApiV2Resource.class,
+ WorkerStats.class,
MultiPartFeature.class
));
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 50c09a0..c35dd52 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -77,8 +77,14 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.Builder;
+import
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.Utils;
@@ -127,11 +133,11 @@ public class FunctionsImpl {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
}
-
+
try {
if (!isAuthorizedRole(tenant, clientRole)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
register function", tenant, namespace, functionName,
- clientRole);
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to
register function", tenant, namespace,
+ functionName, clientRole);
return
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("client is not authorize to
perform operation")).build();
}
@@ -204,7 +210,7 @@ public class FunctionsImpl {
return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
-
+
FunctionDetails functionDetails;
boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
// validate parameters
@@ -267,7 +273,7 @@ public class FunctionsImpl {
return
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData(e.getMessage())).build();
}
-
+
// validate parameters
try {
validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -511,9 +517,9 @@ public class FunctionsImpl {
WorkerInfo leader = membershipManager.getLeader();
if (leader == null) {
- throw new WebApplicationException(
-
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData("Leader cannot be
determined")).build());}
+ throw new
WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR)
+ .type(MediaType.APPLICATION_JSON).entity(new
ErrorData("Leader cannot be determined")).build());
+ }
return leader;
}
@@ -781,7 +787,6 @@ public class FunctionsImpl {
return null;
}
-
private FunctionDetails validateUpdateRequestParams(String tenant, String
namespace, String functionName,
String functionDetailsJson, File jarWithFileUrl) throws
IllegalArgumentException {
if (tenant == null) {
@@ -858,12 +863,12 @@ public class FunctionsImpl {
// validate function class-type
Object functionObject =
createInstance(functionDetailsBuilder.getClassName(), classLoader);
Class<?>[] typeArgs =
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
-
+
if (!(functionObject instanceof
org.apache.pulsar.functions.api.Function)
&& !(functionObject instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or
java.util.Function");
}
-
+
if (functionDetailsBuilder.hasSource() &&
functionDetailsBuilder.getSource() != null
&&
isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
try {
@@ -873,8 +878,7 @@ public class FunctionsImpl {
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
// if sink-class not present then set same arg as source
- if (!functionDetailsBuilder.hasSink()
- ||
isBlank(functionDetailsBuilder.getSink().getClassName())) {
+ if (!functionDetailsBuilder.hasSink() ||
isBlank(functionDetailsBuilder.getSink().getClassName())) {
functionDetailsBuilder
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
}
@@ -899,8 +903,7 @@ public class FunctionsImpl {
functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
// if source-class not present then set same arg as sink
- if (!functionDetailsBuilder.hasSource()
- ||
isBlank(functionDetailsBuilder.getSource().getClassName())) {
+ if (!functionDetailsBuilder.hasSource() ||
isBlank(functionDetailsBuilder.getSource().getClassName())) {
functionDetailsBuilder
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
}
@@ -911,7 +914,7 @@ public class FunctionsImpl {
log.error("Failed to validate sink class", e);
throw new IllegalArgumentException("Failed to validate sink
class-name", e);
}
- } else
if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){
+ } else if
(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
// if function-sink-class is not present then set function-sink
type-class according to function class
functionDetailsBuilder
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
@@ -956,11 +959,11 @@ public class FunctionsImpl {
return String.format("%s/%s/%s/%s", tenant, namespace,
Codec.encode(functionName),
Utils.getUniquePackageName(Codec.encode(fileName)));
}
-
- private boolean isAuthorizedRole(String tenant, String clientRole) throws
PulsarAdminException {
+
+ public boolean isAuthorizedRole(String tenant, String clientRole) throws
PulsarAdminException {
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// skip authorization if client role is super-user
- if (clientRole != null &&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
+ if (isSuperUser(clientRole)) {
return true;
}
TenantInfo tenantInfo =
worker().getAdmin().tenants().getTenantInfo(tenant);
@@ -970,4 +973,66 @@ public class FunctionsImpl {
return true;
}
+ public boolean isSuperUser(String clientRole) {
+ return clientRole != null &&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+ }
+
+ public Response getFunctionsMetrcis(String clientRole) throws IOException {
+ if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole)) {
+ log.error("Client [{}] is not admin and authorized to get function-stats", clientRole);
+ return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+ .entity(new ErrorData("client is not authorize to perform operation")).build();
+ }
+ return getFunctionsMetrcis();
+ }
+
+ private Response getFunctionsMetrcis() throws IOException {
+ if (!isWorkerServiceAvailable()) {
+ return getUnavailableResponse();
+ }
+
+ WorkerService workerService = worker();
+ Map<String, FunctionRuntimeInfo> functionRuntimes = workerService.getFunctionRuntimeManager()
+ .getFunctionRuntimeInfos();
+
+ Metrics.Builder metricsBuilder = Metrics.newBuilder();
+ for (Map.Entry<String, FunctionRuntimeInfo> entry : functionRuntimes.entrySet()) {
+ String fullyQualifiedInstanceName = entry.getKey();
+ FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+ RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
+
+ if (functionRuntimeSpawner != null) {
+ Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+ if (functionRuntime != null) {
+ try {
+ InstanceCommunication.MetricsData metricsData = workerService.getWorkerConfig()
+ .getMetricsSamplingPeriodSec() > 0 ? functionRuntime.getMetrics().get()
+ : functionRuntime.getAndResetMetrics().get();
+
+ String tenant = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+ .getFunctionDetails().getTenant();
+ String namespace = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+ .getFunctionDetails().getNamespace();
+ String name = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+ .getFunctionDetails().getName();
+ int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
+ String qualifiedFunctionName = String.format("%s/%s/%s", tenant, namespace, name);
+
+ InstanceMetrics.Builder instanceBuilder = InstanceMetrics.newBuilder();
+ instanceBuilder.setName(qualifiedFunctionName);
+ instanceBuilder.setInstanceId(instanceId);
+ if (metricsData != null) {
+ instanceBuilder.setMetricsData(metricsData);
+ }
+ metricsBuilder.addMetrics(instanceBuilder.build());
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
+ }
+ }
+ }
+ }
+ String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+ return Response.status(Status.OK).entity(jsonResponse).build();
+ }
+
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
new file mode 100644
index 0000000..6e4ae55
--- /dev/null
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import java.io.IOException;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+
+@Slf4j
+@Path("/worker-stats")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/worker-stats", description = "Workers admin api", tags = "workers")
+public class WorkerStats extends FunctionApiResource {
+
+ @GET
+ @Path("/functions")
+ @ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "Worker service is not running") })
+ public Response getMetrics() throws IOException {
+ return functions.getFunctionsMetrcis(clientAppId());
+ }
+}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 67de21a..4dda58f 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -231,8 +231,8 @@ public class KinesisSink implements Sink<byte[]> {
@Override
public void onFailure(Throwable exception) {
- LOG.error("[{}] Failed to published message for replicator of
{}-{} ", kinesisSink.streamName,
- resultContext.getPartitionId(),
resultContext.getRecordSequence());
+ LOG.error("[{}] Failed to published message for replicator of
{}-{}, {} ", kinesisSink.streamName,
+ resultContext.getPartitionId(),
resultContext.getRecordSequence(), exception.getMessage());
kinesisSink.previousPublishFailed = TRUE;
if (kinesisSink.sinkContext != null) {
kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
diff --git a/site2/website/scripts/replace.js b/site2/website/scripts/replace.js
index 8366185..a46f0aa 100644
--- a/site2/website/scripts/replace.js
+++ b/site2/website/scripts/replace.js
@@ -122,5 +122,4 @@ for (v of versions) {
dry: true
};
doReplace(opts);
-}
-
+}
\ No newline at end of file