This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 651e71956d6 [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901)
651e71956d6 is described below

commit 651e71956d68f20f64795d40a12a4d49061b6a76
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Nov 2 17:40:19 2022 -0700

    [fix][admin] returns 4xx error when pulsar-worker-service is disabled and trying to access it (#17901)
    
    (cherry picked from commit 01e0068f0e01216b5b456ecbaf102cdc4dcef56a)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  9 ++++
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  2 +-
 .../apache/pulsar/broker/admin/impl/SinksBase.java |  2 +-
 .../pulsar/broker/admin/impl/SourcesBase.java      |  2 +-
 .../apache/pulsar/broker/admin/v2/Functions.java   |  2 +-
 .../org/apache/pulsar/broker/admin/v2/Worker.java  |  4 +-
 .../apache/pulsar/broker/admin/v2/WorkerStats.java |  2 +-
 .../apache/pulsar/broker/PulsarServiceTest.java    | 59 ++++++++++++++++++++--
 .../pulsar/client/admin/internal/WorkerImpl.java   |  4 +-
 9 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 04982ea66f7..c1d846b1571 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -277,6 +278,14 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected WorkerService validateAndGetWorkerService() {
+        try {
+            return pulsar().getWorkerService();
+        } catch (UnsupportedOperationException e) {
+            throw new RestException(Status.CONFLICT, e.getMessage());
+        }
+    }
+
     protected Policies getNamespacePolicies(NamespaceName namespaceName) {
         try {
             final String namespace = namespaceName.toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index bb19dae45e7..401553a4e56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -55,7 +55,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class FunctionsBase extends AdminResource {
 
     Functions<? extends WorkerService> functions() {
-        return pulsar().getWorkerService().getFunctions();
+        return validateAndGetWorkerService().getFunctions();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index e2d3a2d1245..6f01b149afb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class SinksBase extends AdminResource {
 
     Sinks<? extends WorkerService> sinks() {
-        return pulsar().getWorkerService().getSinks();
+        return validateAndGetWorkerService().getSinks();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 877951912a7..8ac70687c28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class SourcesBase extends AdminResource {
 
     Sources<? extends WorkerService> sources() {
-        return pulsar().getWorkerService().getSources();
+        return validateAndGetWorkerService().getSources();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index a83ea8cd514..1164e371bd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -53,7 +53,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 public class Functions extends AdminResource {
 
     FunctionsV2<? extends WorkerService> functions() {
-        return pulsar().getWorkerService().getFunctionsV2();
+        return validateAndGetWorkerService().getFunctionsV2();
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 12b7950a582..616f46621c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -45,12 +45,12 @@ import org.apache.pulsar.functions.worker.service.api.Workers;
 public class Worker extends AdminResource implements Supplier<WorkerService> {
 
     Workers<? extends WorkerService> workers() {
-        return pulsar().getWorkerService().getWorkers();
+        return validateAndGetWorkerService().getWorkers();
     }
 
     @Override
     public WorkerService get() {
-        return pulsar().getWorkerService();
+        return validateAndGetWorkerService();
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 6703caa7a27..6b9837c52be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.functions.worker.service.api.Workers;
 public class WorkerStats extends AdminResource {
 
     public Workers<? extends WorkerService> workers() {
-        return pulsar().getWorkerService().getWorkers();
+        return validateAndGetWorkerService().getWorkers();
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index d7bd832adee..e89e8f49ac2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.testng.annotations.AfterMethod;
@@ -82,21 +83,71 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
      */
     @Test
     public void testGetWorkerServiceException() throws Exception {
+        init();
         ServiceConfiguration configuration = new ServiceConfiguration();
         configuration.setZookeeperServers("localhost");
         configuration.setClusterName("clusterName");
         configuration.setFunctionsWorkerEnabled(false);
         configuration.setBrokerShutdownTimeoutMs(0L);
-        @Cleanup
-        PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(),
-                Optional.empty(), (exitCode) -> {});
+
+        configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(1.0d);
+        configuration.setBrokerServicePort(Optional.of(0));
+        configuration.setBrokerServicePortTls(Optional.of(0));
+        configuration.setWebServicePort(Optional.of(0));
+        configuration.setWebServicePortTls(Optional.of(0));
+        startBroker(configuration);
 
         String errorMessage = "Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false";
+
+        int thrownCnt = 0;
         try {
-            pulsarService.getWorkerService();
+            pulsar.getWorkerService();
         } catch (UnsupportedOperationException e) {
+            thrownCnt++;
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.sources().listSources("my", "test");
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.sinks().getSinkStatus("my", "test", "test");
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.functions().getFunction("my", "test", "test");
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.worker().getClusterLeader();
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), errorMessage);
+        }
+
+        try {
+            admin.worker().getFunctionsStats();
+        } catch (PulsarAdminException e) {
+            thrownCnt++;
+            assertEquals(e.getStatusCode(), 409);
             assertEquals(e.getMessage(), errorMessage);
         }
+
+        assertEquals(thrownCnt, 6);
     }
 
     @Test
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index a8ebcf153ad..26ad376dc79 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -73,7 +73,7 @@ public class WorkerImpl extends BaseResource implements Worker {
                     @Override
                     public void completed(Response response) {
                         if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
+                            future.completeExceptionally(getApiException(response));
                         } else {
                             List<WorkerFunctionInstanceStats> metricsList =
                                     response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
@@ -188,7 +188,7 @@ public class WorkerImpl extends BaseResource implements Worker {
                     @Override
                     public void completed(Response response) {
                         if (response.getStatus() != Response.Status.OK.getStatusCode()) {
-                            future.completeExceptionally(new ClientErrorException(response));
+                            future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(new GenericType<WorkerInfo>(){}));
                         }

Reply via email to