This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c6fe7a1d0c6f6d44fd6da78cf6f508f44cb06587 Author: Yong Zhang <[email protected]> AuthorDate: Sat Jan 23 20:37:00 2021 +0800 [Functions] Fixes function worker get superuser role (#9259) * Fixes function worker get superuser role --- Fixes #7879 *Motivation* Function worker should use authorization service to check a role if a superuser. *Modifications* - Fix the isSuperuser method in the function *Verify this change* Please pick either of following options. - Adjust the original test case to verify it * Fix the tests (cherry picked from commit d3f8440e06691736e686a76127680779b9dabfa8) --- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 36 +++++++++++++++++----- .../functions/worker/rest/api/ComponentImpl.java | 23 ++++++++++++-- .../worker/rest/api/FunctionsImplTest.java | 23 +++++++++++--- 3 files changed, 68 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 4649b3d..316b10b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -22,12 +22,17 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -54,6 +59,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -67,6 +73,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; import org.apache.pulsar.client.admin.BrokerStats; @@ -1512,15 +1519,30 @@ public class PulsarFunctionE2ETest { propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use"))); admin.tenants().updateTenant(tenant, propAdmin); - String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader() + .getResource("pulsar-functions-api-examples.jar").getFile(); FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, "my.*", sinkTopic, subscriptionName); - try { - admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); - assertTrue(validRoleName); - } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { - assertFalse(validRoleName); + if (!validRoleName) { + // create a non-superuser admin to test the api + admin = spy( + PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddressTls()) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH) + .allowTlsInsecureConnection(true).build()); + try { + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { + assertFalse(validRoleName); + } + } else { + try { + admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl); + assertTrue(validRoleName); + } catch (org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException ne) { + fail(); + } } + } @Test(timeOut = 20000) @@ -1812,4 +1834,4 @@ public class PulsarFunctionE2ETest { double value; } -} \ No newline at end of file +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 56771f6..3bcea1b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -96,6 +96,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -1513,9 +1514,25 @@ public abstract class ComponentImpl { } public boolean isSuperUser(String clientRole) { - return clientRole != null - && worker().getWorkerConfig().getSuperUserRoles() != null - && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole); + if (clientRole != null) { + try { + if ((worker().getWorkerConfig().getSuperUserRoles() != null + && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole))) { + return true; + } + return worker().getAuthorizationService().isSuperUser(clientRole, null) + .get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS); + } catch (InterruptedException e) { + log.warn("Time-out {} sec while checking the role {} is a super user role ", + worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), clientRole); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } catch (Exception e) { + log.warn("Admin-client with Role - failed to check the role {} is a super user role {} ", clientRole, + e.getMessage(), e); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + return false; } public boolean allowFunctionOps(NamespaceName namespaceName, String role, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java index bd65a0b..d1cf95b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java @@ -44,7 +44,6 @@ import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; import org.apache.pulsar.functions.worker.FunctionRuntimeManager; import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.WorkerConfig; -import org.apache.pulsar.functions.worker.WorkerService; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -63,6 +62,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.eq; @@ -247,6 +247,12 @@ public class FunctionsImplTest { // test super user assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", superUser, authenticationDataSource)); + // test pulsar super user + final String pulsarSuperUser = "pulsarSuperUser"; + when(authorizationService.isSuperUser(pulsarSuperUser, null)).thenReturn(CompletableFuture.completedFuture(true)); + assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", pulsarSuperUser, authenticationDataSource)); + assertTrue(functionImpl.isSuperUser(pulsarSuperUser)); + // test normal user functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), any()); @@ -256,6 +262,7 @@ public class FunctionsImplTest { when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false)); + when(authorizationService.isSuperUser("test-user", null)).thenReturn(CompletableFuture.completedFuture(false)); assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // if user is tenant admin @@ -269,6 +276,7 @@ public class FunctionsImplTest { when(admin.tenants()).thenReturn(tenants); when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin); when(authorizationService.isTenantAdmin("test-tenant", "test-user", tenantInfo, authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true)); + when(authorizationService.isSuperUser("test-user", authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false)); assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", "test-user", authenticationDataSource)); // test user allow function action @@ -300,10 +308,17 @@ public class FunctionsImplTest { public void testIsSuperUser() throws PulsarAdminException { FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); + AuthorizationService authorizationService = mock(AuthorizationService.class); + doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService(); WorkerConfig workerConfig = new WorkerConfig(); workerConfig.setAuthorizationEnabled(true); workerConfig.setSuperUserRoles(Collections.singleton(superUser)); doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig(); + when(authorizationService.isSuperUser(anyString(), any())) + .thenAnswer((invocationOnMock) -> { + String role = invocationOnMock.getArgument(0, String.class); + return CompletableFuture.completedFuture(superUser.equals(role)); + }); AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class); assertTrue(functionImpl.isSuperUser(superUser)); @@ -311,13 +326,13 @@ public class FunctionsImplTest { assertFalse(functionImpl.isSuperUser("normal-user")); assertFalse(functionImpl.isSuperUser( null)); - // test super roles is null - + // test super roles is null and it's not a pulsar super user + when(authorizationService.isSuperUser(superUser, null)) + .thenReturn(CompletableFuture.completedFuture(false)); functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService)); workerConfig = new WorkerConfig(); workerConfig.setAuthorizationEnabled(true); doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig(); - assertFalse(functionImpl.isSuperUser(superUser)); }
