This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e427897c67c KAFKA-14816: Only load SSL properties when issuing 
cross-worker requests to HTTPS URLs (#13415)
e427897c67c is described below

commit e427897c67c83ada340f731dc952bfd49d6cc32e
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Mar 20 11:32:01 2023 -0400

    KAFKA-14816: Only load SSL properties when issuing cross-worker requests to 
HTTPS URLs (#13415)
    
    This fixes a regression introduced in #12828, which caused workers to start 
unconditionally loading (and therefore validating) SSL-related properties when 
issuing REST requests to other workers. That was fine for the most part, but 
caused unnecessary failures when workers were configured with invalid 
SSL-related properties and their REST API used HTTP instead of HTTPS.
    
    Reviewers: Ian McDonald <[email protected]>, Greg Harris 
<[email protected]>, Yash Mayya <[email protected]>, Justine Olshan 
<[email protected]>
---
 .../kafka/connect/runtime/rest/RestClient.java     | 11 ++++--
 .../kafka/connect/runtime/rest/RestClientTest.java | 41 ++++++++++++++++------
 2 files changed, 39 insertions(+), 13 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index d9ba6194124..55a564720e0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.eclipse.jetty.client.util.StringContentProvider;
 import org.eclipse.jetty.http.HttpField;
 import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,8 +60,8 @@ public class RestClient {
     }
 
     // VisibleForTesting
-    HttpClient httpClient() {
-        return new 
HttpClient(SSLUtils.createClientSideSslContextFactory(config));
+    HttpClient httpClient(SslContextFactory sslContextFactory) {
+        return sslContextFactory != null ? new HttpClient(sslContextFactory) : 
new HttpClient();
     }
 
     /**
@@ -97,7 +98,11 @@ public class RestClient {
     public <T> HttpResponse<T> httpRequest(String url, String method, 
HttpHeaders headers, Object requestBodyData,
                                                   TypeReference<T> 
responseFormat,
                                                   SecretKey sessionKey, String 
requestSignatureAlgorithm) {
-        HttpClient client = httpClient();
+        // Only try to load SSL configs if we have to (see KAFKA-14816)
+        SslContextFactory sslContextFactory = url.startsWith("https://";)
+                ? SSLUtils.createClientSideSslContextFactory(config)
+                : null;
+        HttpClient client = httpClient(sslContextFactory);
         client.setFollowRedirects(false);
 
         try {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
index d7c2218afb2..5db463c2b04 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
@@ -46,9 +46,11 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -75,18 +77,24 @@ public class RestClientTest {
         return mockKey;
     }
 
-    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm, boolean https) {
         RestClient client = spy(new RestClient(null));
-        doReturn(httpClient).when(client).httpClient();
+        doReturn(httpClient).when(client).httpClient(any());
+        String protocol = https ? "https" : "http";
+        String url = protocol + "://localhost:1234/api/endpoint";
         return client.httpRequest(
-                        "https://localhost:1234/api/endpoint";,
-                        "GET",
-                        null,
-                        new TestDTO("requestBodyData"),
-                        TEST_TYPE,
-                        MOCK_SECRET_KEY,
-                        requestSignatureAlgorithm
-                );
+                url,
+                "GET",
+                null,
+                new TestDTO("requestBodyData"),
+                TEST_TYPE,
+                MOCK_SECRET_KEY,
+                requestSignatureAlgorithm
+        );
+    }
+
+    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+        return httpRequest(httpClient, requestSignatureAlgorithm, false);
     }
 
     private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient) throws Exception {
@@ -225,6 +233,19 @@ public class RestClientTest {
             ConnectRestException e = assertThrows(ConnectRestException.class, 
() -> httpRequest(httpClient));
             assertIsInternalServerError(e);
         }
+
+        @Test
+        public void testUseSslConfigsOnlyWhenNecessary() throws Exception {
+            // See KAFKA-14816; we want to make sure that even if the worker 
is configured with invalid SSL properties,
+            // REST requests only fail if we try to contact a URL using HTTPS 
(but not HTTP)
+            int statusCode = Response.Status.OK.getStatusCode();
+            TestDTO expectedResponse = new TestDTO("someContent");
+            setupHttpClient(statusCode, toJsonString(expectedResponse));
+
+            String requestSignatureAlgorithm = "HmacSHA1";
+            assertDoesNotThrow(() -> httpRequest(httpClient, 
requestSignatureAlgorithm, false));
+            assertThrows(RuntimeException.class, () -> httpRequest(httpClient, 
requestSignatureAlgorithm, true));
+        }
     }
 
 

Reply via email to