This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new e9b72b1  KAFKA-9747: Creating connect reconfiguration URL safely 
(#11174)
e9b72b1 is described below

commit e9b72b13c8a9b4cb8eaa2a9b0a9207acb40edfc9
Author: Andras Katona <[email protected]>
AuthorDate: Thu Sep 2 10:09:55 2021 +0200

    KAFKA-9747: Creating connect reconfiguration URL safely (#11174)
    
    * The URL wasn't URL-encoded when forwarding task reconfiguration to the leader Connect worker
    * Errors that were previously swallowed in the Connect RestClient are now handled
    
    Reviewers: Mickael Maison <[email protected]>, Viktor Somogyi-Vass 
<[email protected]>
    
    Co-authored-by: Andras Katona  <[email protected]>
    Co-authored-by: Daniel Urban <[email protected]>
---
 .../kafka/connect/runtime/distributed/DistributedHerder.java     | 9 +++++++--
 .../java/org/apache/kafka/connect/runtime/rest/RestClient.java   | 3 +++
 .../java/org/apache/kafka/connect/runtime/rest/RestServer.java   | 7 -------
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cb2c4da..0c668a0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -50,7 +50,6 @@ import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
@@ -66,6 +65,7 @@ import org.slf4j.Logger;
 import javax.crypto.KeyGenerator;
 import javax.crypto.SecretKey;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1463,7 +1463,12 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                                         "because the URL of the leader's REST 
interface is empty!"), null);
                                 return;
                             }
-                            String reconfigUrl = RestServer.urlJoin(leaderUrl, 
"/connectors/" + connName + "/tasks");
+                            String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+                                    .path("connectors")
+                                    .path(connName)
+                                    .path("tasks")
+                                    .build()
+                                    .toString();
                             log.trace("Forwarding task configurations for 
connector {} to leader", connName);
                             RestClient.httpRequest(reconfigUrl, "POST", null, 
rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
                             cb.onCompletion(null, null);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 58d7df0..81c5a84 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -142,6 +142,9 @@ public class RestClient {
         } catch (IOException | InterruptedException | TimeoutException | 
ExecutionException e) {
             log.error("IO error forwarding REST request: ", e);
             throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to 
forward REST request: " + e.getMessage(), e);
+        } catch (Throwable t) {
+            log.error("Error forwarding REST request", t);
+            throw new 
ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to 
forward REST request: " + t.getMessage(), t);
         } finally {
             try {
                 client.stop();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 136c616..4be40c1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -471,13 +471,6 @@ public class RestServer {
 
     }
 
-    public static String urlJoin(String base, String path) {
-        if (base.endsWith("/") && path.startsWith("/"))
-            return base + path.substring(1);
-        else
-            return base + path;
-    }
-
     /**
      * Register header filter to ServletContextHandler.
      * @param context The serverlet context handler

Reply via email to