This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe7dc8652162e15ab9a5a55164c46450c7f50fc9
Author: GuoJiwei <[email protected]>
AuthorDate: Mon Aug 16 10:11:20 2021 +0800

    [Broker Interceptor] Fix Pulsar didn't respond error messages when throw 
InterceptException (#11650)
    
    (cherry picked from commit 4793ff7bc0aa5ea442878efc85e0bac011cba7ee)
---
 .../apache/pulsar/broker/web/ExceptionHandler.java | 57 ++++++++++++++++++++
 .../pulsar/broker/web/PreInterceptFilter.java      |  8 +--
 .../org/apache/pulsar/broker/web/WebService.java   |  3 +-
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |  4 +-
 .../broker/intercept/InterceptFilterOutTest.java   |  4 +-
 .../pulsar/broker/web/ExceptionHandlerTest.java    | 63 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/BaseResource.java |  2 +-
 .../client/admin/internal/FunctionsImpl.java       | 16 +++---
 .../pulsar/client/admin/internal/PackagesImpl.java |  2 +-
 .../pulsar/client/admin/internal/SinksImpl.java    | 10 ++--
 .../pulsar/client/admin/internal/SourcesImpl.java  | 10 ++--
 .../pulsar/client/admin/internal/WorkerImpl.java   | 10 ++--
 .../admin/internal/http/AsyncHttpConnector.java    | 16 ++++++
 13 files changed, 173 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
new file mode 100644
index 0000000..8b200a8
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.common.intercept.InterceptException;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.http.MetaData;
+
+/**
+ *  Exception handler for handle exception.
+ */
+public class ExceptionHandler {
+
+    public void handle(ServletResponse response, Exception ex) throws 
IOException {
+        if (ex instanceof InterceptException) {
+            String reason = ex.getMessage();
+            byte[] content = reason.getBytes(StandardCharsets.UTF_8);
+            MetaData.Response info = new MetaData.Response();
+            info.setHttpVersion(HttpVersion.HTTP_1_1);
+            info.setReason(reason);
+            info.setStatus(((InterceptException) ex).getErrorCode());
+            info.setContentLength(content.length);
+            if (response instanceof org.eclipse.jetty.server.Response) {
+                ((org.eclipse.jetty.server.Response) 
response).getHttpChannel().sendResponse(info,
+                        ByteBuffer.wrap(content), true);
+            } else {
+                ((HttpServletResponse) 
response).sendError(((InterceptException) ex).getErrorCode(),
+                        ex.getMessage());
+            }
+        } else {
+            ((HttpServletResponse) 
response).sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+                    ex.getMessage());
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index 201d9ad..e4e7bbc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -26,7 +26,6 @@ import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -38,8 +37,11 @@ public class PreInterceptFilter implements Filter {
 
     private final BrokerInterceptor interceptor;
 
-    public PreInterceptFilter(BrokerInterceptor interceptor) {
+    private final ExceptionHandler exceptionHandler;
+
+    public PreInterceptFilter(BrokerInterceptor interceptor, ExceptionHandler 
exceptionHandler) {
         this.interceptor = interceptor;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
@@ -67,7 +69,7 @@ public class PreInterceptFilter implements Filter {
             interceptor.onWebserviceRequest(requestWrapper);
             filterChain.doFilter(requestWrapper, servletResponse);
         } catch (InterceptException e) {
-            ((HttpServletResponse) 
servletResponse).sendError(e.getErrorCode(), e.getMessage());
+            exceptionHandler.handle(servletResponse, e);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 6efb7c6..4d9ff83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -162,8 +162,9 @@ public class WebService implements AutoCloseable {
 
         if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
                 || !pulsar.getConfig().isDisableBrokerInterceptors()) {
+            ExceptionHandler handler = new ExceptionHandler();
             // Enable PreInterceptFilter only when interceptors are enabled
-            context.addFilter(new FilterHolder(new 
PreInterceptFilter(pulsar.getBrokerInterceptor())),
+            context.addFilter(new FilterHolder(new 
PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
                     MATCH_ALL, EnumSet.allOf(DispatcherType.class));
             context.addFilter(new FilterHolder(new 
ProcessHandlerFilter(pulsar)),
                     MATCH_ALL, EnumSet.allOf(DispatcherType.class));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 38d0ab7..dce60e0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -193,7 +193,7 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
             admin.schemas().createSchema(topicName, foo1SchemaInfo);
             fail("Should have failed");
         } catch (PulsarAdminException.ConflictException e) {
-            assertTrue(e.getMessage().contains("HTTP 409 Conflict"));
+            assertTrue(e.getMessage().contains("HTTP 409"));
         }
 
         namespace = "schematest/testnotfound";
@@ -203,7 +203,7 @@ public class AdminApiSchemaTest extends 
MockedPulsarServiceBaseTest {
             admin.schemas().createSchema(topicName, fooSchemaInfo);
             fail("Should have failed");
         } catch (PulsarAdminException.NotFoundException e) {
-            assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+            assertTrue(e.getMessage().contains("HTTP 404"));
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 1219420..3d8c473 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.intercept;
 
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.web.ExceptionHandler;
 import org.apache.pulsar.broker.web.PreInterceptFilter;
 import org.apache.pulsar.broker.web.ProcessHandlerFilter;
 import org.apache.pulsar.broker.web.ResponseHandlerFilter;
@@ -61,7 +62,8 @@ public class InterceptFilterOutTest {
     @Test
     public void testFilterOutForPreInterceptFilter() throws Exception {
         CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
-        PreInterceptFilter filter = new PreInterceptFilter(interceptor);
+        ExceptionHandler handler = new ExceptionHandler();
+        PreInterceptFilter filter = new PreInterceptFilter(interceptor, 
handler);
 
         HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
         HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java
new file mode 100644
index 0000000..ed70869
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import lombok.SneakyThrows;
+import org.apache.pulsar.common.intercept.InterceptException;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.Response;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import javax.servlet.http.HttpServletResponse;
+
+import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500;
+import static org.eclipse.jetty.http.HttpStatus.PRECONDITION_FAILED_412;
+
+/**
+ * Unit test for ExceptionHandler.
+ */
+@Test(groups = "broker")
+public class ExceptionHandlerTest {
+
+    @Test
+    @SneakyThrows
+    public void testHandle() {
+        String restriction = "Reach the max tenants [5] restriction";
+        String internal = "internal exception";
+        String illegal = "illegal argument exception ";
+        ExceptionHandler handler = new ExceptionHandler();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        handler.handle(response, new 
InterceptException(PRECONDITION_FAILED_412, restriction));
+        Mockito.verify(response).sendError(PRECONDITION_FAILED_412, 
restriction);
+
+        handler.handle(response, new 
InterceptException(INTERNAL_SERVER_ERROR_500, internal));
+        Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, 
internal);
+
+        handler.handle(response, new IllegalArgumentException(illegal));
+        Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, illegal);
+
+        Response response2 = Mockito.mock(Response.class);
+        HttpChannel httpChannel = Mockito.mock(HttpChannel.class);
+        Mockito.when(response2.getHttpChannel()).thenReturn(httpChannel);
+        handler.handle(response2, new 
InterceptException(PRECONDITION_FAILED_412, restriction));
+        Mockito.verify(httpChannel).sendResponse(Mockito.any(), Mockito.any(), 
Mockito.anyBoolean());
+    }
+
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index ae9b0c3..6838fd8 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -249,7 +249,7 @@ public abstract class BaseResource {
     }
 
     public PulsarAdminException getApiException(Response response) {
-        if (response.getStatusInfo().equals(Response.Status.OK)) {
+        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
             return null;
         }
         try {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 806ef79..a53da9d 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -103,7 +103,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             List<String> functions = response.readEntity(new 
GenericType<List<String>>() {});
@@ -142,7 +142,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(FunctionConfig.class));
@@ -181,7 +181,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(FunctionStatus.class));
@@ -224,7 +224,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -267,7 +267,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(FunctionInstanceStatsDataImpl.class));
@@ -307,7 +307,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(FunctionStatsImpl.class));
@@ -920,7 +920,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
     public List<ConnectorDefinition> getConnectorsList() throws 
PulsarAdminException {
         try {
             Response response = request(functions.path("connectors")).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+            if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                 throw getApiException(response);
             }
             return response.readEntity(new 
GenericType<List<ConnectorDefinition>>() {
@@ -977,7 +977,7 @@ public class FunctionsImpl extends ComponentResource 
implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(FunctionState.class));
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index ae84649..4c7fc4c 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -176,7 +176,7 @@ public class PackagesImpl extends ComponentResource 
implements Packages {
         asyncGetRequest(webTarget, new InvocationCallback<Response>(){
             @Override
             public void completed(Response response) {
-                if (response.getStatusInfo().equals(Response.Status.OK)) {
+                if (response.getStatus() == 
Response.Status.OK.getStatusCode()) {
                     try (InputStream inputStream = 
response.readEntity(InputStream.class)) {
                         Path destinyPath = Paths.get(path);
                         if (destinyPath.getParent() != null) {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 6a31823..215c893 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -89,7 +89,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(new 
GenericType<List<String>>() {}));
@@ -129,7 +129,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(SinkConfig.class));
@@ -170,7 +170,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(SinkStatus.class));
@@ -213,7 +213,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -637,7 +637,7 @@ public class SinksImpl extends ComponentResource implements 
Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index 5139202..f52845a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -85,7 +85,7 @@ public class SourcesImpl extends ComponentResource implements 
Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(new 
GenericType<List<String>>() {}));
@@ -122,7 +122,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(SourceConfig.class));
@@ -160,7 +160,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             
future.complete(response.readEntity(SourceStatus.class));
@@ -201,7 +201,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -586,7 +586,7 @@ public class SourcesImpl extends ComponentResource 
implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             
future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 0b4d9da..a8ebcf1 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -72,7 +72,7 @@ public class WorkerImpl extends BaseResource implements 
Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new 
ClientErrorException(response));
                         } else {
                             List<WorkerFunctionInstanceStats> metricsList =
@@ -111,7 +111,7 @@ public class WorkerImpl extends BaseResource implements 
Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new 
ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -150,7 +150,7 @@ public class WorkerImpl extends BaseResource implements 
Worker {
 
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new 
ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(new 
GenericType<List<WorkerInfo>>() {}));
@@ -187,7 +187,7 @@ public class WorkerImpl extends BaseResource implements 
Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new 
ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(new 
GenericType<WorkerInfo>(){}));
@@ -224,7 +224,7 @@ public class WorkerImpl extends BaseResource implements 
Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if 
(!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != 
Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new 
ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index bf8492c..3e17a38 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -205,6 +205,22 @@ public class AsyncHttpConnector implements Connector {
             } else {
                 ClientResponse jerseyResponse =
                         new 
ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest);
+                jerseyResponse.setStatusInfo(new 
javax.ws.rs.core.Response.StatusType() {
+                    @Override
+                    public int getStatusCode() {
+                        return response.getStatusCode();
+                    }
+
+                    @Override
+                    public Status.Family getFamily() {
+                        return 
Status.Family.familyOf(response.getStatusCode());
+                    }
+
+                    @Override
+                    public String getReasonPhrase() {
+                        return response.getStatusText();
+                    }
+                });
                 response.getHeaders().forEach(e -> 
jerseyResponse.header(e.getKey(), e.getValue()));
                 if (response.hasResponseBody()) {
                     
jerseyResponse.setEntityStream(response.getResponseBodyAsStream());

Reply via email to