This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e427897c67c KAFKA-14816: Only load SSL properties when issuing
cross-worker requests to HTTPS URLs (#13415)
e427897c67c is described below
commit e427897c67c83ada340f731dc952bfd49d6cc32e
Author: Chris Egerton <[email protected]>
AuthorDate: Mon Mar 20 11:32:01 2023 -0400
KAFKA-14816: Only load SSL properties when issuing cross-worker requests to
HTTPS URLs (#13415)
This fixes a regression introduced in #12828, which caused workers to
unconditionally load (and therefore validate) SSL-related properties whenever
they issued REST requests to other workers. That behavior was fine for the
most part, but it caused unnecessary failures when workers were configured
with invalid SSL-related properties even though their REST API used HTTP
instead of HTTPS.
Reviewers: Ian McDonald <[email protected]>, Greg Harris
<[email protected]>, Yash Mayya <[email protected]>, Justine Olshan
<[email protected]>
---
.../kafka/connect/runtime/rest/RestClient.java | 11 ++++--
.../kafka/connect/runtime/rest/RestClientTest.java | 41 ++++++++++++++++------
2 files changed, 39 insertions(+), 13 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index d9ba6194124..55a564720e0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -31,6 +31,7 @@ import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,8 +60,8 @@ public class RestClient {
}
// VisibleForTesting
- HttpClient httpClient() {
- return new
HttpClient(SSLUtils.createClientSideSslContextFactory(config));
+ HttpClient httpClient(SslContextFactory sslContextFactory) {
+ return sslContextFactory != null ? new HttpClient(sslContextFactory) :
new HttpClient();
}
/**
@@ -97,7 +98,11 @@ public class RestClient {
public <T> HttpResponse<T> httpRequest(String url, String method,
HttpHeaders headers, Object requestBodyData,
TypeReference<T>
responseFormat,
SecretKey sessionKey, String
requestSignatureAlgorithm) {
- HttpClient client = httpClient();
+ // Only try to load SSL configs if we have to (see KAFKA-14816)
+ SslContextFactory sslContextFactory = url.startsWith("https://")
+ ? SSLUtils.createClientSideSslContextFactory(config)
+ : null;
+ HttpClient client = httpClient(sslContextFactory);
client.setFollowRedirects(false);
try {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
index d7c2218afb2..5db463c2b04 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java
@@ -46,9 +46,11 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -75,18 +77,24 @@ public class RestClientTest {
return mockKey;
}
- private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient
httpClient, String requestSignatureAlgorithm) {
+ private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient
httpClient, String requestSignatureAlgorithm, boolean https) {
RestClient client = spy(new RestClient(null));
- doReturn(httpClient).when(client).httpClient();
+ doReturn(httpClient).when(client).httpClient(any());
+ String protocol = https ? "https" : "http";
+ String url = protocol + "://localhost:1234/api/endpoint";
return client.httpRequest(
- "https://localhost:1234/api/endpoint",
- "GET",
- null,
- new TestDTO("requestBodyData"),
- TEST_TYPE,
- MOCK_SECRET_KEY,
- requestSignatureAlgorithm
- );
+ url,
+ "GET",
+ null,
+ new TestDTO("requestBodyData"),
+ TEST_TYPE,
+ MOCK_SECRET_KEY,
+ requestSignatureAlgorithm
+ );
+ }
+
+ private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient
httpClient, String requestSignatureAlgorithm) {
+ return httpRequest(httpClient, requestSignatureAlgorithm, false);
}
private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient
httpClient) throws Exception {
@@ -225,6 +233,19 @@ public class RestClientTest {
ConnectRestException e = assertThrows(ConnectRestException.class,
() -> httpRequest(httpClient));
assertIsInternalServerError(e);
}
+
+ @Test
+ public void testUseSslConfigsOnlyWhenNecessary() throws Exception {
+ // See KAFKA-14816; we want to make sure that even if the worker
is configured with invalid SSL properties,
+ // REST requests only fail if we try to contact a URL using HTTPS
(but not HTTP)
+ int statusCode = Response.Status.OK.getStatusCode();
+ TestDTO expectedResponse = new TestDTO("someContent");
+ setupHttpClient(statusCode, toJsonString(expectedResponse));
+
+ String requestSignatureAlgorithm = "HmacSHA1";
+ assertDoesNotThrow(() -> httpRequest(httpClient,
requestSignatureAlgorithm, false));
+ assertThrows(RuntimeException.class, () -> httpRequest(httpClient,
requestSignatureAlgorithm, true));
+ }
}