This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fe7dc8652162e15ab9a5a55164c46450c7f50fc9 Author: GuoJiwei <[email protected]> AuthorDate: Mon Aug 16 10:11:20 2021 +0800 [Broker Interceptor] Fix Pulsar didn't respond error messages when throw InterceptException (#11650) (cherry picked from commit 4793ff7bc0aa5ea442878efc85e0bac011cba7ee) --- .../apache/pulsar/broker/web/ExceptionHandler.java | 57 ++++++++++++++++++++ .../pulsar/broker/web/PreInterceptFilter.java | 8 +-- .../org/apache/pulsar/broker/web/WebService.java | 3 +- .../pulsar/broker/admin/AdminApiSchemaTest.java | 4 +- .../broker/intercept/InterceptFilterOutTest.java | 4 +- .../pulsar/broker/web/ExceptionHandlerTest.java | 63 ++++++++++++++++++++++ .../pulsar/client/admin/internal/BaseResource.java | 2 +- .../client/admin/internal/FunctionsImpl.java | 16 +++--- .../pulsar/client/admin/internal/PackagesImpl.java | 2 +- .../pulsar/client/admin/internal/SinksImpl.java | 10 ++-- .../pulsar/client/admin/internal/SourcesImpl.java | 10 ++-- .../pulsar/client/admin/internal/WorkerImpl.java | 10 ++-- .../admin/internal/http/AsyncHttpConnector.java | 16 ++++++ 13 files changed, 173 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java new file mode 100644 index 0000000..8b200a8 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Response; +import org.apache.pulsar.common.intercept.InterceptException; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; + +/** + * Exception handler for handle exception. + */ +public class ExceptionHandler { + + public void handle(ServletResponse response, Exception ex) throws IOException { + if (ex instanceof InterceptException) { + String reason = ex.getMessage(); + byte[] content = reason.getBytes(StandardCharsets.UTF_8); + MetaData.Response info = new MetaData.Response(); + info.setHttpVersion(HttpVersion.HTTP_1_1); + info.setReason(reason); + info.setStatus(((InterceptException) ex).getErrorCode()); + info.setContentLength(content.length); + if (response instanceof org.eclipse.jetty.server.Response) { + ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info, + ByteBuffer.wrap(content), true); + } else { + ((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(), + ex.getMessage()); + } + } else { + ((HttpServletResponse) response).sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + ex.getMessage()); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java index 201d9ad..e4e7bbc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java @@ -26,7 +26,6 @@ import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -38,8 +37,11 @@ public class PreInterceptFilter implements Filter { private final BrokerInterceptor interceptor; - public PreInterceptFilter(BrokerInterceptor interceptor) { + private final ExceptionHandler exceptionHandler; + + public PreInterceptFilter(BrokerInterceptor interceptor, ExceptionHandler exceptionHandler) { this.interceptor = interceptor; + this.exceptionHandler = exceptionHandler; } @Override @@ -67,7 +69,7 @@ public class PreInterceptFilter implements Filter { interceptor.onWebserviceRequest(requestWrapper); filterChain.doFilter(requestWrapper, servletResponse); } catch (InterceptException e) { - ((HttpServletResponse) servletResponse).sendError(e.getErrorCode(), e.getMessage()); + exceptionHandler.handle(servletResponse, e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 6efb7c6..4d9ff83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -162,8 +162,9 @@ public class WebService implements AutoCloseable { if (!pulsar.getConfig().getBrokerInterceptors().isEmpty() || !pulsar.getConfig().isDisableBrokerInterceptors()) { + ExceptionHandler handler = new ExceptionHandler(); // Enable PreInterceptFilter only when interceptors are enabled - context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())), + context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)), MATCH_ALL, EnumSet.allOf(DispatcherType.class)); context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)), MATCH_ALL, EnumSet.allOf(DispatcherType.class)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index 38d0ab7..dce60e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -193,7 +193,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { admin.schemas().createSchema(topicName, foo1SchemaInfo); fail("Should have failed"); } catch (PulsarAdminException.ConflictException e) { - assertTrue(e.getMessage().contains("HTTP 409 Conflict")); + assertTrue(e.getMessage().contains("HTTP 409")); } namespace = "schematest/testnotfound"; @@ -203,7 +203,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { admin.schemas().createSchema(topicName, fooSchemaInfo); fail("Should have failed"); } catch (PulsarAdminException.NotFoundException e) { - assertTrue(e.getMessage().contains("HTTP 404 Not Found")); + assertTrue(e.getMessage().contains("HTTP 404")); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java index 1219420..3d8c473 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.intercept; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.web.ExceptionHandler; import org.apache.pulsar.broker.web.PreInterceptFilter; import org.apache.pulsar.broker.web.ProcessHandlerFilter; import org.apache.pulsar.broker.web.ResponseHandlerFilter; @@ -61,7 +62,8 @@ public class InterceptFilterOutTest { @Test public void testFilterOutForPreInterceptFilter() throws Exception { CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor(); - PreInterceptFilter filter = new PreInterceptFilter(interceptor); + ExceptionHandler handler = new ExceptionHandler(); + PreInterceptFilter filter = new PreInterceptFilter(interceptor, handler); HttpServletRequest request = Mockito.mock(HttpServletRequest.class); HttpServletResponse response = Mockito.mock(HttpServletResponse.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java new file mode 100644 index 0000000..ed70869 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import lombok.SneakyThrows; +import org.apache.pulsar.common.intercept.InterceptException; +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.Response; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import javax.servlet.http.HttpServletResponse; + +import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500; +import static org.eclipse.jetty.http.HttpStatus.PRECONDITION_FAILED_412; + +/** + * Unit test for ExceptionHandler. + */ +@Test(groups = "broker") +public class ExceptionHandlerTest { + + @Test + @SneakyThrows + public void testHandle() { + String restriction = "Reach the max tenants [5] restriction"; + String internal = "internal exception"; + String illegal = "illegal argument exception "; + ExceptionHandler handler = new ExceptionHandler(); + HttpServletResponse response = Mockito.mock(HttpServletResponse.class); + handler.handle(response, new InterceptException(PRECONDITION_FAILED_412, restriction)); + Mockito.verify(response).sendError(PRECONDITION_FAILED_412, restriction); + + handler.handle(response, new InterceptException(INTERNAL_SERVER_ERROR_500, internal)); + Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, internal); + + handler.handle(response, new IllegalArgumentException(illegal)); + Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, illegal); + + Response response2 = Mockito.mock(Response.class); + HttpChannel httpChannel = Mockito.mock(HttpChannel.class); + Mockito.when(response2.getHttpChannel()).thenReturn(httpChannel); + handler.handle(response2, new InterceptException(PRECONDITION_FAILED_412, restriction)); + Mockito.verify(httpChannel).sendResponse(Mockito.any(), Mockito.any(), Mockito.anyBoolean()); + } + +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java index ae9b0c3..6838fd8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java @@ -249,7 +249,7 @@ public abstract class BaseResource { } public PulsarAdminException getApiException(Response response) { - if (response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() == Response.Status.OK.getStatusCode()) { return null; } try { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 806ef79..a53da9d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -103,7 +103,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { List<String> functions = response.readEntity(new GenericType<List<String>>() {}); @@ -142,7 +142,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(FunctionConfig.class)); @@ -181,7 +181,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(FunctionStatus.class)); @@ -224,7 +224,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity( @@ -267,7 +267,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(FunctionInstanceStatsDataImpl.class)); @@ -307,7 +307,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(FunctionStatsImpl.class)); @@ -920,7 +920,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { public List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException { try { Response response = request(functions.path("connectors")).get(); - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { throw getApiException(response); } return response.readEntity(new GenericType<List<ConnectorDefinition>>() { @@ -977,7 +977,7 @@ public class FunctionsImpl extends ComponentResource implements Functions { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(FunctionState.class)); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java index ae84649..4c7fc4c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java @@ -176,7 +176,7 @@ public class PackagesImpl extends ComponentResource implements Packages { asyncGetRequest(webTarget, new InvocationCallback<Response>(){ @Override public void completed(Response response) { - if (response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() == Response.Status.OK.getStatusCode()) { try (InputStream inputStream = response.readEntity(InputStream.class)) { Path destinyPath = Paths.get(path); if (destinyPath.getParent() != null) { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java index 6a31823..215c893 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java @@ -89,7 +89,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(new GenericType<List<String>>() {})); @@ -129,7 +129,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(SinkConfig.class)); @@ -170,7 +170,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(SinkStatus.class)); @@ -213,7 +213,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity( @@ -637,7 +637,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity( diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java index 5139202..f52845a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java @@ -85,7 +85,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(new GenericType<List<String>>() {})); @@ -122,7 +122,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(SourceConfig.class)); @@ -160,7 +160,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity(SourceStatus.class)); @@ -201,7 +201,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity( @@ -586,7 +586,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(getApiException(response)); } else { future.complete(response.readEntity( diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java index 0b4d9da..a8ebcf1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java @@ -72,7 +72,7 @@ public class WorkerImpl extends BaseResource implements Worker { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(new ClientErrorException(response)); } else { List<WorkerFunctionInstanceStats> metricsList = @@ -111,7 +111,7 @@ public class WorkerImpl extends BaseResource implements Worker { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(new ClientErrorException(response)); } else { future.complete(response.readEntity( @@ -150,7 +150,7 @@ public class WorkerImpl extends BaseResource implements Worker { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(new ClientErrorException(response)); } else { future.complete(response.readEntity(new GenericType<List<WorkerInfo>>() {})); @@ -187,7 +187,7 @@ public class WorkerImpl extends BaseResource implements Worker { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(new ClientErrorException(response)); } else { future.complete(response.readEntity(new GenericType<WorkerInfo>(){})); @@ -224,7 +224,7 @@ public class WorkerImpl extends BaseResource implements Worker { new InvocationCallback<Response>() { @Override public void completed(Response response) { - if (!response.getStatusInfo().equals(Response.Status.OK)) { + if (response.getStatus() != Response.Status.OK.getStatusCode()) { future.completeExceptionally(new ClientErrorException(response)); } else { future.complete(response.readEntity( diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index bf8492c..3e17a38 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -205,6 +205,22 @@ public class AsyncHttpConnector implements Connector { } else { ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest); + jerseyResponse.setStatusInfo(new javax.ws.rs.core.Response.StatusType() { + @Override + public int getStatusCode() { + return response.getStatusCode(); + } + + @Override + public Status.Family getFamily() { + return Status.Family.familyOf(response.getStatusCode()); + } + + @Override + public String getReasonPhrase() { + return response.getStatusText(); + } + }); response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue())); if (response.hasResponseBody()) { jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
