This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 76c93a3 Fixes function api can not use authdata to check superuser
(#10364)
76c93a3 is described below
commit 76c93a3865db84287d0efca851e6ea27d148e4d0
Author: Yong Zhang <[email protected]>
AuthorDate: Tue May 4 10:32:39 2021 +0800
Fixes function api can not use authdata to check superuser (#10364)
---
Fixes #10332
*Motivation*
Sometimes, the superuser is not only needed the client
role but also needs some data in the authentication data
to check whether the role is the superuser.
The changed interface is introduced from
https://github.com/apache/pulsar/pull/8560,
it's safe to change it since there is no release for that.
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 4 +--
.../apache/pulsar/broker/admin/impl/SinksBase.java | 2 +-
.../pulsar/broker/admin/impl/SourcesBase.java | 2 +-
.../functions/worker/rest/api/ComponentImpl.java | 17 ++++++------
.../functions/worker/rest/api/FunctionsImpl.java | 5 ++--
.../functions/worker/rest/api/FunctionsImplV2.java | 4 +--
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 4 +--
.../worker/rest/api/v3/SinksApiV3Resource.java | 2 +-
.../worker/rest/api/v3/SourcesApiV3Resource.java | 2 +-
.../functions/worker/service/api/Component.java | 5 ++--
.../functions/worker/service/api/Functions.java | 3 +-
.../worker/rest/api/FunctionsImplTest.java | 32 +++++++++++++++++-----
12 files changed, 52 insertions(+), 30 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 9c1575f..6eb1301 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -667,7 +667,7 @@ public class FunctionsBase extends AdminResource {
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("path") String path) {
- functions().uploadFunction(uploadedInputStream, path, clientAppId());
+ functions().uploadFunction(uploadedInputStream, path, clientAppId(),
clientAuthData());
}
@GET
@@ -735,6 +735,6 @@ public class FunctionsBase extends AdminResource {
final @FormDataParam("delete")
boolean delete) {
functions().updateFunctionOnWorkerLeader(tenant, namespace,
functionName, uploadedInputStream,
- delete, uri.getRequestUri(), clientAppId());
+ delete, uri.getRequestUri(), clientAppId(), clientAuthData());
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index d79eeee..5d57723 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -554,6 +554,6 @@ public class SinksBase extends AdminResource {
})
@Path("/reloadBuiltInSinks")
public void reloadSinks() {
- sinks().reloadConnectors(clientAppId());
+ sinks().reloadConnectors(clientAppId(), clientAuthData());
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 1d0f555..2d951b0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -506,6 +506,6 @@ public class SourcesBase extends AdminResource {
})
@Path("/reloadBuiltInSources")
public void reloadSources() {
- sources().reloadConnectors(clientAppId());
+ sources().reloadConnectors(clientAppId(), clientAuthData());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index aff7d60..1c442a4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -901,13 +901,13 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
}
@Override
- public void reloadConnectors(String clientRole) {
+ public void reloadConnectors(String clientRole, AuthenticationDataSource
authenticationData) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// Only superuser has permission to do this operation.
- if (!isSuperUser(clientRole)) {
+ if (!isSuperUser(clientRole, authenticationData)) {
throw new RestException(Status.UNAUTHORIZED, "This operation
requires super-user access");
}
}
@@ -1205,13 +1205,14 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
}
@Override
- public void uploadFunction(final InputStream uploadedInputStream, final
String path, String clientRole) {
+ public void uploadFunction(final InputStream uploadedInputStream, final
String path, String clientRole,
+ AuthenticationDataSource authenticationData) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
- if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole)) {
+ if (worker().getWorkerConfig().isAuthorizationEnabled() &&
!isSuperUser(clientRole, authenticationData)) {
throw new RestException(Status.UNAUTHORIZED, "client is not
authorize to perform operation");
}
@@ -1309,7 +1310,7 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
}
} else {
- if (!isSuperUser(clientRole)) {
+ if (!isSuperUser(clientRole, clientAuthenticationDataHttps)) {
throw new RestException(Status.UNAUTHORIZED, "client is
not authorize to perform operation");
}
}
@@ -1441,7 +1442,7 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
AuthenticationDataSource
authenticationData) throws PulsarAdminException {
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// skip authorization if client role is super-user
- if (isSuperUser(clientRole)) {
+ if (isSuperUser(clientRole, authenticationData)) {
return true;
}
@@ -1524,14 +1525,14 @@ public abstract class ComponentImpl implements
Component<PulsarWorkerService> {
}
}
- public boolean isSuperUser(String clientRole) {
+ public boolean isSuperUser(String clientRole, AuthenticationDataSource
authenticationData) {
if (clientRole != null) {
try {
if ((worker().getWorkerConfig().getSuperUserRoles() != null
&&
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) {
return true;
}
- return
worker().getAuthorizationService().isSuperUser(clientRole, null)
+ return
worker().getAuthorizationService().isSuperUser(clientRole, authenticationData)
.get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking the role {} is a
super user role ",
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 5d2f8ee..8236052 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -670,14 +670,15 @@ public class FunctionsImpl extends ComponentImpl
implements Functions<PulsarWork
final InputStream uploadedInputStream,
final boolean delete,
URI uri,
- final String clientRole) {
+ final String clientRole,
+ AuthenticationDataSource authenticationData) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
- if (!isSuperUser(clientRole)) {
+ if (!isSuperUser(clientRole, authenticationData)) {
log.error("{}/{}/{} Client [{}] is not superuser to update on
worker leader {}", tenant, namespace,
functionName, clientRole,
ComponentTypeUtils.toString(componentType));
throw new RestException(Response.Status.UNAUTHORIZED, "client
is not authorize to perform operation");
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 9ed6afb..26479ba 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -195,7 +195,7 @@ public class FunctionsImplV2 implements
FunctionsV2<PulsarWorkerService> {
@Override
public Response uploadFunction(InputStream uploadedInputStream, String
path, String clientRole) {
- delegate.uploadFunction(uploadedInputStream, path, clientRole);
+ delegate.uploadFunction(uploadedInputStream, path, clientRole, null);
return Response.ok().build();
}
@@ -248,4 +248,4 @@ public class FunctionsImplV2 implements
FunctionsV2<PulsarWorkerService> {
return functionStatus;
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 6d233a8..6d037c9 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -312,7 +312,7 @@ public class FunctionsApiV3Resource extends
FunctionApiResource {
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(final @FormDataParam("data") InputStream
uploadedInputStream,
final @FormDataParam("path") String path) {
- functions().uploadFunction(uploadedInputStream, path, clientAppId());
+ functions().uploadFunction(uploadedInputStream, path, clientAppId(),
clientAuthData());
}
@GET
@@ -386,6 +386,6 @@ public class FunctionsApiV3Resource extends
FunctionApiResource {
final
@FormDataParam("delete") boolean delete) {
functions().updateFunctionOnWorkerLeader(tenant, namespace,
functionName, uploadedInputStream,
- delete, uri.getRequestUri(), clientAppId());
+ delete, uri.getRequestUri(), clientAppId(), clientAuthData());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index af56d24..1b3811e 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -274,6 +274,6 @@ public class SinksApiV3Resource extends FunctionApiResource
{
})
@Path("/reloadBuiltInSinks")
public void reloadSinks() {
- sinks().reloadConnectors(clientAppId());
+ sinks().reloadConnectors(clientAppId(), clientAuthData());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index db8eb24..29fed9a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -290,6 +290,6 @@ public class SourcesApiV3Resource extends
FunctionApiResource {
})
@Path("/reloadBuiltInSources")
public void reloadSources() {
- sources().reloadConnectors(clientAppId());
+ sources().reloadConnectors(clientAppId(), clientAuthData());
}
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
index 6139a5c..94d2a49 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
@@ -139,7 +139,8 @@ public interface Component<W extends WorkerService> {
void uploadFunction(final InputStream uploadedInputStream,
final String path,
- String clientRole);
+ String clientRole,
+ final AuthenticationDataSource
clientAuthenticationDataHttps);
StreamingOutput downloadFunction(String path,
String clientRole,
@@ -154,5 +155,5 @@ public interface Component<W extends WorkerService> {
List<ConnectorDefinition> getListOfConnectors();
- void reloadConnectors(String clientRole);
+ void reloadConnectors(String clientRole, AuthenticationDataSource
clientAuthenticationDataHttps);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
index 9f68439..aa874ab 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
@@ -61,7 +61,8 @@ public interface Functions<W extends WorkerService> extends
Component<W> {
final InputStream uploadedInputStream,
final boolean delete,
URI uri,
- final String clientRole);
+ final String clientRole,
+ final AuthenticationDataSource
clientAuthenticationDataHttps);
FunctionStatus getFunctionStatus(final String tenant,
final String namespace,
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 03c1a14..76586b7 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.rest.api;
import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttp;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.client.admin.Namespaces;
@@ -55,6 +56,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import javax.servlet.http.HttpServletRequest;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
@@ -261,9 +263,10 @@ public class FunctionsImplTest {
// test pulsar super user
final String pulsarSuperUser = "pulsarSuperUser";
- when(authorizationService.isSuperUser(pulsarSuperUser,
null)).thenReturn(CompletableFuture.completedFuture(true));
+ when(authorizationService.isSuperUser(eq(pulsarSuperUser), any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns",
pulsarSuperUser, authenticationDataSource));
- assertTrue(functionImpl.isSuperUser(pulsarSuperUser));
+ assertTrue(functionImpl.isSuperUser(pulsarSuperUser, null));
// test normal user
functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
@@ -274,7 +277,7 @@ public class FunctionsImplTest {
when(admin.tenants()).thenReturn(tenants);
when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
when(authorizationService.isTenantAdmin("test-tenant", "test-user",
tenantInfo,
authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
- when(authorizationService.isSuperUser("test-user",
null)).thenReturn(CompletableFuture.completedFuture(false));
+ when(authorizationService.isSuperUser(eq("test-user"),
any())).thenReturn(CompletableFuture.completedFuture(false));
assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns",
"test-user", authenticationDataSource));
// if user is tenant admin
@@ -333,10 +336,10 @@ public class FunctionsImplTest {
});
AuthenticationDataSource authenticationDataSource =
mock(AuthenticationDataSource.class);
- assertTrue(functionImpl.isSuperUser(superUser));
+ assertTrue(functionImpl.isSuperUser(superUser, null));
- assertFalse(functionImpl.isSuperUser("normal-user"));
- assertFalse(functionImpl.isSuperUser( null));
+ assertFalse(functionImpl.isSuperUser("normal-user", null));
+ assertFalse(functionImpl.isSuperUser( null, null));
// test super roles is null and it's not a pulsar super user
when(authorizationService.isSuperUser(superUser, null))
@@ -345,7 +348,22 @@ public class FunctionsImplTest {
workerConfig = new WorkerConfig();
workerConfig.setAuthorizationEnabled(true);
doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
- assertFalse(functionImpl.isSuperUser(superUser));
+ assertFalse(functionImpl.isSuperUser(superUser, null));
+
+ // test super role is null but the auth datasource contains superuser
+ when(authorizationService.isSuperUser(anyString(),
any(AuthenticationDataSource.class)))
+ .thenAnswer((invocationOnMock -> {
+ AuthenticationDataSource authData =
invocationOnMock.getArgument(1, AuthenticationDataSource.class);
+ String user = authData.getHttpHeader("mockedUser");
+ return
CompletableFuture.completedFuture(superUser.equals(user));
+ }));
+ AuthenticationDataSource authData =
mock(AuthenticationDataSource.class);
+ when(authData.getHttpHeader("mockedUser")).thenReturn(superUser);
+ assertTrue(functionImpl.isSuperUser("non-superuser", authData));
+
+ AuthenticationDataSource nonSuperuserAuthData =
mock(AuthenticationDataSource.class);
+
when(nonSuperuserAuthData.getHttpHeader("mockedUser")).thenReturn("non-superuser");
+ assertFalse(functionImpl.isSuperUser("non-superuser",
nonSuperuserAuthData));
}
public static FunctionConfig createDefaultFunctionConfig() {