This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 76c93a3  Fixes function api can not use authdata to check superuser 
(#10364)
76c93a3 is described below

commit 76c93a3865db84287d0efca851e6ea27d148e4d0
Author: Yong Zhang <[email protected]>
AuthorDate: Tue May 4 10:32:39 2021 +0800

    Fixes function api can not use authdata to check superuser (#10364)
    
    ---
    
    Fixes #10332
    
    *Motivation*
    
    Sometimes, the superuser is not only needed the client
    role but also needs some data in the authentication data
    to check whether the role is the superuser.
    
    The changed interface is introduced from 
https://github.com/apache/pulsar/pull/8560,
    it's safe to change it since there is no release for that.
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  4 +--
 .../apache/pulsar/broker/admin/impl/SinksBase.java |  2 +-
 .../pulsar/broker/admin/impl/SourcesBase.java      |  2 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 17 ++++++------
 .../functions/worker/rest/api/FunctionsImpl.java   |  5 ++--
 .../functions/worker/rest/api/FunctionsImplV2.java |  4 +--
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |  4 +--
 .../worker/rest/api/v3/SinksApiV3Resource.java     |  2 +-
 .../worker/rest/api/v3/SourcesApiV3Resource.java   |  2 +-
 .../functions/worker/service/api/Component.java    |  5 ++--
 .../functions/worker/service/api/Functions.java    |  3 +-
 .../worker/rest/api/FunctionsImplTest.java         | 32 +++++++++++++++++-----
 12 files changed, 52 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 9c1575f..6eb1301 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -667,7 +667,7 @@ public class FunctionsBase extends AdminResource {
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public void uploadFunction(final @FormDataParam("data") InputStream 
uploadedInputStream,
                                final @FormDataParam("path") String path) {
-        functions().uploadFunction(uploadedInputStream, path, clientAppId());
+        functions().uploadFunction(uploadedInputStream, path, clientAppId(), 
clientAuthData());
     }
 
     @GET
@@ -735,6 +735,6 @@ public class FunctionsBase extends AdminResource {
                                              final @FormDataParam("delete") 
boolean delete) {
 
         functions().updateFunctionOnWorkerLeader(tenant, namespace, 
functionName, uploadedInputStream,
-                delete, uri.getRequestUri(), clientAppId());
+                delete, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index d79eeee..5d57723 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -554,6 +554,6 @@ public class SinksBase extends AdminResource {
     })
     @Path("/reloadBuiltInSinks")
     public void reloadSinks() {
-        sinks().reloadConnectors(clientAppId());
+        sinks().reloadConnectors(clientAppId(), clientAuthData());
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 1d0f555..2d951b0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -506,6 +506,6 @@ public class SourcesBase extends AdminResource {
     })
     @Path("/reloadBuiltInSources")
     public void reloadSources() {
-        sources().reloadConnectors(clientAppId());
+        sources().reloadConnectors(clientAppId(), clientAuthData());
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index aff7d60..1c442a4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -901,13 +901,13 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
     }
 
     @Override
-    public void reloadConnectors(String clientRole) {
+    public void reloadConnectors(String clientRole, AuthenticationDataSource 
authenticationData) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
         if (worker().getWorkerConfig().isAuthorizationEnabled()) {
             // Only superuser has permission to do this operation.
-            if (!isSuperUser(clientRole)) {
+            if (!isSuperUser(clientRole, authenticationData)) {
                 throw new RestException(Status.UNAUTHORIZED, "This operation 
requires super-user access");
             }
         }
@@ -1205,13 +1205,14 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
     }
 
     @Override
-    public void uploadFunction(final InputStream uploadedInputStream, final 
String path, String clientRole) {
+    public void uploadFunction(final InputStream uploadedInputStream, final 
String path, String clientRole,
+                               AuthenticationDataSource authenticationData) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
-        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole)) {
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && 
!isSuperUser(clientRole, authenticationData)) {
             throw new RestException(Status.UNAUTHORIZED, "client is not 
authorize to perform operation");
         }
 
@@ -1309,7 +1310,7 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
                     throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
                 }
             } else {
-                if (!isSuperUser(clientRole)) {
+                if (!isSuperUser(clientRole, clientAuthenticationDataHttps)) {
                     throw new RestException(Status.UNAUTHORIZED, "client is 
not authorize to perform operation");
                 }
             }
@@ -1441,7 +1442,7 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
                                     AuthenticationDataSource 
authenticationData) throws PulsarAdminException {
         if (worker().getWorkerConfig().isAuthorizationEnabled()) {
             // skip authorization if client role is super-user
-            if (isSuperUser(clientRole)) {
+            if (isSuperUser(clientRole, authenticationData)) {
                 return true;
             }
 
@@ -1524,14 +1525,14 @@ public abstract class ComponentImpl implements 
Component<PulsarWorkerService> {
         }
     }
 
-    public boolean isSuperUser(String clientRole) {
+    public boolean isSuperUser(String clientRole, AuthenticationDataSource 
authenticationData) {
         if (clientRole != null) {
             try {
                 if ((worker().getWorkerConfig().getSuperUserRoles() != null
                     && 
worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) {
                     return true;
                 }
-                return 
worker().getAuthorizationService().isSuperUser(clientRole, null)
+                return 
worker().getAuthorizationService().isSuperUser(clientRole, authenticationData)
                     
.get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
             } catch (InterruptedException e) {
                 log.warn("Time-out {} sec while checking the role {} is a 
super user role ",
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 5d2f8ee..8236052 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -670,14 +670,15 @@ public class FunctionsImpl extends ComponentImpl 
implements Functions<PulsarWork
                                final InputStream uploadedInputStream,
                                final boolean delete,
                                URI uri,
-                               final String clientRole) {
+                               final String clientRole,
+                               AuthenticationDataSource authenticationData) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
         if (worker().getWorkerConfig().isAuthorizationEnabled()) {
-            if (!isSuperUser(clientRole)) {
+            if (!isSuperUser(clientRole, authenticationData)) {
                 log.error("{}/{}/{} Client [{}] is not superuser to update on 
worker leader {}", tenant, namespace,
                         functionName, clientRole, 
ComponentTypeUtils.toString(componentType));
                 throw new RestException(Response.Status.UNAUTHORIZED, "client 
is not authorize to perform operation");
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 9ed6afb..26479ba 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -195,7 +195,7 @@ public class FunctionsImplV2 implements 
FunctionsV2<PulsarWorkerService> {
 
     @Override
     public Response uploadFunction(InputStream uploadedInputStream, String 
path, String clientRole) {
-        delegate.uploadFunction(uploadedInputStream, path, clientRole);
+        delegate.uploadFunction(uploadedInputStream, path, clientRole, null);
         return Response.ok().build();
     }
 
@@ -248,4 +248,4 @@ public class FunctionsImplV2 implements 
FunctionsV2<PulsarWorkerService> {
 
         return functionStatus;
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 6d233a8..6d037c9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -312,7 +312,7 @@ public class FunctionsApiV3Resource extends 
FunctionApiResource {
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public void uploadFunction(final @FormDataParam("data") InputStream 
uploadedInputStream,
                                final @FormDataParam("path") String path) {
-        functions().uploadFunction(uploadedInputStream, path, clientAppId());
+        functions().uploadFunction(uploadedInputStream, path, clientAppId(), 
clientAuthData());
     }
 
     @GET
@@ -386,6 +386,6 @@ public class FunctionsApiV3Resource extends 
FunctionApiResource {
                                                  final 
@FormDataParam("delete") boolean delete) {
 
         functions().updateFunctionOnWorkerLeader(tenant, namespace, 
functionName, uploadedInputStream,
-                delete, uri.getRequestUri(), clientAppId());
+                delete, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index af56d24..1b3811e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -274,6 +274,6 @@ public class SinksApiV3Resource extends FunctionApiResource 
{
     })
     @Path("/reloadBuiltInSinks")
     public void reloadSinks() {
-        sinks().reloadConnectors(clientAppId());
+        sinks().reloadConnectors(clientAppId(), clientAuthData());
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index db8eb24..29fed9a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -290,6 +290,6 @@ public class SourcesApiV3Resource extends 
FunctionApiResource {
     })
     @Path("/reloadBuiltInSources")
     public void reloadSources() {
-        sources().reloadConnectors(clientAppId());
+        sources().reloadConnectors(clientAppId(), clientAuthData());
     }
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
index 6139a5c..94d2a49 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
@@ -139,7 +139,8 @@ public interface Component<W extends WorkerService> {
 
     void uploadFunction(final InputStream uploadedInputStream,
                         final String path,
-                        String clientRole);
+                        String clientRole,
+                        final AuthenticationDataSource 
clientAuthenticationDataHttps);
 
     StreamingOutput downloadFunction(String path,
                                      String clientRole,
@@ -154,5 +155,5 @@ public interface Component<W extends WorkerService> {
     List<ConnectorDefinition> getListOfConnectors();
 
 
-    void reloadConnectors(String clientRole);
+    void reloadConnectors(String clientRole, AuthenticationDataSource 
clientAuthenticationDataHttps);
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
index 9f68439..aa874ab 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
@@ -61,7 +61,8 @@ public interface Functions<W extends WorkerService> extends 
Component<W> {
                                       final InputStream uploadedInputStream,
                                       final boolean delete,
                                       URI uri,
-                                      final String clientRole);
+                                      final String clientRole,
+                                      final AuthenticationDataSource 
clientAuthenticationDataHttps);
 
     FunctionStatus getFunctionStatus(final String tenant,
                                      final String namespace,
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 03c1a14..76586b7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttp;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.Namespaces;
@@ -55,6 +56,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import javax.servlet.http.HttpServletRequest;
 import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashMap;
@@ -261,9 +263,10 @@ public class FunctionsImplTest {
 
         // test pulsar super user
         final String pulsarSuperUser = "pulsarSuperUser";
-        when(authorizationService.isSuperUser(pulsarSuperUser, 
null)).thenReturn(CompletableFuture.completedFuture(true));
+        when(authorizationService.isSuperUser(eq(pulsarSuperUser), any()))
+            .thenReturn(CompletableFuture.completedFuture(true));
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", 
pulsarSuperUser, authenticationDataSource));
-        assertTrue(functionImpl.isSuperUser(pulsarSuperUser));
+        assertTrue(functionImpl.isSuperUser(pulsarSuperUser, null));
 
         // test normal user
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
@@ -274,7 +277,7 @@ public class FunctionsImplTest {
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
         when(authorizationService.isTenantAdmin("test-tenant", "test-user", 
tenantInfo, 
authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
-        when(authorizationService.isSuperUser("test-user", 
null)).thenReturn(CompletableFuture.completedFuture(false));
+        when(authorizationService.isSuperUser(eq("test-user"), 
any())).thenReturn(CompletableFuture.completedFuture(false));
         assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", 
"test-user", authenticationDataSource));
 
         // if user is tenant admin
@@ -333,10 +336,10 @@ public class FunctionsImplTest {
             });
 
         AuthenticationDataSource authenticationDataSource = 
mock(AuthenticationDataSource.class);
-        assertTrue(functionImpl.isSuperUser(superUser));
+        assertTrue(functionImpl.isSuperUser(superUser, null));
 
-        assertFalse(functionImpl.isSuperUser("normal-user"));
-        assertFalse(functionImpl.isSuperUser( null));
+        assertFalse(functionImpl.isSuperUser("normal-user", null));
+        assertFalse(functionImpl.isSuperUser( null, null));
 
         // test super roles is null and it's not a pulsar super user
         when(authorizationService.isSuperUser(superUser, null))
@@ -345,7 +348,22 @@ public class FunctionsImplTest {
         workerConfig = new WorkerConfig();
         workerConfig.setAuthorizationEnabled(true);
         doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
-        assertFalse(functionImpl.isSuperUser(superUser));
+        assertFalse(functionImpl.isSuperUser(superUser, null));
+
+        // test super role is null but the auth datasource contains superuser
+        when(authorizationService.isSuperUser(anyString(), 
any(AuthenticationDataSource.class)))
+            .thenAnswer((invocationOnMock -> {
+                AuthenticationDataSource authData = 
invocationOnMock.getArgument(1, AuthenticationDataSource.class);
+                String user = authData.getHttpHeader("mockedUser");
+                return 
CompletableFuture.completedFuture(superUser.equals(user));
+            }));
+        AuthenticationDataSource authData = 
mock(AuthenticationDataSource.class);
+        when(authData.getHttpHeader("mockedUser")).thenReturn(superUser);
+        assertTrue(functionImpl.isSuperUser("non-superuser", authData));
+
+        AuthenticationDataSource nonSuperuserAuthData = 
mock(AuthenticationDataSource.class);
+        
when(nonSuperuserAuthData.getHttpHeader("mockedUser")).thenReturn("non-superuser");
+        assertFalse(functionImpl.isSuperUser("non-superuser", 
nonSuperuserAuthData));
     }
 
     public static FunctionConfig createDefaultFunctionConfig() {

Reply via email to