This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 651e71956d6 [fix][admin] returns 4xx error when pulsar-worker-service
is disabled and trying to access it (#17901)
651e71956d6 is described below
commit 651e71956d68f20f64795d40a12a4d49061b6a76
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Nov 2 17:40:19 2022 -0700
[fix][admin] returns 4xx error when pulsar-worker-service is disabled and
trying to access it (#17901)
(cherry picked from commit 01e0068f0e01216b5b456ecbaf102cdc4dcef56a)
---
.../apache/pulsar/broker/admin/AdminResource.java | 9 ++++
.../pulsar/broker/admin/impl/FunctionsBase.java | 2 +-
.../apache/pulsar/broker/admin/impl/SinksBase.java | 2 +-
.../pulsar/broker/admin/impl/SourcesBase.java | 2 +-
.../apache/pulsar/broker/admin/v2/Functions.java | 2 +-
.../org/apache/pulsar/broker/admin/v2/Worker.java | 4 +-
.../apache/pulsar/broker/admin/v2/WorkerStats.java | 2 +-
.../apache/pulsar/broker/PulsarServiceTest.java | 59 ++++++++++++++++++++--
.../pulsar/client/admin/internal/WorkerImpl.java | 4 +-
9 files changed, 73 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 04982ea66f7..c1d846b1571 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -64,6 +64,7 @@ import
org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
@@ -277,6 +278,14 @@ public abstract class AdminResource extends
PulsarWebResource {
}
}
+ protected WorkerService validateAndGetWorkerService() {
+ try {
+ return pulsar().getWorkerService();
+ } catch (UnsupportedOperationException e) {
+ throw new RestException(Status.CONFLICT, e.getMessage());
+ }
+ }
+
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index bb19dae45e7..401553a4e56 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -55,7 +55,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
public class FunctionsBase extends AdminResource {
Functions<? extends WorkerService> functions() {
- return pulsar().getWorkerService().getFunctions();
+ return validateAndGetWorkerService().getFunctions();
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index e2d3a2d1245..6f01b149afb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
public class SinksBase extends AdminResource {
Sinks<? extends WorkerService> sinks() {
- return pulsar().getWorkerService().getSinks();
+ return validateAndGetWorkerService().getSinks();
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 877951912a7..8ac70687c28 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -50,7 +50,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
public class SourcesBase extends AdminResource {
Sources<? extends WorkerService> sources() {
- return pulsar().getWorkerService().getSources();
+ return validateAndGetWorkerService().getSources();
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index a83ea8cd514..1164e371bd7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -53,7 +53,7 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
public class Functions extends AdminResource {
FunctionsV2<? extends WorkerService> functions() {
- return pulsar().getWorkerService().getFunctionsV2();
+ return validateAndGetWorkerService().getFunctionsV2();
}
@POST
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
index 12b7950a582..616f46621c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java
@@ -45,12 +45,12 @@ import
org.apache.pulsar.functions.worker.service.api.Workers;
public class Worker extends AdminResource implements Supplier<WorkerService> {
Workers<? extends WorkerService> workers() {
- return pulsar().getWorkerService().getWorkers();
+ return validateAndGetWorkerService().getWorkers();
}
@Override
public WorkerService get() {
- return pulsar().getWorkerService();
+ return validateAndGetWorkerService();
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
index 6703caa7a27..6b9837c52be 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/WorkerStats.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.functions.worker.service.api.Workers;
public class WorkerStats extends AdminResource {
public Workers<? extends WorkerService> workers() {
- return pulsar().getWorkerService().getWorkers();
+ return validateAndGetWorkerService().getWorkers();
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index d7bd832adee..e89e8f49ac2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -26,6 +26,7 @@ import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.testng.annotations.AfterMethod;
@@ -82,21 +83,71 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
*/
@Test
public void testGetWorkerServiceException() throws Exception {
+ init();
ServiceConfiguration configuration = new ServiceConfiguration();
configuration.setZookeeperServers("localhost");
configuration.setClusterName("clusterName");
configuration.setFunctionsWorkerEnabled(false);
configuration.setBrokerShutdownTimeoutMs(0L);
- @Cleanup
- PulsarService pulsarService = new PulsarService(configuration, new
WorkerConfig(),
- Optional.empty(), (exitCode) -> {});
+
+ configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(1.0d);
+ configuration.setBrokerServicePort(Optional.of(0));
+ configuration.setBrokerServicePortTls(Optional.of(0));
+ configuration.setWebServicePort(Optional.of(0));
+ configuration.setWebServicePortTls(Optional.of(0));
+ startBroker(configuration);
String errorMessage = "Pulsar Function Worker is not enabled, probably
functionsWorkerEnabled is set to false";
+
+ int thrownCnt = 0;
try {
- pulsarService.getWorkerService();
+ pulsar.getWorkerService();
} catch (UnsupportedOperationException e) {
+ thrownCnt++;
+ assertEquals(e.getMessage(), errorMessage);
+ }
+
+ try {
+ admin.sources().listSources("my", "test");
+ } catch (PulsarAdminException e) {
+ thrownCnt++;
+ assertEquals(e.getStatusCode(), 409);
+ assertEquals(e.getMessage(), errorMessage);
+ }
+
+ try {
+ admin.sinks().getSinkStatus("my", "test", "test");
+ } catch (PulsarAdminException e) {
+ thrownCnt++;
+ assertEquals(e.getStatusCode(), 409);
+ assertEquals(e.getMessage(), errorMessage);
+ }
+
+ try {
+ admin.functions().getFunction("my", "test", "test");
+ } catch (PulsarAdminException e) {
+ thrownCnt++;
+ assertEquals(e.getStatusCode(), 409);
+ assertEquals(e.getMessage(), errorMessage);
+ }
+
+ try {
+ admin.worker().getClusterLeader();
+ } catch (PulsarAdminException e) {
+ thrownCnt++;
+ assertEquals(e.getStatusCode(), 409);
+ assertEquals(e.getMessage(), errorMessage);
+ }
+
+ try {
+ admin.worker().getFunctionsStats();
+ } catch (PulsarAdminException e) {
+ thrownCnt++;
+ assertEquals(e.getStatusCode(), 409);
assertEquals(e.getMessage(), errorMessage);
}
+
+ assertEquals(thrownCnt, 6);
}
@Test
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index a8ebcf153ad..26ad376dc79 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -73,7 +73,7 @@ public class WorkerImpl extends BaseResource implements
Worker {
@Override
public void completed(Response response) {
if (response.getStatus() !=
Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new
ClientErrorException(response));
+
future.completeExceptionally(getApiException(response));
} else {
List<WorkerFunctionInstanceStats> metricsList =
response.readEntity(new
GenericType<List<WorkerFunctionInstanceStats>>() {});
@@ -188,7 +188,7 @@ public class WorkerImpl extends BaseResource implements
Worker {
@Override
public void completed(Response response) {
if (response.getStatus() !=
Response.Status.OK.getStatusCode()) {
- future.completeExceptionally(new
ClientErrorException(response));
+
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(new
GenericType<WorkerInfo>(){}));
}