This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 20b41de  REST and CLI to get function metrics in json for monitoring 
(#2296)
20b41de is described below

commit 20b41de73912ac1f82280d70a291684cb30d8151
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Wed Aug 8 16:57:22 2018 -0700

    REST and CLI to get function metrics in json for monitoring (#2296)
    
    * REST and CLI to get function metrics in json for monitoring
    
    * add worker-stats end-point
---
 .../apache/pulsar/broker/admin/v2/WorkerStats.java |  47 ++++++++++
 .../org/apache/pulsar/client/admin/Functions.java  |   1 +
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  11 +++
 .../apache/pulsar/client/admin/WorkerStats.java    |  35 +++++++
 .../client/admin/internal/FunctionsImpl.java       |   2 +-
 .../client/admin/internal/WorkerStatsImpl.java     |  58 ++++++++++++
 .../pulsar/admin/cli/CmdFunctionWorkerStats.java   |  79 ++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   2 +-
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |   1 +
 .../src/main/proto/InstanceCommunication.proto     |   9 ++
 .../pulsar/functions/worker/rest/Resources.java    |   2 +
 .../functions/worker/rest/api/FunctionsImpl.java   | 103 +++++++++++++++++----
 .../functions/worker/rest/api/v2/WorkerStats.java  |  54 +++++++++++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |   4 +-
 site2/website/scripts/replace.js                   |   3 +-
 15 files changed, 386 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
new file mode 100644
index 0000000..962c483
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import java.io.IOException;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Path("/worker-stats")
+public class WorkerStats extends FunctionApiResource {
+
+    @GET
+    @Path("/functions")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not 
running") })
+    public Response getMetrics() throws IOException {
+        return functions.getFunctionsMetrcis(clientAppId());
+    }
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 293304f..c04873d 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -27,6 +27,7 @@ import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedExc
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
 import org.apache.pulsar.functions.worker.WorkerInfo;
 
 /**
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 48aea94..7a41fe9 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.admin.internal.NamespacesImpl;
 import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
 import org.apache.pulsar.client.admin.internal.SchemasImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
+import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
 import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
@@ -84,6 +85,7 @@ public class PulsarAdmin implements Closeable {
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
+    private final WorkerStats workerStats;
     private final Schemas schemas;
     protected final WebTarget root;
     protected final Authentication auth;
@@ -187,6 +189,7 @@ public class PulsarAdmin implements Closeable {
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth);
+        this.workerStats = new WorkerStatsImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
     }
@@ -355,6 +358,14 @@ public class PulsarAdmin implements Closeable {
     }
 
     /**
+    * Get the admin interface for worker stats.
+    * @return the Worker stats
+    */
+   public WorkerStats workerStats() {
+       return workerStats;
+   }
+    
+    /**
      * @return the broker statics
      */
     public BrokerStats brokerStats() {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
new file mode 100644
index 0000000..069e58f
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/WorkerStats.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+
+/**
+ * Admin interface for worker stats management.
+ */
+public interface WorkerStats {
+    
+    
+    /**
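+     * Get the stats of all functions on a worker.
+     * @return the metrics of every function instance owned by the worker
+     * @throws PulsarAdminException if the request to the worker fails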
+     */
+    Metrics getFunctionsStats() throws PulsarAdminException;
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index fdb3588..028da3c 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -281,7 +281,7 @@ public class FunctionsImpl extends BaseResource implements 
Functions {
             throw getApiException(e);
         }
     }
-    
+
     public static void mergeJson(String json, Builder builder) throws 
IOException {
         JsonFormat.parser().merge(json, builder);
     }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
new file mode 100644
index 0000000..5b6762c
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerStatsImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.WorkerStats;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class WorkerStatsImpl extends BaseResource implements WorkerStats {
+
+    private final WebTarget workerStats;
+
+    public WorkerStatsImpl(WebTarget web, Authentication auth) {
+        super(auth);
+        this.workerStats = web.path("/admin/worker-stats");
+    }
+
+    @Override
+    public Metrics getFunctionsStats() throws PulsarAdminException {
+        try {
+            Response response = request(workerStats.path("functions")).get();
+           if (!response.getStatusInfo().equals(Response.Status.OK)) {
+               throw new ClientErrorException(response);
+           }
+           String jsonResponse = response.readEntity(String.class);
+           Metrics.Builder metricsBuilder = Metrics.newBuilder();
+           mergeJson(jsonResponse, metricsBuilder);
+           return metricsBuilder.build();
+       } catch (Exception e) {
+           throw getApiException(e);
+       }
+   }
+}
\ No newline at end of file
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
new file mode 100644
index 0000000..640d7d7
--- /dev/null
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctionWorkerStats.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.utils.Utils;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Parameters(commandDescription = "Operations to collect function-worker 
statistics")
+public class CmdFunctionWorkerStats extends CmdBase {
+
+    private final FunctionsStats functionsStats;
+
+    /**
+     * Base command
+     */
+    @Getter
+    abstract class BaseCommand extends CliCommand {
+        @Override
+        void run() throws Exception {
+            processArguments();
+            runCmd();
+        }
+
+        void processArguments() throws Exception {
+        }
+
+        abstract void runCmd() throws Exception;
+    }
+
+    @Parameters(commandDescription = "dump all functions stats")
+    class FunctionsStats extends BaseCommand {
+
+        @Parameter(names = { "-i", "--indent" }, description = "Indent JSON 
output", required = false)
+        boolean indent = false;
+
+        @Override
+        void runCmd() throws Exception {
+            String json = 
Utils.printJson(admin.workerStats().getFunctionsStats());
+            GsonBuilder gsonBuilder = new GsonBuilder();
+            if (indent) {
+                gsonBuilder.setPrettyPrinting();
+            }
+            System.out.println(gsonBuilder.create().toJson(new 
JsonParser().parse(json)));
+        }
+    }
+
+    public CmdFunctionWorkerStats(PulsarAdmin admin) throws 
PulsarClientException {
+        super("functions", admin);
+        functionsStats = new FunctionsStats();
+        jcommander.addCommand("functions", functionsStats);
+    }
+
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a4152a4..b11dabe 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1027,7 +1027,7 @@ public class CmdFunctions extends CmdBase {
             System.out.println(gson.toJson(new JsonParser().parse(json)));
         }
     }
-    
+
     public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
         super("functions", admin);
         localRunner = new LocalRunner();
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 60cb84e..f30b887 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -99,6 +99,7 @@ public class PulsarAdminTool {
 
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
+        commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class);
         commandMap.put("source", CmdSources.class);
         commandMap.put("sink", CmdSinks.class);
     }
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto 
b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 0078bc2..65d1b2f 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -79,3 +79,12 @@ service InstanceControl {
     rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {}
     rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
 }
+
+message Metrics {
+       message InstanceMetrics {
+               string name = 1;
+               int32 instanceId = 2;
+               MetricsData metricsData = 3;
+       }
+       repeated InstanceMetrics metrics = 1;
+}
\ No newline at end of file
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index ff43e25..8b73c13 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest;
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
 import java.util.Arrays;
@@ -36,6 +37,7 @@ public final class Resources {
         return new HashSet<>(
                 Arrays.asList(
                         FunctionApiV2Resource.class,
+                        WorkerStats.class,
                         MultiPartFeature.class
                 ));
     }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 50c09a0..c35dd52 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -77,8 +77,14 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.Builder;
+import 
org.apache.pulsar.functions.proto.InstanceCommunication.Metrics.InstanceMetrics;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
 import org.apache.pulsar.functions.worker.Utils;
@@ -127,11 +133,11 @@ public class FunctionsImpl {
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
         }
-        
+
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register function", tenant, namespace, functionName,
-                        clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register function", tenant, namespace,
+                        functionName, clientRole);
                 return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to 
perform operation")).build();
             }
@@ -204,7 +210,7 @@ public class FunctionsImpl {
             return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
-        
+
         FunctionDetails functionDetails;
         boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         // validate parameters
@@ -267,7 +273,7 @@ public class FunctionsImpl {
             return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
-        
+
         // validate parameters
         try {
             validateDeregisterRequestParams(tenant, namespace, functionName);
@@ -511,9 +517,9 @@ public class FunctionsImpl {
         WorkerInfo leader = membershipManager.getLeader();
 
         if (leader == null) {
-            throw new WebApplicationException(
-                    
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                            .entity(new ErrorData("Leader cannot be 
determined")).build());}
+            throw new 
WebApplicationException(Response.status(Status.INTERNAL_SERVER_ERROR)
+                    .type(MediaType.APPLICATION_JSON).entity(new 
ErrorData("Leader cannot be determined")).build());
+        }
 
         return leader;
     }
@@ -781,7 +787,6 @@ public class FunctionsImpl {
         return null;
     }
 
-
     private FunctionDetails validateUpdateRequestParams(String tenant, String 
namespace, String functionName,
             String functionDetailsJson, File jarWithFileUrl) throws 
IllegalArgumentException {
         if (tenant == null) {
@@ -858,12 +863,12 @@ public class FunctionsImpl {
         // validate function class-type
         Object functionObject = 
createInstance(functionDetailsBuilder.getClassName(), classLoader);
         Class<?>[] typeArgs = 
org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
-        
+
         if (!(functionObject instanceof 
org.apache.pulsar.functions.api.Function)
                 && !(functionObject instanceof java.util.function.Function)) {
             throw new RuntimeException("User class must either be Function or 
java.util.Function");
         }
-        
+
         if (functionDetailsBuilder.hasSource() && 
functionDetailsBuilder.getSource() != null
                 && 
isNotBlank(functionDetailsBuilder.getSource().getClassName())) {
             try {
@@ -873,8 +878,7 @@ public class FunctionsImpl {
                         
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
 
                 // if sink-class not present then set same arg as source
-                if (!functionDetailsBuilder.hasSink()
-                        || 
isBlank(functionDetailsBuilder.getSink().getClassName())) {
+                if (!functionDetailsBuilder.hasSink() || 
isBlank(functionDetailsBuilder.getSink().getClassName())) {
                     functionDetailsBuilder
                             
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
                 }
@@ -899,8 +903,7 @@ public class FunctionsImpl {
                 
functionDetailsBuilder.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(argClassName));
 
                 // if source-class not present then set same arg as sink
-                if (!functionDetailsBuilder.hasSource()
-                        || 
isBlank(functionDetailsBuilder.getSource().getClassName())) {
+                if (!functionDetailsBuilder.hasSource() || 
isBlank(functionDetailsBuilder.getSource().getClassName())) {
                     functionDetailsBuilder
                             
.setSource(functionDetailsBuilder.getSourceBuilder().setTypeClassName(argClassName));
                 }
@@ -911,7 +914,7 @@ public class FunctionsImpl {
                 log.error("Failed to validate sink class", e);
                 throw new IllegalArgumentException("Failed to validate sink 
class-name", e);
             }
-        } else 
if(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())){
+        } else if 
(isBlank(functionDetailsBuilder.getSinkBuilder().getTypeClassName())) {
             // if function-sink-class is not present then set function-sink 
type-class according to function class
             functionDetailsBuilder
                     
.setSink(functionDetailsBuilder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
@@ -956,11 +959,11 @@ public class FunctionsImpl {
         return String.format("%s/%s/%s/%s", tenant, namespace, 
Codec.encode(functionName),
                 Utils.getUniquePackageName(Codec.encode(fileName)));
     }
-    
-    private boolean isAuthorizedRole(String tenant, String clientRole) throws 
PulsarAdminException {
+
+    public boolean isAuthorizedRole(String tenant, String clientRole) throws 
PulsarAdminException {
         if (worker().getWorkerConfig().isAuthorizationEnabled()) {
             // skip authorization if client role is super-user
-            if (clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole)) {
+            if (isSuperUser(clientRole)) {
                 return true;
             }
             TenantInfo tenantInfo = 
worker().getAdmin().tenants().getTenantInfo(tenant);
@@ -970,4 +973,66 @@ public class FunctionsImpl {
         return true;
     }
 
+    public boolean isSuperUser(String clientRole) {
+        return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+    }
+    
+    public Response getFunctionsMetrcis(String clientRole) throws IOException {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
+            return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("client is not authorize to perform 
operation")).build();
+        }
+        return getFunctionsMetrcis();
+    }
+
+    private Response getFunctionsMetrcis() throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        WorkerService workerService = worker();
+        Map<String, FunctionRuntimeInfo> functionRuntimes = 
workerService.getFunctionRuntimeManager()
+                .getFunctionRuntimeInfos();
+
+        Metrics.Builder metricsBuilder = Metrics.newBuilder();
+        for (Map.Entry<String, FunctionRuntimeInfo> entry : 
functionRuntimes.entrySet()) {
+            String fullyQualifiedInstanceName = entry.getKey();
+            FunctionRuntimeInfo functionRuntimeInfo = entry.getValue();
+            RuntimeSpawner functionRuntimeSpawner = 
functionRuntimeInfo.getRuntimeSpawner();
+
+            if (functionRuntimeSpawner != null) {
+                Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
+                if (functionRuntime != null) {
+                    try {
+                        InstanceCommunication.MetricsData metricsData = 
workerService.getWorkerConfig()
+                                .getMetricsSamplingPeriodSec() > 0 ? 
functionRuntime.getMetrics().get()
+                                        : 
functionRuntime.getAndResetMetrics().get();
+
+                        String tenant = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getTenant();
+                        String namespace = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getNamespace();
+                        String name = 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
+                                .getFunctionDetails().getName();
+                        int instanceId = 
functionRuntimeInfo.getFunctionInstance().getInstanceId();
+                        String qualifiedFunctionName = 
String.format("%s/%s/%s", tenant, namespace, name);
+
+                        InstanceMetrics.Builder instanceBuilder = 
InstanceMetrics.newBuilder();
+                        instanceBuilder.setName(qualifiedFunctionName);
+                        instanceBuilder.setInstanceId(instanceId);
+                        if (metricsData != null) {
+                            instanceBuilder.setMetricsData(metricsData);
+                        }
+                        metricsBuilder.addMetrics(instanceBuilder.build());
+                    } catch (InterruptedException | ExecutionException e) {
+                        log.warn("Failed to collect metrics for function 
instance {}", fullyQualifiedInstanceName, e);
+                    }
+                }
+            }
+        }
+        String jsonResponse = 
org.apache.pulsar.functions.utils.Utils.printJson(metricsBuilder);
+        return Response.status(Status.OK).entity(jsonResponse).build();
+    }
+
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
new file mode 100644
index 0000000..6e4ae55
--- /dev/null
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerStats.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import java.io.IOException;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
+
+@Slf4j
+@Path("/worker-stats")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/worker-stats", description = "Workers admin api", tags = 
"workers")
+public class WorkerStats extends FunctionApiResource {
+
+    @GET
+    @Path("/functions")
+    @ApiOperation(value = "Get metrics for all functions owned by worker", 
notes = "Requested should be executed by Monitoring agent on each worker to 
fetch the metrics", response = Metrics.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 503, message = "Worker service is not 
running") })
+    public Response getMetrics() throws IOException {
+        return functions.getFunctionsMetrcis(clientAppId());
+    }
+}
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 67de21a..4dda58f 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -231,8 +231,8 @@ public class KinesisSink implements Sink<byte[]> {
 
         @Override
         public void onFailure(Throwable exception) {
-            LOG.error("[{}] Failed to published message for replicator of 
{}-{} ", kinesisSink.streamName,
-                    resultContext.getPartitionId(), 
resultContext.getRecordSequence());
+            LOG.error("[{}] Failed to published message for replicator of 
{}-{}, {} ", kinesisSink.streamName,
+                    resultContext.getPartitionId(), 
resultContext.getRecordSequence(), exception.getMessage());
             kinesisSink.previousPublishFailed = TRUE;
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
diff --git a/site2/website/scripts/replace.js b/site2/website/scripts/replace.js
index 8366185..a46f0aa 100644
--- a/site2/website/scripts/replace.js
+++ b/site2/website/scripts/replace.js
@@ -122,5 +122,4 @@ for (v of versions) {
     dry: true
   };
   doReplace(opts);
-}
-
+}
\ No newline at end of file

Reply via email to