This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 028b014cbf18034150269a6b6288ec519faf69bd Author: Heesung Sohn <[email protected]> AuthorDate: Wed Nov 2 17:40:19 2022 -0700 [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901) (cherry picked from commit 01e0068f0e01216b5b456ecbaf102cdc4dcef56a) --- .../apache/pulsar/broker/admin/AdminResource.java | 9 ++++ .../pulsar/broker/admin/impl/FunctionsBase.java | 2 +- .../apache/pulsar/broker/admin/impl/SinksBase.java | 2 +- .../pulsar/broker/admin/impl/SourcesBase.java | 2 +- .../apache/pulsar/broker/admin/v2/Functions.java | 2 +- .../org/apache/pulsar/broker/admin/v2/Worker.java | 4 +- .../apache/pulsar/broker/admin/v2/WorkerStats.java | 2 +- .../apache/pulsar/broker/PulsarServiceTest.java | 58 ++++++++++++++++++++-- 8 files changed, 70 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index eb5b62d0de5..b5d11c757ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -68,6 +68,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; @@ -285,6 +286,14 @@ public abstract class AdminResource extends PulsarWebResource { } } + protected WorkerService validateAndGetWorkerService() { + try { + return pulsar().getWorkerService(); + } catch (UnsupportedOperationException e) { + throw new 
RestException(Status.CONFLICT, e.getMessage()); + } + } + protected Policies getNamespacePolicies(NamespaceName namespaceName) { try { Policies policies = namespaceResources().getPolicies(namespaceName) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index f1c4c105de6..3e26215c32e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -55,7 +55,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class FunctionsBase extends AdminResource { Functions<? extends WorkerService> functions() { - return pulsar().getWorkerService().getFunctions(); + return validateAndGetWorkerService().getFunctions(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java index d45016454f5..3f97545010c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java @@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class SinksBase extends AdminResource { Sinks<? 
extends WorkerService> sinks() { - return pulsar().getWorkerService().getSinks(); + return validateAndGetWorkerService().getSinks(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java index b4ba332c312..5d020290a31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java @@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class SourcesBase extends AdminResource { Sources<? extends WorkerService> sources() { - return pulsar().getWorkerService().getSources(); + return validateAndGetWorkerService().getSources(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java index a83ea8cd514..1164e371bd7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java @@ -53,7 +53,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam; public class Functions extends AdminResource { FunctionsV2<? 
extends WorkerService> functions() { - return pulsar().getWorkerService().getFunctionsV2(); + return validateAndGetWorkerService().getFunctionsV2(); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java index 12b7950a582..616f46621c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java @@ -45,12 +45,12 @@ import org.apache.pulsar.functions.worker.service.api.Workers; public class Worker extends AdminResource implements Supplier<WorkerService> { Workers<? extends WorkerService> workers() { - return pulsar().getWorkerService().getWorkers(); + return validateAndGetWorkerService().getWorkers(); } @Override public WorkerService get() { - return pulsar().getWorkerService(); + return validateAndGetWorkerService(); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java index 6703caa7a27..6b9837c52be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java @@ -40,7 +40,7 @@ import org.apache.pulsar.functions.worker.service.api.Workers; public class WorkerStats extends AdminResource { public Workers<? 
extends WorkerService> workers() { - return pulsar().getWorkerService().getWorkers(); + return validateAndGetWorkerService().getWorkers(); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index d7bd832adee..ddb1ed4d469 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -26,6 +26,7 @@ import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.testng.annotations.AfterMethod; @@ -82,21 +83,70 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest { */ @Test public void testGetWorkerServiceException() throws Exception { + init(); ServiceConfiguration configuration = new ServiceConfiguration(); configuration.setZookeeperServers("localhost"); configuration.setClusterName("clusterName"); configuration.setFunctionsWorkerEnabled(false); configuration.setBrokerShutdownTimeoutMs(0L); - @Cleanup - PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(), - Optional.empty(), (exitCode) -> {}); + configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + configuration.setBrokerServicePort(Optional.of(0)); + configuration.setBrokerServicePortTls(Optional.of(0)); + configuration.setWebServicePort(Optional.of(0)); + configuration.setWebServicePortTls(Optional.of(0)); + startBroker(configuration); String errorMessage = "Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false"; + + int thrownCnt = 0; try { - pulsarService.getWorkerService(); + pulsar.getWorkerService(); } catch 
(UnsupportedOperationException e) { + thrownCnt++; + assertEquals(e.getMessage(), errorMessage); + } + + try { + admin.sources().listSources("my", "test"); + } catch (PulsarAdminException e) { + thrownCnt++; + assertEquals(e.getStatusCode(), 409); assertEquals(e.getMessage(), errorMessage); } + + try { + admin.sinks().getSinkStatus("my", "test", "test"); + } catch (PulsarAdminException e) { + thrownCnt++; + assertEquals(e.getStatusCode(), 409); + assertEquals(e.getMessage(), errorMessage); + } + + try { + admin.functions().getFunction("my", "test", "test"); + } catch (PulsarAdminException e) { + thrownCnt++; + assertEquals(e.getStatusCode(), 409); + assertEquals(e.getMessage(), errorMessage); + } + + try { + admin.worker().getClusterLeader(); + } catch (PulsarAdminException e) { + thrownCnt++; + assertEquals(e.getStatusCode(), 409); + assertEquals(e.getMessage(), errorMessage); + } + + try { + admin.worker().getFunctionsStats(); + } catch (PulsarAdminException e) { + thrownCnt++; + assertEquals(e.getStatusCode(), 409); + assertEquals(e.getMessage(), errorMessage); + } + + assertEquals(thrownCnt, 6); } @Test
