This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c6954b7  KAFKA-9883: Add better error message when REST API forwards a 
request and leader is not known (#8536)
c6954b7 is described below

commit c6954b79cadaafe3d439ed006c352ec386c623b4
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Thu Apr 23 13:37:45 2020 -0500

    KAFKA-9883: Add better error message when REST API forwards a request and 
leader is not known (#8536)
    
    When the Connect worker forwards a REST API request to the leader, it might 
get back a `RequestTargetException` that suggests the worker should forward the 
request to a different worker. This can happen when the leader changes, and the 
worker that receives the original request forwards the request to the worker 
that it thinks is the current leader, but that worker is not the current 
leader. In this case. In most cases, the worker that received the forwarded 
request includes the URL of [...]
    
    When this rare case happens, the user gets a null pointer exception in 
their response and the NPE is logged. Instead, the worker should catch this 
condition and provide a more useful error message that is similar to other 
existing error messages that might occur.
    
    Added a unit test that verifies this corner case is caught and this 
particular NPE does not occur.
    
    Author: Randall Hauch <rha...@gmail.com>
    Reviewer: Konstantine Karantasis <konstant...@confluent.io>
---
 .../runtime/rest/resources/ConnectorsResource.java    |  9 ++++++++-
 .../rest/resources/ConnectorsResourceTest.java        | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 3b41fb7..8728e1c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -306,7 +306,14 @@ public class ConnectorsResource {
                     // this gives two total hops to resolve the request before 
giving up.
                     boolean recursiveForward = forward == null;
                     RequestTargetException targetException = 
(RequestTargetException) cause;
-                    String forwardUrl = 
UriBuilder.fromUri(targetException.forwardUrl())
+                    String forwardedUrl = targetException.forwardUrl();
+                    if (forwardedUrl == null) {
+                        // the target didn't know of the leader at this moment.
+                        throw new 
ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                                "Cannot complete request momentarily due to no 
known leader URL, "
+                                + "likely because a rebalance was underway.");
+                    }
+                    String forwardUrl = UriBuilder.fromUri(forwardedUrl)
                             .path(path)
                             .queryParam("forward", recursiveForward)
                             .build()
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 967b8fc..f604c80 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.Capture;
@@ -69,6 +70,8 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(RestClient.class)
@@ -803,6 +806,22 @@ public class ConnectorsResourceTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCompleteOrForwardWithErrorAndNoForwardUrl() throws 
Throwable {
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
+        herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
+        String leaderUrl = null;
+        expectAndCallbackException(cb, new NotLeaderException("not leader", 
leaderUrl));
+
+        PowerMock.replayAll();
+
+        ConnectRestException e = assertThrows(ConnectRestException.class, () 
-> {
+            connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
+        });
+        assertTrue(e.getMessage().contains("no known leader URL"));
+        PowerMock.verifyAll();
+    }
+
     private <T> byte[] serializeAsBytes(final T value) throws IOException {
         return new ObjectMapper().writeValueAsBytes(value);
     }

Reply via email to