This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c0a57d0  Add authorization to function worker REST endpoints (#4628)
c0a57d0 is described below

commit c0a57d0eb618dce1d85590a9b360af0c08e4a94c
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Jun 29 11:02:19 2019 -0700

    Add authorization to function worker REST endpoints (#4628)
---
 .../org/apache/pulsar/broker/admin/v2/Worker.java  |  8 +--
 .../functions/worker/rest/api/ComponentImpl.java   | 26 +++++-----
 .../functions/worker/rest/api/FunctionsImpl.java   |  6 +--
 .../functions/worker/rest/api/SinksImpl.java       |  6 +--
 .../functions/worker/rest/api/SourcesImpl.java     |  6 +--
 .../functions/worker/rest/api/WorkerImpl.java      | 60 ++++++++++++----------
 .../worker/rest/api/v2/WorkerApiV2Resource.java    |  8 +--
 7 files changed, 64 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 8307b1a..332a43f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -66,7 +66,7 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
     @Path("/cluster")
     @Produces(MediaType.APPLICATION_JSON)
     public List<WorkerInfo> getCluster() {
-        return worker.getCluster();
+        return worker.getCluster(clientAppId());
     }
 
     @GET
@@ -81,7 +81,7 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
     @Path("/cluster/leader")
     @Produces(MediaType.APPLICATION_JSON)
     public WorkerInfo getClusterLeader() {
-        return worker.getClusterLeader();
+        return worker.getClusterLeader(clientAppId());
     }
 
     @GET
@@ -96,7 +96,7 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
     @Path("/assignments")
     @Produces(MediaType.APPLICATION_JSON)
     public Map<String, Collection<String>> getAssignments() {
-        return worker.getAssignments();
+        return worker.getAssignments(clientAppId());
     }
 
     @GET
@@ -112,6 +112,6 @@ public class Worker extends AdminResource implements Supplier<WorkerService> {
     @Path("/connectors")
     @Produces(MediaType.APPLICATION_JSON)
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
-        return worker.getListOfConnectors();
+        return worker.getListOfConnectors(clientAppId());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 4b9a655..3b26c31 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -349,7 +349,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
deregister {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to 
deregister {}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -424,7 +424,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
get {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to get {}", 
tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -490,7 +490,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
start/stop {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to 
start/stop {}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -548,7 +548,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
restart {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to restart 
{}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -618,7 +618,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
start/stop {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to 
start/stop {}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -673,7 +673,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
restart {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to restart 
{}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -725,7 +725,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
get stats for {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to get stats 
for {}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -781,7 +781,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
get stats for {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to get stats 
for {}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -842,7 +842,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{} Client [{}] is not admin and authorized to 
list {}", tenant, namespace, clientRole, 
ComponentTypeUtils.toString(componentType));
+                log.error("{}/{} Client [{}] is not authorized to list {}", 
tenant, namespace, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -914,7 +914,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
trigger {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to trigger 
{}", tenant, namespace,
                         functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -1029,7 +1029,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
get state for {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to get state 
for {}", tenant, namespace,
                         functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -1115,7 +1115,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
put state for {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to put state 
for {}", tenant, namespace,
                         functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
@@ -1380,7 +1380,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized 
get status for {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized get status 
for {}", tenant, namespace,
                         componentName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
             }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3cfb3ae..af23496 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -95,7 +95,7 @@ public class FunctionsImpl extends ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to register 
{}", tenant, namespace,
                         functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
             }
@@ -119,7 +119,7 @@ public class FunctionsImpl extends ComponentImpl {
                 }
             }
         } catch (PulsarAdminException.NotAuthorizedException e) {
-            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+            log.error("{}/{}/{} Client [{}] is not authorized to operate {} on 
tenant", tenant, namespace,
                     functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
             throw new RestException(Response.Status.UNAUTHORIZED, "client is 
not authorize to perform operation");
         } catch (PulsarAdminException.NotFoundException e) {
@@ -259,7 +259,7 @@ public class FunctionsImpl extends ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to update 
{}", tenant, namespace,
                         functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index ea6f33c..9052266 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -97,7 +97,7 @@ public class SinksImpl extends ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to register 
{}", tenant, namespace,
                         sinkName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
             }
@@ -121,7 +121,7 @@ public class SinksImpl extends ComponentImpl {
                 }
             }
         } catch (PulsarAdminException.NotAuthorizedException e) {
-            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+            log.error("{}/{}/{} Client [{}] is not authorized to operate {} on 
tenant", tenant, namespace,
                     sinkName, clientRole, 
ComponentTypeUtils.toString(componentType));
             throw new RestException(Response.Status.UNAUTHORIZED, "client is 
not authorize to perform operation");
         } catch (PulsarAdminException.NotFoundException e) {
@@ -261,7 +261,7 @@ public class SinksImpl extends ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to update 
{}", tenant, namespace,
                         sinkName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 71ae375..e14f167 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -97,7 +97,7 @@ public class SourcesImpl extends ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to register 
{}", tenant, namespace,
                         sourceName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
             }
@@ -121,7 +121,7 @@ public class SourcesImpl extends ComponentImpl {
                 }
             }
         } catch (PulsarAdminException.NotAuthorizedException e) {
-            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+            log.error("{}/{}/{} Client [{}] is not authorized to operate {} on 
tenant", tenant, namespace,
                     sourceName, clientRole, 
ComponentTypeUtils.toString(componentType));
             throw new RestException(Response.Status.UNAUTHORIZED, "client is 
not authorize to perform operation");
         } catch (PulsarAdminException.NotFoundException e) {
@@ -261,7 +261,7 @@ public class SourcesImpl extends ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, 
clientAuthenticationDataHttps)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to 
update {}", tenant, namespace,
+                log.error("{}/{}/{} Client [{}] is not authorized to update 
{}", tenant, namespace,
                         sourceName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index d27eaef..801d32e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker.rest.api;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 import org.apache.pulsar.functions.proto.Function;
@@ -29,13 +28,10 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
-import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -77,18 +73,29 @@ public class WorkerImpl {
         return true;
     }
 
-    public List<WorkerInfo> getCluster() {
+    public List<WorkerInfo> getCluster(String clientRole) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
+
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
+        }
+
         List<WorkerInfo> workers = 
worker().getMembershipManager().getCurrentMembership();
         return workers;
     }
 
-    public WorkerInfo getClusterLeader() {
+    public WorkerInfo getClusterLeader(String clientRole) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
+
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not authorized to get cluster leader", 
clientRole);
+            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
+        }
+
         MembershipManager membershipManager = worker().getMembershipManager();
         WorkerInfo leader = membershipManager.getLeader();
 
@@ -99,11 +106,16 @@ public class WorkerImpl {
         return leader;
     }
 
-    public Map<String, Collection<String>> getAssignments() {
+    public Map<String, Collection<String>> getAssignments(String clientRole) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not authorized to get cluster 
assignments", clientRole);
+            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
+        }
+
         FunctionRuntimeManager functionRuntimeManager = 
worker().getFunctionRuntimeManager();
         Map<String, Map<String, Function.Assignment>> assignments = 
functionRuntimeManager.getCurrentAssignments();
         Map<String, Collection<String>> ret = new HashMap<>();
@@ -113,40 +125,32 @@ public class WorkerImpl {
         return ret;
     }
 
-    public boolean isSuperUser(final String clientRole) {
+    private boolean isSuperUser(final String clientRole) {
         return clientRole != null && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
     public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final 
String clientRole) {
-        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
-            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
-            throw new 
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(clientRole + " is not authorize to 
get metrics")).build());
-        }
-        return getWorkerMetrics();
-    }
-
-    private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
-        return worker().getMetricsGenerator().generate();
-    }
-
-    public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String 
clientRole) throws IOException {
 
         if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
-            log.error("Client [{}] is not admin and authorized to get 
function-stats", clientRole);
+            log.error("Client [{}] is not authorized to get worker stats", 
clientRole);
             throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
         }
-        return getFunctionsMetrics();
+        return worker().getMetricsGenerator().generate();
     }
 
-    private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws 
IOException {
+    public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String 
clientRole) throws IOException {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            log.error("Client [{}] is not authorized to get function stats", 
clientRole);
+            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
+        }
+
         WorkerService workerService = worker();
         Map<String, FunctionRuntimeInfo> functionRuntimes = 
workerService.getFunctionRuntimeManager()
                 .getFunctionRuntimeInfos();
@@ -183,11 +187,15 @@ public class WorkerImpl {
         return metricsList;
     }
 
-    public List<ConnectorDefinition> getListOfConnectors() {
+    public List<ConnectorDefinition> getListOfConnectors(String clientRole) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+            throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
+        }
+
         return this.worker().getConnectorsManager().getConnectors();
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
index e2b8c03..68716c6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -90,7 +90,7 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
     @Path("/cluster")
     @Produces(MediaType.APPLICATION_JSON)
     public List<WorkerInfo> getCluster() {
-        return worker.getCluster();
+        return worker.getCluster(clientAppId());
     }
 
     @GET
@@ -105,7 +105,7 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
     @Path("/cluster/leader")
     @Produces(MediaType.APPLICATION_JSON)
     public WorkerInfo getClusterLeader() {
-        return worker.getClusterLeader();
+        return worker.getClusterLeader(clientAppId());
     }
 
     @GET
@@ -120,7 +120,7 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
     @Path("/assignments")
     @Produces(MediaType.APPLICATION_JSON)
     public Map<String, Collection<String>> getAssignments() {
-        return worker.getAssignments();
+        return worker.getAssignments(clientAppId());
     }
 
     @ApiResponses(value = {
@@ -130,6 +130,6 @@ public class WorkerApiV2Resource implements Supplier<WorkerService> {
     })
     @Path("/connectors")
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
-        return worker.getListOfConnectors();
+        return worker.getListOfConnectors(clientAppId());
     }
 }

Reply via email to