This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3431be2 KAFKA-4029: SSL support for Connect REST API (KIP-208)
3431be2 is described below
commit 3431be2aeb4f16599c419f89360856b72d0d335d
Author: Jakub Scholz <[email protected]>
AuthorDate: Tue Jan 30 15:09:40 2018 -0800
KAFKA-4029: SSL support for Connect REST API (KIP-208)
This PR implements the JIRA issue [KAFKA-4029: SSL support for Connect REST API](https://issues.apache.org/jira/browse/KAFKA-4029) / [KIP-208](https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface).
Summary of the main changes:
- Jetty `HttpClient` is used as the HTTP client instead of the one shipped with
Java. That keeps the SSL configuration for the server and the client in a
single place (both use the Jetty `SslContextFactory`). It also has much richer
configuration options than the JDK client (it is easier to configure things
such as supported cipher suites).
- The `RestServer` class has been broken into three parts. `RestServer`
contains the server itself, `RestClient` contains the HTTP client used for
forwarding requests, and `SSLUtils` contains helper methods for configuring
SSL. One of the reasons for this split was Findbugs complaining about the
class complexity.
- A new method `valuesWithPrefixAllOrNothing` has been added to
`AbstractConfig` to make it easier to handle the situation where we want to use
either only the prefixed SSL options or only the non-prefixed ones, but never a
mixture of the two.
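To illustrate the intended all-or-nothing semantics, here is a standalone sketch (this is *not* the actual `AbstractConfig` code — the real method additionally strips the prefix via `originalsWithPrefix`, parses values against the `ConfigDef`, and wraps results in `RecordingMap`, all of which are omitted here):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixAllOrNothing {
    // If any key carries the prefix, return only the prefixed values (with
    // the prefix stripped); otherwise fall back to the full unprefixed map.
    // Prefixed and unprefixed values are never mixed in the result.
    public static Map<String, Object> valuesWithPrefixAllOrNothing(Map<String, Object> values, String prefix) {
        Map<String, Object> withPrefix = new HashMap<>();
        for (Map.Entry<String, Object> e : values.entrySet()) {
            if (e.getKey().startsWith(prefix) && e.getKey().length() > prefix.length())
                withPrefix.put(e.getKey().substring(prefix.length()), e.getValue());
        }
        if (withPrefix.isEmpty())
            return new HashMap<String, Object>(values);
        return withPrefix;
    }
}
```

This is what lets an operator either configure a dedicated set of prefixed SSL options for the REST interface or reuse the worker's top-level SSL options unchanged, without the two silently blending.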
Author: Jakub Scholz <[email protected]>
Reviewers: Ewen Cheslack-Postava <[email protected]>
Closes #4429 from scholzj/kip-208
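For operators, the worker options introduced by this change combine roughly as follows. This is a hypothetical snippet: the hostname and keystore paths are placeholders, and the `ssl.*` key names are the standard Kafka SSL option names assumed from the common security configs (only `listeners`, `rest.advertised.listener`, and the `ssl.client.auth` definition appear in the diff below):

```
# Listen on both HTTP and HTTPS (new "listeners" worker config)
listeners=http://myhost:8083,https://myhost:8443

# Advertise the HTTPS listener to other workers
rest.advertised.listener=https

# Standard Kafka SSL settings, consumed by the Jetty SslContextFactory
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=keystore-password
ssl.client.auth=none
```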
---
build.gradle | 1 +
.../apache/kafka/common/config/AbstractConfig.java | 25 +++
.../kafka/common/config/AbstractConfigTest.java | 28 +++
.../apache/kafka/connect/runtime/WorkerConfig.java | 25 ++-
.../runtime/distributed/DistributedHerder.java | 7 +-
.../kafka/connect/runtime/rest/RestClient.java | 158 +++++++++++++++
.../kafka/connect/runtime/rest/RestServer.java | 224 +++++++++++----------
.../runtime/rest/resources/ConnectorsResource.java | 15 +-
.../kafka/connect/runtime/rest/util/SSLUtils.java | 152 ++++++++++++++
.../kafka/connect/runtime/rest/RestServerTest.java | 100 +++++++--
.../resources/ConnectorPluginsResourceTest.java | 9 +-
.../rest/resources/ConnectorsResourceTest.java | 49 ++---
.../connect/runtime/rest/util/SSLUtilsTest.java | 126 ++++++++++++
docs/connect.html | 38 +++-
gradle/dependencies.gradle | 1 +
15 files changed, 798 insertions(+), 160 deletions(-)
diff --git a/build.gradle b/build.gradle
index aedd943..430d989 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1201,6 +1201,7 @@ project(':connect:runtime') {
compile libs.jettyServer
compile libs.jettyServlet
compile libs.jettyServlets
+ compile libs.jettyClient
compile(libs.reflections)
compile(libs.mavenArtifact)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 427c492..ce3ae43 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -218,6 +218,31 @@ public class AbstractConfig {
return result;
}
+    /**
+     * If at least one key with {@code prefix} exists, all prefixed values will be parsed and put into map.
+     * If no value with {@code prefix} exists all unprefixed values will be returned.
+     *
+     * This is useful if one wants to allow prefixed configs to override default ones, but wants to use either
+     * only prefixed configs or only regular configs, but not mix them.
+     */
+    public Map<String, Object> valuesWithPrefixAllOrNothing(String prefix) {
+        Map<String, Object> withPrefix = originalsWithPrefix(prefix, true);
+
+        if (withPrefix.isEmpty()) {
+            return new RecordingMap<>(values(), "", true);
+        } else {
+            Map<String, Object> result = new RecordingMap<>(prefix, true);
+
+            for (Map.Entry<String, ?> entry : withPrefix.entrySet()) {
+                ConfigDef.ConfigKey configKey = definition.configKeys().get(entry.getKey());
+                if (configKey != null)
+                    result.put(entry.getKey(), definition.parseValue(configKey, entry.getValue(), true));
+            }
+
+            return result;
+        }
+    }
+
public Map<String, ?> values() {
return new RecordingMap<>(values);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 2e15715..074df10 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -140,6 +140,34 @@ public class AbstractConfigTest {
}
@Test
+    public void testValuesWithPrefixAllOrNothing() {
+        String prefix1 = "prefix1.";
+        String prefix2 = "prefix2.";
+        Properties props = new Properties();
+        props.put("sasl.mechanism", "PLAIN");
+        props.put("prefix1.sasl.mechanism", "GSSAPI");
+        props.put("prefix1.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
+        props.put("prefix1.ssl.truststore.location", "my location");
+        props.put("sasl.kerberos.service.name", "service name");
+        props.put("ssl.keymanager.algorithm", "algorithm");
+        TestSecurityConfig config = new TestSecurityConfig(props);
+        Map<String, Object> valuesWithPrefixAllOrNothing1 = config.valuesWithPrefixAllOrNothing(prefix1);
+
+        // All prefixed values are there
+        assertEquals("GSSAPI", valuesWithPrefixAllOrNothing1.get("sasl.mechanism"));
+        assertEquals("/usr/bin/kinit2", valuesWithPrefixAllOrNothing1.get("sasl.kerberos.kinit.cmd"));
+        assertEquals("my location", valuesWithPrefixAllOrNothing1.get("ssl.truststore.location"));
+
+        // Non-prefixed values are missing
+        assertFalse(valuesWithPrefixAllOrNothing1.containsKey("sasl.kerberos.service.name"));
+        assertFalse(valuesWithPrefixAllOrNothing1.containsKey("ssl.keymanager.algorithm"));
+
+        Map<String, Object> valuesWithPrefixAllOrNothing2 = config.valuesWithPrefixAllOrNothing(prefix2);
+        assertTrue(valuesWithPrefixAllOrNothing2.containsKey("sasl.kerberos.service.name"));
+        assertTrue(valuesWithPrefixAllOrNothing2.containsKey("ssl.keymanager.algorithm"));
+    }
+
+ @Test
public void testUnused() {
Properties props = new Properties();
String configValue =
"org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index dfae761..3b99c6b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
import java.util.ArrayList;
@@ -99,15 +100,30 @@ public class WorkerConfig extends AbstractConfig {
+ "data to be committed in a future attempt.";
public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
+    /**
+     * @deprecated As of 1.1.0.
+     */
+    @Deprecated
     public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
     private static final String REST_HOST_NAME_DOC
             = "Hostname for the REST API. If this is set, it will only bind to this interface.";
+    /**
+     * @deprecated As of 1.1.0.
+     */
+    @Deprecated
     public static final String REST_PORT_CONFIG = "rest.port";
     private static final String REST_PORT_DOC
             = "Port for the REST API to listen on.";
     public static final int REST_PORT_DEFAULT = 8083;
+    public static final String LISTENERS_CONFIG = "listeners";
+    private static final String LISTENERS_DOC
+            = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
+              " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+              " Leave hostname empty to bind to default interface.\n" +
+              " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
+
     public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
     private static final String REST_ADVERTISED_HOST_NAME_DOC
             = "If this is set, this is the hostname that will be given out to other workers to connect to.";
@@ -116,6 +132,10 @@ public class WorkerConfig extends AbstractConfig {
     private static final String REST_ADVERTISED_PORT_DOC
             = "If this is set, this is the port that will be given out to other workers to connect to.";
+    public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener";
+    private static final String REST_ADVERTISED_LISTENER_DOC
+            = "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use.";
+
     public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
     protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
             "Value to set the Access-Control-Allow-Origin header to for REST API requests." +
@@ -173,8 +193,10 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
                 .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
                 .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
+                .define(LISTENERS_CONFIG, Type.LIST, null, Importance.LOW, LISTENERS_DOC)
                 .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
                 .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
+                .define(REST_ADVERTISED_LISTENER_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_LISTENER_DOC)
                 .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
                         ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
                         ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
@@ -199,7 +221,8 @@ public class WorkerConfig extends AbstractConfig {
                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
                         "", Importance.LOW,
-                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC);
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC);
     }
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index c39ee67..c6e6a47 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -42,6 +42,7 @@ import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -146,6 +147,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private boolean needsReconfigRebalance;
     private volatile int generation;
+    private final DistributedConfig config;
+
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
@@ -186,6 +189,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         });
         this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
         this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
+        this.config = config;
         stopping = new AtomicBoolean(false);
         configState = ClusterConfigState.EMPTY;
@@ -710,7 +714,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return generation;
     }
-
     // Should only be called from work thread, so synchronization should not be needed
     private boolean isLeader() {
         return assignment != null && member.memberId().equals(assignment.leader());
@@ -1011,7 +1014,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             public void run() {
                 try {
                     String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                    RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
+                    RestClient.httpRequest(reconfigUrl, "POST", taskProps, null, config);
                     cb.onCompletion(null, null);
                 } catch (ConnectException e) {
                     log.error("Request to leader to reconfigure connector tasks failed", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
new file mode 100644
index 0000000..d500ad2
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.util.StringContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class RestClient {
+    private static final Logger log = LoggerFactory.getLogger(RestClient.class);
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    /**
+     * Sends HTTP request to remote REST server
+     *
+     * @param url             HTTP connection will be established with this url.
+     * @param method          HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData Object to serialize as JSON and send in the request body.
+     * @param responseFormat  Expected format of the response to the HTTP request.
+     * @param config          Worker config, used for the client SSL settings when the url is HTTPS.
+     * @param <T>             The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
+                                                  TypeReference<T> responseFormat, WorkerConfig config) {
+        HttpClient client;
+
+        if (url.startsWith("https://")) {
+            client = new HttpClient(SSLUtils.createSslContextFactory(config, true));
+        } else {
+            client = new HttpClient();
+        }
+
+        client.setFollowRedirects(false);
+
+        try {
+            client.start();
+        } catch (Exception e) {
+            log.error("Failed to start RestClient: ", e);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Failed to start RestClient: " + e.getMessage(), e);
+        }
+
+        try {
+            String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
+            log.trace("Sending {} with input {} to {}", method, serializedBody, url);
+
+            Request req = client.newRequest(url);
+            req.method(method);
+            req.accept("application/json");
+            req.agent("kafka-connect");
+            req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8), "application/json");
+
+            ContentResponse res = req.send();
+
+            int responseCode = res.getStatus();
+            if (responseCode == HttpStatus.NO_CONTENT_204) {
+                return new HttpResponse<>(responseCode, convertHttpFieldsToMap(res.getHeaders()), null);
+            } else if (responseCode >= 400) {
+                ErrorMessage errorMessage = JSON_SERDE.readValue(res.getContentAsString(), ErrorMessage.class);
+                throw new ConnectRestException(responseCode, errorMessage.errorCode(), errorMessage.message());
+            } else if (responseCode >= 200 && responseCode < 300) {
+                T result = JSON_SERDE.readValue(res.getContentAsString(), responseFormat);
+                return new HttpResponse<>(responseCode, convertHttpFieldsToMap(res.getHeaders()), result);
+            } else {
+                throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
+                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+                        "Unexpected status code when handling forwarded request: " + responseCode);
+            }
+        } catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
+            log.error("IO error forwarding REST request: ", e);
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
+        } finally {
+            if (client != null)
+                try {
+                    client.stop();
+                } catch (Exception e) {
+                    log.error("Failed to stop HTTP client", e);
+                }
+        }
+    }
+
+    /**
+     * Convert response headers from the Jetty format ({@link HttpFields}) into a map
+     *
+     * @param httpFields the Jetty response headers
+     * @return a map of header names to values
+     */
+    private static Map<String, String> convertHttpFieldsToMap(HttpFields httpFields) {
+        Map<String, String> headers = new HashMap<>();
+
+        if (httpFields == null || httpFields.size() == 0)
+            return headers;
+
+        for (HttpField field : httpFields) {
+            headers.put(field.getName(), field.getValue());
+        }
+
+        return headers;
+    }
+
+    public static class HttpResponse<T> {
+        private int status;
+        private Map<String, String> headers;
+        private T body;
+
+        public HttpResponse(int status, Map<String, String> headers, T body) {
+            this.status = status;
+            this.headers = headers;
+            this.body = body;
+        }
+
+        public int status() {
+            return status;
+        }
+
+        public Map<String, String> headers() {
+            return headers;
+        }
+
+        public T body() {
+            return body;
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 4a2f913..77f6cdf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -16,19 +16,16 @@
*/
package org.apache.kafka.connect.runtime.rest;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
-import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
@@ -42,25 +39,22 @@ import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.CrossOriginFilter;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
import java.net.URI;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
-
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
 * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
@@ -70,7 +64,8 @@ public class RestServer {
private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;
- private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+ private static final String PROTOCOL_HTTP = "http";
+ private static final String PROTOCOL_HTTPS = "https";
private final WorkerConfig config;
private Server jettyServer;
@@ -81,17 +76,79 @@ public class RestServer {
     public RestServer(WorkerConfig config) {
         this.config = config;
-        // To make the advertised port available immediately, we need to do some configuration here
-        String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
-        Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG);
+        List<String> listeners = parseListeners();
         jettyServer = new Server();
-        ServerConnector connector = new ServerConnector(jettyServer);
-        if (hostname != null && !hostname.isEmpty())
+        createConnectors(listeners);
+    }
+
+    List<String> parseListeners() {
+        List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
+        if (listeners == null || listeners.size() == 0) {
+            String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
+
+            if (hostname == null)
+                hostname = "";
+
+            listeners = Collections.singletonList(String.format("%s://%s:%d", PROTOCOL_HTTP, hostname, config.getInt(WorkerConfig.REST_PORT_CONFIG)));
+        }
+
+        return listeners;
+    }
+
+    /**
+     * Adds Jetty connector for each configured listener
+     */
+    public void createConnectors(List<String> listeners) {
+        List<Connector> connectors = new ArrayList<>();
+
+        for (String listener : listeners) {
+            if (!listener.isEmpty()) {
+                Connector connector = createConnector(listener);
+                connectors.add(connector);
+                log.info("Added connector for " + listener);
+            }
+        }
+
+        jettyServer.setConnectors(connectors.toArray(new Connector[connectors.size()]));
+    }
+
+    /**
+     * Creates Jetty connector according to configuration
+     */
+    public Connector createConnector(String listener) {
+        Pattern listenerPattern = Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
+        Matcher listenerMatcher = listenerPattern.matcher(listener);
+
+        if (!listenerMatcher.matches())
+            throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");
+
+        String protocol = listenerMatcher.group(1).toLowerCase(Locale.ENGLISH);
+
+        if (!PROTOCOL_HTTP.equals(protocol) && !PROTOCOL_HTTPS.equals(protocol))
+            throw new ConfigException(String.format("Listener protocol must be either \"%s\" or \"%s\".", PROTOCOL_HTTP, PROTOCOL_HTTPS));
+
+        String hostname = listenerMatcher.group(2);
+        int port = Integer.parseInt(listenerMatcher.group(3));
+
+        ServerConnector connector;
+
+        if (PROTOCOL_HTTPS.equals(protocol)) {
+            SslContextFactory ssl = SSLUtils.createSslContextFactory(config);
+            connector = new ServerConnector(jettyServer, ssl);
+            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTPS, hostname, port));
+        } else {
+            connector = new ServerConnector(jettyServer);
+            connector.setName(String.format("%s_%s%d", PROTOCOL_HTTP, hostname, port));
+        }
+
+        if (!hostname.isEmpty())
             connector.setHost(hostname);
+
         connector.setPort(port);
-        jettyServer.setConnectors(new Connector[]{connector});
+
+        return connector;
     }
public void start(Herder herder) {
@@ -101,7 +158,7 @@ public class RestServer {
resourceConfig.register(new JacksonJsonProvider());
resourceConfig.register(new RootResource(herder));
- resourceConfig.register(new ConnectorsResource(herder));
+ resourceConfig.register(new ConnectorsResource(herder, config));
resourceConfig.register(new ConnectorPluginsResource(herder));
resourceConfig.register(ConnectExceptionMapper.class);
@@ -171,103 +228,56 @@ public class RestServer {
*/
     public URI advertisedUrl() {
         UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
+
+        String advertisedSecurityProtocol = determineAdvertisedProtocol();
+        ServerConnector serverConnector = findConnector(advertisedSecurityProtocol);
+        builder.scheme(advertisedSecurityProtocol);
+
         String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
         if (advertisedHostname != null && !advertisedHostname.isEmpty())
             builder.host(advertisedHostname);
+        else if (serverConnector != null && serverConnector.getHost() != null && serverConnector.getHost().length() > 0)
+            builder.host(serverConnector.getHost());
+
         Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
         if (advertisedPort != null)
             builder.port(advertisedPort);
-        else
-            builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
-        return builder.build();
-    }
-
-    /**
-     * @param url             HTTP connection will be established with this url.
-     * @param method          HTTP method ("GET", "POST", "PUT", etc.)
-     * @param requestBodyData Object to serialize as JSON and send in the request body.
-     * @param responseFormat  Expected format of the response to the HTTP request.
-     * @param <T>             The type of the deserialized response to the HTTP request.
-     * @return The deserialized response to the HTTP request, or null if no data is expected.
-     */
-    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
-                                                  TypeReference<T> responseFormat) {
-        HttpURLConnection connection = null;
-        try {
-            String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
-            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
-
-            connection = (HttpURLConnection) new URL(url).openConnection();
-            connection.setRequestMethod(method);
+        else if (serverConnector != null)
+            builder.port(serverConnector.getPort());
-            connection.setRequestProperty("User-Agent", "kafka-connect");
-            connection.setRequestProperty("Accept", "application/json");
+        log.info("Advertised URI: {}", builder.build());
-            // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
-            // On the other hand, leaving this out breaks nothing.
-            connection.setDoInput(true);
-
-            connection.setUseCaches(false);
-
-            if (requestBodyData != null) {
-                connection.setRequestProperty("Content-Type", "application/json");
-                connection.setDoOutput(true);
-
-                OutputStream os = connection.getOutputStream();
-                os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
-                os.flush();
-                os.close();
-            }
-
-            int responseCode = connection.getResponseCode();
-            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
-                return new HttpResponse<>(responseCode, connection.getHeaderFields(), null);
-            } else if (responseCode >= 400) {
-                InputStream es = connection.getErrorStream();
-                ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class);
-                es.close();
-                throw new ConnectRestException(responseCode, errorMessage.errorCode(), errorMessage.message());
-            } else if (responseCode >= 200 && responseCode < 300) {
-                InputStream is = connection.getInputStream();
-                T result = JSON_SERDE.readValue(is, responseFormat);
-                is.close();
-                return new HttpResponse<>(responseCode, connection.getHeaderFields(), result);
-            } else {
-                throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR,
-                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
-                        "Unexpected status code when handling forwarded request: " + responseCode);
-            }
-        } catch (IOException e) {
-            log.error("IO error forwarding REST request: ", e);
-            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
-        } finally {
-            if (connection != null)
-                connection.disconnect();
-        }
+        return builder.build();
     }
-    public static class HttpResponse<T> {
-        private int status;
-        private Map<String, List<String>> headers;
-        private T body;
-
-        public HttpResponse(int status, Map<String, List<String>> headers, T body) {
-            this.status = status;
-            this.headers = headers;
-            this.body = body;
-        }
-
-        public int status() {
-            return status;
+    String determineAdvertisedProtocol() {
+        String advertisedSecurityProtocol = config.getString(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG);
+        if (advertisedSecurityProtocol == null) {
+            String listeners = (String) config.originals().get(WorkerConfig.LISTENERS_CONFIG);
+
+            if (listeners == null)
+                return PROTOCOL_HTTP;
+            else
+                listeners = listeners.toLowerCase(Locale.ENGLISH);
+
+            if (listeners.contains(String.format("%s://", PROTOCOL_HTTP)))
+                return PROTOCOL_HTTP;
+            else if (listeners.contains(String.format("%s://", PROTOCOL_HTTPS)))
+                return PROTOCOL_HTTPS;
+            else
+                return PROTOCOL_HTTP;
+        } else {
+            return advertisedSecurityProtocol.toLowerCase(Locale.ENGLISH);
         }
+    }
-        public Map<String, List<String>> headers() {
-            return headers;
+    ServerConnector findConnector(String protocol) {
+        for (Connector connector : jettyServer.getConnectors()) {
+            if (connector.getName().startsWith(protocol))
+                return (ServerConnector) connector;
         }
-        public T body() {
-            return body;
-        }
+        return null;
     }
public static String urlJoin(String base, String path) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 2c03124..7a01168 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,9 +19,10 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
@@ -67,11 +68,13 @@ public class ConnectorsResource {
private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
private final Herder herder;
+ private final WorkerConfig config;
@javax.ws.rs.core.Context
private ServletContext context;
- public ConnectorsResource(Herder herder) {
+ public ConnectorsResource(Herder herder, WorkerConfig config) {
this.herder = herder;
+ this.config = config;
}
@GET
@@ -257,7 +260,7 @@ public class ConnectorsResource {
.build()
.toString();
            log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
-            return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType));
+            return translator.translate(RestClient.httpRequest(forwardUrl, method, body, resultType, config));
        } else {
            // we should find the right target for the query within two hops, so if
            // we don't, it probably means that a rebalance has taken place.
@@ -290,19 +293,19 @@ public class ConnectorsResource {
}
private interface Translator<T, U> {
- T translate(RestServer.HttpResponse<U> response);
+ T translate(RestClient.HttpResponse<U> response);
}
private static class IdentityTranslator<T> implements Translator<T, T> {
@Override
- public T translate(RestServer.HttpResponse<T> response) {
+ public T translate(RestClient.HttpResponse<T> response) {
return response.body();
}
}
    private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
        @Override
-        public Herder.Created<ConnectorInfo> translate(RestServer.HttpResponse<ConnectorInfo> response) {
+        public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> response) {
boolean created = response.status() == 201;
return new Herder.Created<>(created, response.body());
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
new file mode 100644
index 0000000..51222e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.util;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class for setting up SSL for RestServer and RestClient
+ */
+public class SSLUtils {
+    /**
+     * Configures SSL/TLS for HTTPS Jetty Server / Client
+     */
+    public static SslContextFactory createSslContextFactory(WorkerConfig config) {
+        return createSslContextFactory(config, false);
+    }
+
+    /**
+     * Configures SSL/TLS for HTTPS Jetty Server / Client
+     */
+    public static SslContextFactory createSslContextFactory(WorkerConfig config, boolean client) {
+        Map<String, Object> sslConfigValues = config.valuesWithPrefixAllOrNothing("listeners.https.");
+
+        SslContextFactory ssl = new SslContextFactory();
+
+        configureSslContextFactoryKeyStore(ssl, sslConfigValues);
+        configureSslContextFactoryTrustStore(ssl, sslConfigValues);
+        configureSslContextFactoryAlgorithms(ssl, sslConfigValues);
+        configureSslContextFactoryAuthentication(ssl, sslConfigValues);
+
+        if (client)
+            configureSslContextFactoryEndpointIdentification(ssl, sslConfigValues);
+
+        return ssl;
+    }
+
+    /**
+     * Configures KeyStore related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryKeyStore(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        ssl.setKeyStoreType((String) getOrDefault(sslConfigValues, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE));
+
+        String sslKeystoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+        if (sslKeystoreLocation != null)
+            ssl.setKeyStorePath(sslKeystoreLocation);
+
+        Password sslKeystorePassword = (Password) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+        if (sslKeystorePassword != null)
+            ssl.setKeyStorePassword(sslKeystorePassword.value());
+
+        Password sslKeyPassword = (Password) sslConfigValues.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
+        if (sslKeyPassword != null)
+            ssl.setKeyManagerPassword(sslKeyPassword.value());
+    }
+
+    protected static Object getOrDefault(Map<String, Object> configMap, String key, Object defaultValue) {
+        if (configMap.containsKey(key))
+            return configMap.get(key);
+
+        return defaultValue;
+    }
+
+    /**
+     * Configures TrustStore related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryTrustStore(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        ssl.setTrustStoreType((String) getOrDefault(sslConfigValues, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE));
+
+        String sslTruststoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+        if (sslTruststoreLocation != null)
+            ssl.setTrustStorePath(sslTruststoreLocation);
+
+        Password sslTruststorePassword = (Password) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
+        if (sslTruststorePassword != null)
+            ssl.setTrustStorePassword(sslTruststorePassword.value());
+    }
+
+    /**
+     * Configures Protocol, Algorithm and Provider related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryAlgorithms(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        List<String> sslEnabledProtocols = (List<String>) getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")));
+        ssl.setIncludeProtocols(sslEnabledProtocols.toArray(new String[sslEnabledProtocols.size()]));
+
+        String sslProvider = (String) sslConfigValues.get(SslConfigs.SSL_PROVIDER_CONFIG);
+        if (sslProvider != null)
+            ssl.setProvider(sslProvider);
+
+        ssl.setProtocol((String) getOrDefault(sslConfigValues, SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));
+
+        List<String> sslCipherSuites = (List<String>) sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
+        if (sslCipherSuites != null)
+            ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[sslCipherSuites.size()]));
+
+        ssl.setSslKeyManagerFactoryAlgorithm((String) getOrDefault(sslConfigValues, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM));
+
+        String sslSecureRandomImpl = (String) sslConfigValues.get(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
+        if (sslSecureRandomImpl != null)
+            ssl.setSecureRandomAlgorithm(sslSecureRandomImpl);
+
+        ssl.setTrustManagerFactoryAlgorithm((String) getOrDefault(sslConfigValues, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM));
+    }
+
+    /**
+     * Configures Endpoint Identification related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryEndpointIdentification(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        String sslEndpointIdentificationAlg = (String) sslConfigValues.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+        if (sslEndpointIdentificationAlg != null)
+            ssl.setEndpointIdentificationAlgorithm(sslEndpointIdentificationAlg);
+    }
+
+    /**
+     * Configures Authentication related settings in SslContextFactory
+     */
+    protected static void configureSslContextFactoryAuthentication(SslContextFactory ssl, Map<String, Object> sslConfigValues) {
+        String sslClientAuth = (String) getOrDefault(sslConfigValues, BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        switch (sslClientAuth) {
+            case "requested":
+                ssl.setWantClientAuth(true);
+                break;
+            case "required":
+                ssl.setNeedClientAuth(true);
+                break;
+            default:
+                ssl.setNeedClientAuth(false);
+                ssl.setWantClientAuth(false);
+        }
+    }
+}
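The `createSslContextFactory` method above relies on `valuesWithPrefixAllOrNothing`, which the commit message describes as returning either only the `listeners.https.`-prefixed settings or only the non-prefixed ones, never a mix. The following is a minimal self-contained sketch of those semantics (an illustration, not the actual `AbstractConfig` implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixAllOrNothingSketch {
    // If ANY key with the given prefix exists, return ONLY the prefixed keys
    // (with the prefix stripped); otherwise fall back to the full map.
    // This avoids mixing REST-specific and worker-wide SSL settings.
    static Map<String, Object> valuesWithPrefixAllOrNothing(Map<String, Object> values, String prefix) {
        Map<String, Object> prefixed = new HashMap<>();
        for (Map.Entry<String, Object> entry : values.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length())
                prefixed.put(key.substring(prefix.length()), entry.getValue());
        }
        return prefixed.isEmpty() ? new HashMap<>(values) : prefixed;
    }

    public static void main(String[] args) {
        Map<String, Object> values = new HashMap<>();
        values.put("ssl.keystore.location", "/worker/keystore");
        values.put("listeners.https.ssl.keystore.location", "/rest/keystore");
        // A prefixed key is present, so only the prefixed view is used
        System.out.println(valuesWithPrefixAllOrNothing(values, "listeners.https."));
    }
}
```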
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 8c53372..d26aa04 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -16,20 +16,27 @@
*/
package org.apache.kafka.connect.runtime.rest;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
@@ -37,12 +44,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
-
import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
@@ -59,13 +60,16 @@ public class RestServerTest {
    private Map<String, String> baseWorkerProps() {
        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
+        workerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
+        workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+
        return workerProps;
    }
@@ -79,6 +83,72 @@ public class RestServerTest {
checkCORSRequest("", "http://bar.com", null, null);
}
+    @Test
+    public void testParseListeners() {
+        // Use listeners field
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        DistributedConfig config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertArrayEquals(new String[] {"http://localhost:8080", "https://localhost:8443"}, server.parseListeners().toArray());
+
+        // Build listener from hostname and port
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
+        configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
+        config = new DistributedConfig(configMap);
+        server = new RestServer(config);
+        Assert.assertArrayEquals(new String[] {"http://my-hostname:8080"}, server.parseListeners().toArray());
+    }
+
+    @Test
+    public void testAdvertisedUri() {
+        // Advertised URI from listeners without protocol
+        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        DistributedConfig config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
+
+        // Advertised URI from listeners with protocol
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
+        config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
+
+        // Advertised URI from listeners with only SSL available
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
+        config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
+
+        // Listener is overridden by advertised values
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
+        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
+        configMap.put(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
+        configMap.put(WorkerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
+        config = new DistributedConfig(configMap);
+
+        server = new RestServer(config);
+        Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
+
+        // Listener from hostname and port
+        configMap = new HashMap<>(baseWorkerProps());
+        configMap.put(WorkerConfig.REST_HOST_NAME_CONFIG, "my-hostname");
+        configMap.put(WorkerConfig.REST_PORT_CONFIG, "8080");
+        config = new DistributedConfig(configMap);
+        server = new RestServer(config);
+        Assert.assertEquals("http://my-hostname:8080/", server.advertisedUrl().toString());
+    }
+
    public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
// To be able to set the Origin, we need to toggle this flag
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
@@ -98,7 +168,7 @@ public class RestServerTest {
        Map<String, String> workerProps = baseWorkerProps();
        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
-        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
server = new RestServer(workerConfig);
server.start(herder);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ff9043e..ad360b6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -32,10 +32,11 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TestSinkConnector;
import org.apache.kafka.connect.runtime.TestSourceConnector;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -78,7 +79,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(RestServer.class)
+@PrepareForTest(RestClient.class)
@PowerMockIgnore("javax.management.*")
public class ConnectorPluginsResourceTest {
@@ -177,8 +178,8 @@ public class ConnectorPluginsResourceTest {
@Before
public void setUp() throws Exception {
-        PowerMock.mockStatic(RestServer.class,
-                RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+        PowerMock.mockStatic(RestClient.class,
+                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
plugins = PowerMock.createMock(Plugins.class);
herder = PowerMock.createMock(AbstractHerder.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 89a2218..6e17349 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,9 +23,10 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
@@ -59,7 +60,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(RestServer.class)
+@PrepareForTest(RestClient.class)
@PowerMockIgnore("javax.management.*")
@SuppressWarnings("unchecked")
public class ConnectorsResourceTest {
@@ -102,9 +103,9 @@ public class ConnectorsResourceTest {
@Before
public void setUp() throws NoSuchMethodException {
-        PowerMock.mockStatic(RestServer.class,
-                RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
-        connectorsResource = new ConnectorsResource(herder);
+        PowerMock.mockStatic(RestClient.class,
+                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
+        connectorsResource = new ConnectorsResource(herder, null);
}
@Test
@@ -128,9 +129,9 @@ public class ConnectorsResourceTest {
herder.connectors(EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
-                .andReturn(new RestServer.HttpResponse<>(200, new HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
+                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
PowerMock.replayAll();
@@ -177,8 +178,8 @@ public class ConnectorsResourceTest {
        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
        expectAndCallbackNotLeaderException(cb);
        // Should forward request
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(201, new HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(201, new HashMap<String, String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
PowerMock.replayAll();
@@ -242,8 +243,8 @@ public class ConnectorsResourceTest {
        herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
        expectAndCallbackNotLeaderException(cb);
        // Should forward request
-        EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null))
-                .andReturn(new RestServer.HttpResponse<>(204, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null, null))
+                .andReturn(new RestClient.HttpResponse<>(204, new HashMap<String, String>(), null));
PowerMock.replayAll();
@@ -445,9 +446,9 @@ public class ConnectorsResourceTest {
        herder.restartConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
        expectAndCallbackNotLeaderException(cb);
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
@@ -463,9 +464,9 @@ public class ConnectorsResourceTest {
String ownerUrl = "http://owner:8083";
        expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
@@ -496,9 +497,9 @@ public class ConnectorsResourceTest {
herder.restartTask(EasyMock.eq(taskId), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
@@ -516,9 +517,9 @@ public class ConnectorsResourceTest {
String ownerUrl = "http://owner:8083";
        expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
-        EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(202, new HashMap<String, List<String>>(), null));
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
PowerMock.replayAll();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
new file mode 100644
index 0000000..422b2b0
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.util;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SSLUtilsTest {
+ private static final Map<String, String> DEFAULT_CONFIG = new HashMap<>();
+ static {
+ // The WorkerConfig base class has some required settings without
defaults
+ DEFAULT_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG,
"status-topic");
+ DEFAULT_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG,
"config-topic");
+ DEFAULT_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
+ DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG,
"connect-test-group");
+ DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG,
"connect-offsets");
+ }
+
+ @Test
+ public void testGetOrDefault() {
+ String existingKey = "exists";
+ String missingKey = "missing";
+ String value = "value";
+ String defaultValue = "default";
+ Map<String, Object> map = new HashMap<>();
+ map.put("exists", "value");
+
+ Assert.assertEquals(SSLUtils.getOrDefault(map, existingKey,
defaultValue), value);
+ Assert.assertEquals(SSLUtils.getOrDefault(map, missingKey,
defaultValue), defaultValue);
+ }
+
+ @Test
+ public void testCreateSslContextFactory() {
+ Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+ configMap.put("ssl.keystore.location", "/path/to/keystore");
+ configMap.put("ssl.keystore.password", "123456");
+ configMap.put("ssl.key.password", "123456");
+ configMap.put("ssl.truststore.location", "/path/to/truststore");
+ configMap.put("ssl.truststore.password", "123456");
+ configMap.put("ssl.provider", "SunJSSE");
+ configMap.put("ssl.cipher.suites",
"SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
+ configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
+ configMap.put("ssl.client.auth", "required");
+ configMap.put("ssl.endpoint.identification.algorithm", "HTTPS");
+ configMap.put("ssl.keystore.type", "JKS");
+ configMap.put("ssl.protocol", "TLS");
+ configMap.put("ssl.truststore.type", "JKS");
+ configMap.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1");
+ configMap.put("ssl.keymanager.algorithm", "SunX509");
+ configMap.put("ssl.trustmanager.algorithm", "PKIX");
+
+ DistributedConfig config = new DistributedConfig(configMap);
+ SslContextFactory ssl = SSLUtils.createSslContextFactory(config);
+
+ Assert.assertEquals("/path/to/keystore", ssl.getKeyStorePath());
+ Assert.assertEquals("/path/to/truststore", ssl.getTrustStore());
+ Assert.assertEquals("SunJSSE", ssl.getProvider());
+ Assert.assertArrayEquals(new String[] {"SSL_RSA_WITH_RC4_128_SHA",
"SSL_RSA_WITH_RC4_128_MD5"}, ssl.getIncludeCipherSuites());
+ Assert.assertEquals("SHA1PRNG", ssl.getSecureRandomAlgorithm());
+ Assert.assertTrue(ssl.getNeedClientAuth());
+ Assert.assertEquals("JKS", ssl.getKeyStoreType());
+ Assert.assertEquals("JKS", ssl.getTrustStoreType());
+ Assert.assertEquals("TLS", ssl.getProtocol());
+ Assert.assertArrayEquals(new String[] {"TLSv1.2", "TLSv1.1", "TLSv1"},
ssl.getIncludeProtocols());
+ Assert.assertEquals("SunX509", ssl.getSslKeyManagerFactoryAlgorithm());
+ Assert.assertEquals("PKIX", ssl.getTrustManagerFactoryAlgorithm());
+ }
+
+ @Test
+ public void testCreateSslContextFactoryDefaultValues() {
+ Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+ configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
"/tmp/offset/file");
+ configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ configMap.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ configMap.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.json.JsonConverter");
+ configMap.put("ssl.keystore.location", "/path/to/keystore");
+ configMap.put("ssl.keystore.password", "123456");
+ configMap.put("ssl.key.password", "123456");
+ configMap.put("ssl.truststore.location", "/path/to/truststore");
+ configMap.put("ssl.truststore.password", "123456");
+ configMap.put("ssl.provider", "SunJSSE");
+ configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
+ configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
+
+ DistributedConfig config = new DistributedConfig(configMap);
+ SslContextFactory ssl = SSLUtils.createSslContextFactory(config);
+
+ Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType());
+ Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ssl.getTrustStoreType());
+ Assert.assertEquals(SslConfigs.DEFAULT_SSL_PROTOCOL, ssl.getProtocol());
+ Assert.assertArrayEquals(Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")).toArray(), ssl.getIncludeProtocols());
+ Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ssl.getSslKeyManagerFactoryAlgorithm());
+ Assert.assertEquals(SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ssl.getTrustManagerFactoryAlgorithm());
+ Assert.assertFalse(ssl.getNeedClientAuth());
+ Assert.assertFalse(ssl.getWantClientAuth());
+ }
+}
diff --git a/docs/connect.html b/docs/connect.html
index b910cf5..1230a7a 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -187,7 +187,43 @@
<h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
- <p>Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints:</p>
+ <p>Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. The REST API server can be configured using the <code>listeners</code> configuration option.
+ This field should contain a list of listeners in the following format: <code>protocol://host:port,protocol2://host2:port2</code>. Currently supported protocols are <code>http</code> and <code>https</code>.
+ For example:</p>
+
+ <pre class="brush: text;">
+ listeners=http://localhost:8080,https://localhost:8443
+ </pre>
+
+ <p>By default, if no <code>listeners</code> are specified, the REST server runs on port 8083 using the HTTP protocol. When using HTTPS, the configuration has to include the SSL configuration.
+ By default, it will use the <code>ssl.*</code> settings. If the REST API needs a different configuration than the one used for connecting to Kafka brokers, the fields can be prefixed with <code>listeners.https</code>.
+ When using the prefix, only the prefixed options will be used and the <code>ssl.*</code> options without the prefix will be ignored. The following fields can be used to configure HTTPS for the REST API:</p>
+
+ <ul>
+ <li><code>ssl.keystore.location</code></li>
+ <li><code>ssl.keystore.password</code></li>
+ <li><code>ssl.keystore.type</code></li>
+ <li><code>ssl.key.password</code></li>
+ <li><code>ssl.truststore.location</code></li>
+ <li><code>ssl.truststore.password</code></li>
+ <li><code>ssl.truststore.type</code></li>
+ <li><code>ssl.enabled.protocols</code></li>
+ <li><code>ssl.provider</code></li>
+ <li><code>ssl.protocol</code></li>
+ <li><code>ssl.cipher.suites</code></li>
+ <li><code>ssl.keymanager.algorithm</code></li>
+ <li><code>ssl.secure.random.implementation</code></li>
+ <li><code>ssl.trustmanager.algorithm</code></li>
+ <li><code>ssl.endpoint.identification.algorithm</code></li>
+ <li><code>ssl.client.auth</code></li>
+ </ul>
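+
+ <p>As an illustrative sketch (the host name, port, passwords, and paths below are hypothetical, not part of this commit), a worker configuration that secures the REST API using the prefixed options might look like this:</p>
+
+ <pre class="brush: text;">
+ listeners=https://myhost:8443
+ listeners.https.ssl.keystore.location=/path/to/keystore.jks
+ listeners.https.ssl.keystore.password=changeit
+ listeners.https.ssl.key.password=changeit
+ listeners.https.ssl.truststore.location=/path/to/truststore.jks
+ listeners.https.ssl.truststore.password=changeit
+ listeners.https.ssl.client.auth=required
+ </pre>
+
+ <p>Because the <code>listeners.https</code> prefix is present in this sketch, any unprefixed <code>ssl.*</code> options in the same worker configuration would be ignored for the REST server.</p>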
+
+ <p>The REST API is used not only by users to monitor and manage Kafka Connect; it is also used for Kafka Connect cross-cluster communication. Requests received on a follower node's REST API will be forwarded to the leader node's REST API.
+ In case the URI under which a given host is reachable is different from the URI it listens on, the configuration options <code>rest.advertised.host.name</code>, <code>rest.advertised.port</code> and <code>rest.advertised.listener</code>
+ can be used to change the URI which the follower nodes will use to connect to the leader. When using both HTTP and HTTPS listeners, the <code>rest.advertised.listener</code> option can also be used to define which listener
+ will be used for the cross-cluster communication. When using HTTPS for communication between nodes, the same <code>ssl.*</code> or <code>listeners.https</code> options will be used to configure the HTTPS client.</p>
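+
+ <p>For example, a follower node behind a proxy or NAT could bind locally but advertise the externally reachable address to the other workers (the addresses and ports below are hypothetical, shown only to illustrate the options):</p>
+
+ <pre class="brush: text;">
+ listeners=https://0.0.0.0:8443
+ rest.advertised.host.name=connect-1.example.com
+ rest.advertised.port=443
+ rest.advertised.listener=https
+ </pre>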
+
+ <p>The following are the currently supported REST API endpoints:</p>
<ul>
<li><code>GET /connectors</code> - return a list of active connectors</li>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 0f395b8..963fd03 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -92,6 +92,7 @@ libs += [
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty",
+ jettyClient: "org.eclipse.jetty:jetty-client:$versions.jetty",
jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
--
To stop receiving notification emails like this one, please contact [email protected].