This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 62914129c7d KAFKA-14099 - Fix request logging in connect (#12434)
62914129c7d is described below
commit 62914129c7dbf856a7702f33ed456109c6980b0c
Author: Alexandre Garnier <[email protected]>
AuthorDate: Wed Oct 12 16:28:55 2022 +0200
KAFKA-14099 - Fix request logging in connect (#12434)
Reviewers: Chris Egerton <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control.xml | 1 +
.../kafka/connect/runtime/rest/RestServer.java | 16 +--
.../kafka/connect/runtime/rest/RestServerTest.java | 137 ++++++++++++---------
.../unit/kafka/utils/LogCaptureAppender.scala | 6 +
5 files changed, 94 insertions(+), 67 deletions(-)
diff --git a/build.gradle b/build.gradle
index 642c904d766..7cf6cc44b1c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1281,6 +1281,7 @@ project(':clients') {
testRuntimeOnly libs.jacksonJDK8Datatypes
testImplementation libs.jose4j
testImplementation libs.jacksonJaxrsJsonProvider
+ testImplementation libs.log4j
generator project(':generator')
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cad049935b5..b2959d9ae6b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -573,6 +573,7 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.http"/>
<allow pkg="io.swagger.v3.oas.annotations"/>
+ <allow pkg="kafka.utils" />
<subpackage name="resources">
<allow pkg="org.apache.log4j" />
</subpackage>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 3c89ddb55fc..baa9b041520 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -41,8 +41,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
-import org.eclipse.jetty.server.handler.DefaultHandler;
-import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -190,6 +188,11 @@ public class RestServer {
public void initializeServer() {
log.info("Initializing REST server");
+ Slf4jRequestLogWriter slf4jRequestLogWriter = new
Slf4jRequestLogWriter();
+
slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
+ CustomRequestLog requestLog = new
CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT +
" %{ms}T");
+ jettyServer.setRequestLog(requestLog);
+
/* Needed for graceful shutdown as per `setStopTimeout` documentation
*/
StatisticsHandler statsHandler = new StatisticsHandler();
statsHandler.setHandler(handlers);
@@ -284,15 +287,6 @@ public class RestServer {
configureHttpResponsHeaderFilter(context);
}
- RequestLogHandler requestLogHandler = new RequestLogHandler();
- Slf4jRequestLogWriter slf4jRequestLogWriter = new
Slf4jRequestLogWriter();
-
slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
- CustomRequestLog requestLog = new
CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT +
" %{ms}T");
- requestLogHandler.setRequestLog(requestLog);
-
- contextHandlers.add(new DefaultHandler());
- contextHandlers.add(requestLogHandler);
-
handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
try {
context.start();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 2aedb32cdf3..82f9e0395e3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.connect.runtime.rest;
+import kafka.utils.LogCaptureAppender;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpOptions;
@@ -44,8 +46,11 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.core.MediaType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -60,10 +65,12 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class RestServerTest {
-
+
private Herder herder;
private Plugins plugins;
private RestServer server;
+ private CloseableHttpClient httpClient;
+ private Collection<CloseableHttpResponse> responses = new ArrayList<>();
protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar";
@@ -71,10 +78,17 @@ public class RestServerTest {
public void setUp() {
herder = mock(Herder.class);
plugins = mock(Plugins.class);
+ httpClient = HttpClients.createMinimal();
}
@After
- public void tearDown() {
+ public void tearDown() throws IOException {
+ for (CloseableHttpResponse response: responses) {
+ response.close();
+ }
+ if (httpClient != null) {
+ httpClient.close();
+ }
if (server != null) {
server.stop();
}
@@ -113,6 +127,7 @@ public class RestServerTest {
server = new RestServer(config);
Assert.assertEquals("http://localhost:8080/",
server.advertisedUrl().toString());
+ server.stop();
// Advertised URI from listeners with protocol
configMap = new HashMap<>(baseWorkerProps());
@@ -122,6 +137,7 @@ public class RestServerTest {
server = new RestServer(config);
Assert.assertEquals("https://localhost:8443/",
server.advertisedUrl().toString());
+ server.stop();
// Advertised URI from listeners with only SSL available
configMap = new HashMap<>(baseWorkerProps());
@@ -130,6 +146,7 @@ public class RestServerTest {
server = new RestServer(config);
Assert.assertEquals("https://localhost:8443/",
server.advertisedUrl().toString());
+ server.stop();
// Listener is overriden by advertised values
configMap = new HashMap<>(baseWorkerProps());
@@ -141,6 +158,7 @@ public class RestServerTest {
server = new RestServer(config);
Assert.assertEquals("http://somehost:10000/",
server.advertisedUrl().toString());
+ server.stop();
// correct listener is chosen when https listener is configured before
http listener and advertised listener is http
configMap = new HashMap<>(baseWorkerProps());
@@ -149,6 +167,7 @@ public class RestServerTest {
config = new DistributedConfig(configMap);
server = new RestServer(config);
Assert.assertEquals("http://plaintext-localhost:4761/",
server.advertisedUrl().toString());
+ server.stop();
}
@Test
@@ -166,12 +185,7 @@ public class RestServerTest {
HttpOptions request = new HttpOptions("/connectors");
request.addHeader("Content-Type", MediaType.WILDCARD);
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(
- server.advertisedUrl().getHost(),
- server.advertisedUrl().getPort()
- );
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(server.advertisedUrl(),
request);
Assert.assertEquals(MediaType.TEXT_PLAIN,
response.getEntity().getContentType().getValue());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
response.getEntity().writeTo(baos);
@@ -196,15 +210,12 @@ public class RestServerTest {
server = new RestServer(workerConfig);
server.initializeServer();
server.initializeResources(herder);
+ URI serverUrl = server.advertisedUrl();
+
HttpRequest request = new HttpGet("/connectors");
request.addHeader("Referer", origin + "/page");
request.addHeader("Origin", origin);
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(
- server.advertisedUrl().getHost(),
- server.advertisedUrl().getPort()
- );
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(serverUrl, request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
@@ -217,7 +228,7 @@ public class RestServerTest {
request.addHeader("Referer", origin + "/page");
request.addHeader("Origin", origin);
request.addHeader("Access-Control-Request-Method", method);
- response = httpClient.execute(httpHost, request);
+ response = executeRequest(serverUrl, request);
Assert.assertEquals(404, response.getStatusLine().getStatusCode());
if (expectedHeader != null) {
Assert.assertEquals(expectedHeader,
@@ -244,9 +255,7 @@ public class RestServerTest {
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(),
server.advertisedUrl().getPort());
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(server.advertisedUrl(),
request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
@@ -269,12 +278,11 @@ public class RestServerTest {
ObjectMapper mapper = new ObjectMapper();
- String host = server.advertisedUrl().getHost();
- int port = server.advertisedUrl().getPort();
+ URI serverUrl = server.advertisedUrl();
- executePut(host, port, "/admin/loggers/a.b.c.s.W", "{\"level\":
\"INFO\"}");
+ executePut(serverUrl, "/admin/loggers/a.b.c.s.W", "{\"level\":
\"INFO\"}");
- String responseStr = executeGet(host, port, "/admin/loggers");
+ String responseStr = executeGet(serverUrl, "/admin/loggers");
Map<String, Map<String, ?>> loggers = mapper.readValue(responseStr,
new TypeReference<Map<String, Map<String, ?>>>() {
});
assertNotNull("expected non null response for /admin/loggers" +
prettyPrint(loggers), loggers);
@@ -305,12 +313,10 @@ public class RestServerTest {
assertNotEquals(server.advertisedUrl(), server.adminUrl());
- executeGet(server.adminUrl().getHost(), server.adminUrl().getPort(),
"/admin/loggers");
+ executeGet(server.adminUrl(), "/admin/loggers");
HttpRequest request = new HttpGet("/admin/loggers");
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(),
server.advertisedUrl().getPort());
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(server.advertisedUrl(),
request);
Assert.assertEquals(404, response.getStatusLine().getStatusCode());
}
@@ -332,12 +338,37 @@ public class RestServerTest {
assertNull(server.adminUrl());
HttpRequest request = new HttpGet("/admin/loggers");
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(),
server.advertisedUrl().getPort());
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(server.advertisedUrl(),
request);
Assert.assertEquals(404, response.getStatusLine().getStatusCode());
}
+ @Test
+ public void testRequestLogs() throws IOException, InterruptedException {
+ Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+ DistributedConfig workerConfig = new DistributedConfig(configMap);
+
+ doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
+ doReturn(plugins).when(herder).plugins();
+
doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(),
workerConfig, ConnectRestExtension.class);
+
+ server = new RestServer(workerConfig);
+ server.initializeServer();
+ server.initializeResources(herder);
+
+ LogCaptureAppender restServerAppender =
LogCaptureAppender.createAndRegister();
+ HttpRequest request = new HttpGet("/");
+ HttpResponse response = executeRequest(server.advertisedUrl(),
request);
+
+ // Stop the server to flush all logs
+ server.stop();
+
+ Collection<String> logMessages =
restServerAppender.getRenderedMessages();
+ LogCaptureAppender.unregister(restServerAppender);
+ restServerAppender.close();
+ String expectedlogContent = "\"GET / HTTP/1.1\" " +
String.valueOf(response.getStatusLine().getStatusCode());
+ assertTrue(logMessages.stream().anyMatch(logMessage ->
logMessage.contains(expectedlogContent)));
+ }
+
@Test
public void testValidCustomizedHttpResponseHeaders() throws IOException {
String headerConfig =
@@ -368,51 +399,45 @@ public class RestServerTest {
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
server = new RestServer(workerConfig);
- try {
- server.initializeServer();
- server.initializeResources(herder);
- HttpRequest request = new HttpGet("/connectors");
- try (CloseableHttpClient httpClient = HttpClients.createMinimal())
{
- HttpHost httpHost = new
HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
- try (CloseableHttpResponse response =
httpClient.execute(httpHost, request)) {
- Assert.assertEquals(200,
response.getStatusLine().getStatusCode());
- if (!headerConfig.isEmpty()) {
- expectedHeaders.forEach((k, v) ->
-
Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
- } else {
-
Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
- }
- }
- }
- } finally {
- server.stop();
- server = null;
+ server.initializeServer();
+ server.initializeResources(herder);
+ HttpRequest request = new HttpGet("/connectors");
+ HttpResponse response = executeRequest(server.advertisedUrl(),
request);
+ Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+ if (!headerConfig.isEmpty()) {
+ expectedHeaders.forEach((k, v) ->
+ Assert.assertEquals(response.getFirstHeader(k).getValue(),
v));
+ } else {
+ Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
}
}
- private String executeGet(String host, int port, String endpoint) throws
IOException {
+ private String executeGet(URI serverUrl, String endpoint) throws
IOException {
HttpRequest request = new HttpGet(endpoint);
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(host, port);
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(serverUrl, request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
return new BasicResponseHandler().handleResponse(response);
}
- private String executePut(String host, int port, String endpoint, String
jsonBody) throws IOException {
+ private String executePut(URI serverUrl, String endpoint, String jsonBody)
throws IOException {
HttpPut request = new HttpPut(endpoint);
StringEntity entity = new StringEntity(jsonBody,
StandardCharsets.UTF_8.name());
entity.setContentType("application/json");
request.setEntity(entity);
- CloseableHttpClient httpClient = HttpClients.createMinimal();
- HttpHost httpHost = new HttpHost(host, port);
- CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ HttpResponse response = executeRequest(serverUrl, request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
return new BasicResponseHandler().handleResponse(response);
}
+ private HttpResponse executeRequest(URI serverUrl, HttpRequest request)
throws IOException {
+ HttpHost httpHost = new HttpHost(serverUrl.getHost(),
serverUrl.getPort());
+ CloseableHttpResponse response = httpClient.execute(httpHost, request);
+ responses.add(response);
+ return response;
+ }
+
private static String prettyPrint(Map<String, ?> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map);
diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
index 2d071452829..bec77356e1e 100644
--- a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
+++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
@@ -20,6 +20,7 @@ package kafka.utils
import org.apache.log4j.{AppenderSkeleton, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
+import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
class LogCaptureAppender extends AppenderSkeleton {
@@ -37,6 +38,11 @@ class LogCaptureAppender extends AppenderSkeleton {
}
}
+ def getRenderedMessages: java.util.List[String] = {
+ return getMessages.map(e => e.getRenderedMessage).asJava
+ }
+
+
override def close(): Unit = {
events.synchronized {
events.clear()