Repository: kafka Updated Branches: refs/heads/trunk 655367971 -> eb823281a
KAFKA-3424: Add CORS support to Connect REST API Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Gwen Shapira Closes #1099 from ewencp/cors-rest-support Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb823281 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb823281 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb823281 Branch: refs/heads/trunk Commit: eb823281a52f3b27c3a889e7412bc07b3024e688 Parents: 6553679 Author: Ewen Cheslack-Postava <[email protected]> Authored: Sat Mar 19 18:39:52 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Sat Mar 19 18:39:52 2016 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../kafka/connect/runtime/WorkerConfig.java | 14 +- .../kafka/connect/runtime/rest/RestServer.java | 12 ++ .../connect/runtime/rest/RestServerTest.java | 150 +++++++++++++++++++ gradle/dependencies.gradle | 1 + 5 files changed, 177 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 81e4af5..c29ad5a 100644 --- a/build.gradle +++ b/build.gradle @@ -749,6 +749,7 @@ project(':connect:runtime') { compile libs.jerseyContainerServlet compile libs.jettyServer compile libs.jettyServlet + compile libs.jettyServlets compile libs.reflections testCompile project(':clients').sourceSets.test.output http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java 
index 4ecacbb..471e4a5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -101,6 +101,15 @@ public class WorkerConfig extends AbstractConfig { private static final String REST_ADVERTISED_PORT_DOC = "If this is set, this is the port that will be given out to other workers to connect to."; + public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin"; + protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC = + "Value to set the Access-Control-Allow-Origin header to for REST API requests." + + " To enable cross origin access, set this to the domain of the application that should be permitted" + + " to access the API, or '*' to allow access from any domain. The default value only allows access" + + " from the domain of the REST API."; + protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = ""; + + /** * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to * bootstrap their own ConfigDef.
@@ -129,7 +138,10 @@ public class WorkerConfig extends AbstractConfig { .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC) .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC) .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC) - .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC); + .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC) + .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING, + ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW, + ACCESS_CONTROL_ALLOW_ORIGIN_DOC); } public WorkerConfig(ConfigDef definition, Map<String, String> props) { http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 7e4279a..1505a01 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -39,8 +39,10 @@ import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; +import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.CrossOriginFilter; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; @@ -52,9 +54,11 @@ import 
java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URI; import java.net.URL; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import javax.servlet.DispatcherType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; @@ -109,6 +113,14 @@ public class RestServer { context.setContextPath("/"); context.addServlet(servletHolder, "/*"); + String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG); + if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) { + FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter()); + filterHolder.setName("cross-origin"); + filterHolder.setInitParameter("allowedOrigins", allowedOrigins); + context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); + } + RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLog requestLog = new Slf4jRequestLog(); requestLog.setLoggerName(RestServer.class.getCanonicalName()); http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java new file mode 100644 index 0000000..8e9d52b --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest; + +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.util.Callback; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.MockStrict; +import org.powermock.modules.junit4.PowerMockRunner; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +public class RestServerTest { + + @MockStrict + private Herder herder; + private RestServer server; + + @After + public void tearDown() { + server.stop(); + } + + private Map<String, String> baseWorkerProps() { + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + 
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); + workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + return workerProps; + } + + @Test + public void testCORSEnabled() { + checkCORSRequest("*", "http://bar.com", "http://bar.com"); + } + + @Test + public void testCORSDisabled() { + checkCORSRequest("", "http://bar.com", null); + } + + public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) { + // To be able to set the Origin, we need to toggle this flag + System.setProperty("sun.net.http.allowRestrictedHeaders", "true"); + + final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture(); + herder.connectors(EasyMock.capture(connectorsCallback)); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b")); + return null; + } + }); + PowerMock.replayAll(); + + Map<String, String> workerProps = baseWorkerProps(); + workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain); + WorkerConfig workerConfig = new StandaloneConfig(workerProps); + server = new RestServer(workerConfig); + server.start(herder); + + Response response = request("/connectors") + .header("Referer", origin + "/page") + .header("Origin", origin) + .get(); + assertEquals(200, response.getStatus()); + + assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin")); + PowerMock.verifyAll(); + } + + protected Invocation.Builder request(String path) { + return request(path, null, null, null); + } + + protected Invocation.Builder request(String path, Map<String, String> queryParams) { + return request(path, null, null, queryParams); + } + + protected Invocation.Builder 
request(String path, String templateName, Object templateValue) { + return request(path, templateName, templateValue, null); + } + + protected Invocation.Builder request(String path, String templateName, Object templateValue, + Map<String, String> queryParams) { + Client client = ClientBuilder.newClient(); + WebTarget target; + URI pathUri = null; + try { + pathUri = new URI(path); + } catch (URISyntaxException e) { + // Ignore, use restConnect and assume this is a valid path part + } + if (pathUri != null && pathUri.isAbsolute()) { + target = client.target(path); + } else { + target = client.target(server.advertisedUrl()).path(path); + } + if (templateName != null && templateValue != null) { + target = target.resolveTemplate(templateName, templateValue); + } + if (queryParams != null) { + for (Map.Entry<String, String> queryParam : queryParams.entrySet()) { + target = target.queryParam(queryParam.getKey(), queryParam.getValue()); + } + } + return target.request(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index aa1d3f9..47158d6 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -72,6 +72,7 @@ libs += [ jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty", jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty", + jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty", jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey", junit: "junit:junit:$versions.junit", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
