This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c0a57d0 Add authorization to function worker REST endpoints (#4628)
c0a57d0 is described below
commit c0a57d0eb618dce1d85590a9b360af0c08e4a94c
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Jun 29 11:02:19 2019 -0700
Add authorization to function worker REST endpoints (#4628)
---
.../org/apache/pulsar/broker/admin/v2/Worker.java | 8 +--
.../functions/worker/rest/api/ComponentImpl.java | 26 +++++-----
.../functions/worker/rest/api/FunctionsImpl.java | 6 +--
.../functions/worker/rest/api/SinksImpl.java | 6 +--
.../functions/worker/rest/api/SourcesImpl.java | 6 +--
.../functions/worker/rest/api/WorkerImpl.java | 60 ++++++++++++----------
.../worker/rest/api/v2/WorkerApiV2Resource.java | 8 +--
7 files changed, 64 insertions(+), 56 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 8307b1a..332a43f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -66,7 +66,7 @@ public class Worker extends AdminResource implements
Supplier<WorkerService> {
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getCluster() {
- return worker.getCluster();
+ return worker.getCluster(clientAppId());
}
@GET
@@ -81,7 +81,7 @@ public class Worker extends AdminResource implements
Supplier<WorkerService> {
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public WorkerInfo getClusterLeader() {
- return worker.getClusterLeader();
+ return worker.getClusterLeader(clientAppId());
}
@GET
@@ -96,7 +96,7 @@ public class Worker extends AdminResource implements
Supplier<WorkerService> {
@Path("/assignments")
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Collection<String>> getAssignments() {
- return worker.getAssignments();
+ return worker.getAssignments(clientAppId());
}
@GET
@@ -112,6 +112,6 @@ public class Worker extends AdminResource implements
Supplier<WorkerService> {
@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
public List<ConnectorDefinition> getConnectorsList() throws IOException {
- return worker.getListOfConnectors();
+ return worker.getListOfConnectors(clientAppId());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 4b9a655..3b26c31 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -349,7 +349,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
deregister {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to
deregister {}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -424,7 +424,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
get {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to get {}",
tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -490,7 +490,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
start/stop {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to
start/stop {}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -548,7 +548,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
restart {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to restart
{}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -618,7 +618,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
start/stop {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to
start/stop {}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -673,7 +673,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
restart {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to restart
{}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -725,7 +725,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
get stats for {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to get stats
for {}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -781,7 +781,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
get stats for {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to get stats
for {}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -842,7 +842,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{} Client [{}] is not admin and authorized to
list {}", tenant, namespace, clientRole,
ComponentTypeUtils.toString(componentType));
+ log.error("{}/{} Client [{}] is not authorized to list {}",
tenant, namespace, clientRole, ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
} catch (PulsarAdminException e) {
@@ -914,7 +914,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
trigger {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to trigger
{}", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -1029,7 +1029,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
get state for {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to get state
for {}", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -1115,7 +1115,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
put state for {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to put state
for {}", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -1380,7 +1380,7 @@ public abstract class ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized
get status for {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized get status
for {}", tenant, namespace,
componentName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3cfb3ae..af23496 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -95,7 +95,7 @@ public class FunctionsImpl extends ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
register {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to register
{}", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
}
@@ -119,7 +119,7 @@ public class FunctionsImpl extends ComponentImpl {
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
operate {} on tenant", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to operate {} on
tenant", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is
not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
@@ -259,7 +259,7 @@ public class FunctionsImpl extends ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
update {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to update
{}", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index ea6f33c..9052266 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -97,7 +97,7 @@ public class SinksImpl extends ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
register {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to register
{}", tenant, namespace,
sinkName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
}
@@ -121,7 +121,7 @@ public class SinksImpl extends ComponentImpl {
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
operate {} on tenant", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to operate {} on
tenant", tenant, namespace,
sinkName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is
not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
@@ -261,7 +261,7 @@ public class SinksImpl extends ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
update {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to update
{}", tenant, namespace,
sinkName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 71ae375..e14f167 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -97,7 +97,7 @@ public class SourcesImpl extends ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
register {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to register
{}", tenant, namespace,
sourceName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
}
@@ -121,7 +121,7 @@ public class SourcesImpl extends ComponentImpl {
}
}
} catch (PulsarAdminException.NotAuthorizedException e) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
operate {} on tenant", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to operate {} on
tenant", tenant, namespace,
sourceName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client is
not authorize to perform operation");
} catch (PulsarAdminException.NotFoundException e) {
@@ -261,7 +261,7 @@ public class SourcesImpl extends ComponentImpl {
try {
if (!isAuthorizedRole(tenant, namespace, clientRole,
clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to
update {}", tenant, namespace,
+ log.error("{}/{}/{} Client [{}] is not authorized to update
{}", tenant, namespace,
sourceName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index d27eaef..801d32e 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.proto.Function;
@@ -29,13 +28,10 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
-import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.util.ArrayList;
@@ -77,18 +73,29 @@ public class WorkerImpl {
return true;
}
- public List<WorkerInfo> getCluster() {
+ public List<WorkerInfo> getCluster(String clientRole) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
+ }
+
List<WorkerInfo> workers =
worker().getMembershipManager().getCurrentMembership();
return workers;
}
- public WorkerInfo getClusterLeader() {
+ public WorkerInfo getClusterLeader(String clientRole) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ log.error("Client [{}] is not authorized to get cluster leader",
clientRole);
+ throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
+ }
+
MembershipManager membershipManager = worker().getMembershipManager();
WorkerInfo leader = membershipManager.getLeader();
@@ -99,11 +106,16 @@ public class WorkerImpl {
return leader;
}
- public Map<String, Collection<String>> getAssignments() {
+ public Map<String, Collection<String>> getAssignments(String clientRole) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ log.error("Client [{}] is not authorized to get cluster
assignments", clientRole);
+ throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
+ }
+
FunctionRuntimeManager functionRuntimeManager =
worker().getFunctionRuntimeManager();
Map<String, Map<String, Function.Assignment>> assignments =
functionRuntimeManager.getCurrentAssignments();
Map<String, Collection<String>> ret = new HashMap<>();
@@ -113,40 +125,32 @@ public class WorkerImpl {
return ret;
}
- public boolean isSuperUser(final String clientRole) {
+ private boolean isSuperUser(final String clientRole) {
return clientRole != null &&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
}
public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final
String clientRole) {
- if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
- log.error("Client [{}] is not admin and authorized to get
function-stats", clientRole);
- throw new
WebApplicationException(Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
- .entity(new ErrorData(clientRole + " is not authorize to
get metrics")).build());
- }
- return getWorkerMetrics();
- }
-
- private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
- return worker().getMetricsGenerator().generate();
- }
-
- public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String
clientRole) throws IOException {
if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
- log.error("Client [{}] is not admin and authorized to get
function-stats", clientRole);
+ log.error("Client [{}] is not authorized to get worker stats",
clientRole);
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
- return getFunctionsMetrics();
+ return worker().getMetricsGenerator().generate();
}
- private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws
IOException {
+ public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String
clientRole) throws IOException {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ log.error("Client [{}] is not authorized to get function stats",
clientRole);
+ throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
+ }
+
WorkerService workerService = worker();
Map<String, FunctionRuntimeInfo> functionRuntimes =
workerService.getFunctionRuntimeManager()
.getFunctionRuntimeInfos();
@@ -183,11 +187,15 @@ public class WorkerImpl {
return metricsList;
}
- public List<ConnectorDefinition> getListOfConnectors() {
+ public List<ConnectorDefinition> getListOfConnectors(String clientRole) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
+ }
+
return this.worker().getConnectorsManager().getConnectors();
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
index e2b8c03..68716c6 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java
@@ -90,7 +90,7 @@ public class WorkerApiV2Resource implements
Supplier<WorkerService> {
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getCluster() {
- return worker.getCluster();
+ return worker.getCluster(clientAppId());
}
@GET
@@ -105,7 +105,7 @@ public class WorkerApiV2Resource implements
Supplier<WorkerService> {
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public WorkerInfo getClusterLeader() {
- return worker.getClusterLeader();
+ return worker.getClusterLeader(clientAppId());
}
@GET
@@ -120,7 +120,7 @@ public class WorkerApiV2Resource implements
Supplier<WorkerService> {
@Path("/assignments")
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Collection<String>> getAssignments() {
- return worker.getAssignments();
+ return worker.getAssignments(clientAppId());
}
@ApiResponses(value = {
@@ -130,6 +130,6 @@ public class WorkerApiV2Resource implements
Supplier<WorkerService> {
})
@Path("/connectors")
public List<ConnectorDefinition> getConnectorsList() throws IOException {
- return worker.getListOfConnectors();
+ return worker.getListOfConnectors(clientAppId());
}
}