This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2988eac KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620) 2988eac is described below commit 2988eac0822022578bd2c6c5626bfbda3e8c73a6 Author: Jeff Huang <47870461+jeffhuan...@users.noreply.github.com> AuthorDate: Sun May 24 06:56:27 2020 -0700 KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620) Added support for customizing the HTTP response headers for Kafka Connect as described in KIP-577. Author: Jeff Huang <jeff.hu...@confluent.io> Reviewer: Randall Hauch <rha...@gmail.com> --- checkstyle/import-control.xml | 1 + checkstyle/suppressions.xml | 3 + .../apache/kafka/connect/runtime/WorkerConfig.java | 75 +++++++++++++++++++++- .../kafka/connect/runtime/rest/RestServer.java | 16 +++++ .../kafka/connect/runtime/WorkerConfigTest.java | 48 ++++++++++++++ .../kafka/connect/runtime/rest/RestServerTest.java | 62 +++++++++++++++++- 6 files changed, 202 insertions(+), 3 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 3fd5ea7..7119265 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -362,6 +362,7 @@ <allow pkg="org.reflections"/> <allow pkg="org.reflections.util"/> <allow pkg="javax.crypto"/> + <allow pkg="org.eclipse.jetty.util" /> <subpackage name="rest"> <allow pkg="org.eclipse.jetty" /> diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f86cc5f..74e6961 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -144,6 +144,9 @@ <suppress checks="MethodLength" files="Values.java"/> + <suppress checks="NPathComplexity" + files="RestServer.java"/> + <!-- connect tests--> <suppress checks="ClassDataAbstractionCoupling" files="(DistributedHerder|KafkaBasedLog)Test.java"/> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 347e250..7a4f04e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -34,11 +34,14 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import org.eclipse.jetty.util.StringUtil; + import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; @@ -49,6 +52,9 @@ public class WorkerConfig extends AbstractConfig { private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class); private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*"); + private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList( + Arrays.asList("set", "add", "setDate", "addDate") + ); public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; public static final String BOOTSTRAP_SERVERS_DOC @@ -244,6 +250,10 @@ public class WorkerConfig extends AbstractConfig { + "user requests to reset the set of active topics per connector."; protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true; + public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config"; + public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers"; + public static final String RESPONSE_HTTP_HEADERS_DEFAULT = ""; + /** * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to * bootstrap their own ConfigDef. 
@@ -324,7 +334,9 @@ public class WorkerConfig extends AbstractConfig { .define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT, Importance.LOW, TOPIC_TRACKING_ENABLE_DOC) .define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT, - Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC); + Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC) + .define(RESPONSE_HTTP_HEADERS_CONFIG, Type.STRING, RESPONSE_HTTP_HEADERS_DEFAULT, + new ResponseHttpHeadersValidator(), Importance.LOW, RESPONSE_HTTP_HEADERS_DOC); } private void logInternalConverterDeprecationWarnings(Map<String, String> props) { @@ -400,6 +412,48 @@ public class WorkerConfig extends AbstractConfig { logInternalConverterDeprecationWarnings(props); } + // Visible for testing + static void validateHttpResponseHeaderConfig(String config) { + try { + // validate format + String[] configTokens = config.trim().split("\\s+", 2); + if (configTokens.length != 2) { + throw new ConfigException(String.format("Invalid format of header config '%s\'. " + + "Expected: '[ation] [header name]:[header value]'", config)); + } + + // validate action + String method = configTokens[0].trim(); + validateHeaderConfigAction(method); + + // validate header name and header value pair + String header = configTokens[1]; + String[] headerTokens = header.trim().split(":"); + if (headerTokens.length != 2) { + throw new ConfigException( + String.format("Invalid format of header name and header value pair '%s'. " + + "Expected: '[header name]:[header value]'", header)); + } + + // validate header name + String headerName = headerTokens[0].trim(); + if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) { + throw new ConfigException(String.format("Invalid header name '%s'. 
" + "The '[header name]' cannot contain whitespace", headerName)); + } + } catch (ArrayIndexOutOfBoundsException e) { + throw new ConfigException(String.format("Invalid header config '%s'.", config), e); + } + } + + // Visible for testing + static void validateHeaderConfigAction(String action) { + if (!HEADER_ACTIONS.stream().anyMatch(action::equalsIgnoreCase)) { + throw new ConfigException(String.format("Invalid header config action: '%s'. " + + "Expected one of %s", action, HEADER_ACTIONS)); + } + } + private static class AdminListenersValidator implements ConfigDef.Validator { @Override public void ensureValid(String name, Object value) { @@ -427,4 +481,23 @@ public class WorkerConfig extends AbstractConfig { } } + private static class ResponseHttpHeadersValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + String strValue = (String) value; + if (strValue == null || strValue.trim().isEmpty()) { + return; + } + + String[] configs = StringUtil.csvSplit(strValue); // handles and removes surrounding quotes + Arrays.stream(configs).forEach(WorkerConfig::validateHttpResponseHeaderConfig); + } + + @Override + public String toString() { + return "Comma-separated header rules, where each header rule is of the form " + + "'[action] [header name]:[header value]' and optionally surrounded by double quotes " + + "if any part of a header rule contains a comma"; + } + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 02b4677..408f72e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -46,6 +46,7 @@ import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import 
org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.CrossOriginFilter; +import org.eclipse.jetty.servlets.HeaderFilter; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.server.ServerProperties; @@ -285,6 +286,11 @@ public class RestServer { context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); } + String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG); + if (headerConfig != null && !headerConfig.trim().isEmpty()) { + configureHttpResponsHeaderFilter(context); + } + RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter(); slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName()); @@ -472,4 +478,14 @@ public class RestServer { return base + path; } + /** + * Register header filter to ServletContextHandler. + * @param context The servlet context handler + */ + protected void configureHttpResponsHeaderFilter(ServletContextHandler context) { + String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG); + FilterHolder headerFilterHolder = new FilterHolder(HeaderFilter.class); + headerFilterHolder.setInitParameter("headerConfig", headerConfig); + context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java index 33416b9..1eeb13e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java @@ -23,12 +23,38 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.List; import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; public class WorkerConfigTest { + private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList( + "add \t Cache-Control: no-cache, no-store, must-revalidate", + "add \r X-XSS-Protection: 1; mode=block", + "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains", + "AdD Strict-Transport-Security: \r max-age=31536000; includeSubDomains", + "AdD \t Strict-Transport-Security : \n max-age=31536000; includeSubDomains", + "add X-Content-Type-Options: \r nosniff", + "Set \t X-Frame-Options: \t Deny\n ", + "seT \t X-Cache-Info: \t not cacheable\n ", + "seTDate \t Expires: \r 31540000000", + "adDdate \n Last-Modified: \t 0" + ); + + private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList( + "set \t", + "badaction \t X-Frame-Options:DENY", + "set add X-XSS-Protection:1", + "addX-XSS-Protection", + "X-XSS-Protection:", + "add set X-XSS-Protection: 1", + "add X-XSS-Protection:1 X-XSS-Protection:1 ", + "add X-XSS-Protection", + "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate " + ); @Test public void testAdminListenersConfigAllowedValues() { @@ -63,6 +89,28 @@ public class WorkerConfigTest { new WorkerConfig(WorkerConfig.baseConfigDef(), props); } + @Test + public void testInvalidHeaderConfigs() { + for (String config : INVALID_HEADER_CONFIGS) { + assertInvalidHeaderConfig(config); + } + } + + @Test + public void testValidHeaderConfigs() { + for (String config : VALID_HEADER_CONFIGS) { + assertValidHeaderConfig(config); + } + } + + private void assertInvalidHeaderConfig(String config) { + assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config)); + } + + private void assertValidHeaderConfig(String config) { + WorkerConfig.validateHttpResponseHeaderConfig(config); + } + private Map<String, String> baseProps() { Map<String, String> props = new 
HashMap<>(); props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 575c4da..0c81ddd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -65,7 +65,6 @@ import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"}) public class RestServerTest { - @MockStrict private Herder herder; @MockStrict @@ -76,7 +75,9 @@ public class RestServerTest { @After public void tearDown() { - server.stop(); + if (server != null) { + server.stop(); + } } @SuppressWarnings("deprecation") @@ -400,6 +401,63 @@ public class RestServerTest { Assert.assertEquals(404, response.getStatusLine().getStatusCode()); } + @Test + public void testValidCustomizedHttpResponseHeaders() throws IOException { + String headerConfig = + "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\""; + Map<String, String> expectedHeaders = new HashMap<>(); + expectedHeaders.put("X-XSS-Protection", "1; mode=block"); + expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate"); + checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders); + } + + @Test + public void testDefaultCustomizedHttpResponseHeaders() throws IOException { + String headerConfig = ""; + Map<String, String> expectedHeaders = new HashMap<>(); + checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders); + } + + private void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders) + throws IOException { + Map<String, String> workerProps = baseWorkerProps(); + 
workerProps.put("offset.storage.file.filename", "/tmp"); + workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig); + WorkerConfig workerConfig = new DistributedConfig(workerProps); + + EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID); + EasyMock.expect(herder.plugins()).andStubReturn(plugins); + EasyMock.expect(plugins.newPlugins(Collections.emptyList(), + workerConfig, + ConnectRestExtension.class)).andStubReturn(Collections.emptyList()); + + EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b")); + + PowerMock.replayAll(); + + server = new RestServer(workerConfig); + try { + server.initializeServer(); + server.initializeResources(herder); + HttpRequest request = new HttpGet("/connectors"); + try (CloseableHttpClient httpClient = HttpClients.createMinimal()) { + HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort()); + try (CloseableHttpResponse response = httpClient.execute(httpHost, request)) { + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + if (!headerConfig.isEmpty()) { + expectedHeaders.forEach((k, v) -> + Assert.assertEquals(response.getFirstHeader(k).getValue(), v)); + } else { + Assert.assertNull(response.getFirstHeader("X-Frame-Options")); + } + } + } + } finally { + server.stop(); + server = null; + } + } + private String executeGet(String host, int port, String endpoint) throws IOException { HttpRequest request = new HttpGet(endpoint); CloseableHttpClient httpClient = HttpClients.createMinimal();