This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 70761b4 KAFKA-8404: Add HttpHeader to RestClient HTTP Request and
Connector REST API (#6791)
70761b4 is described below
commit 70761b4b93479e53c277c4d427d9fc13e5ef19a8
Author: Hai-Dang Dam <[email protected]>
AuthorDate: Mon Jun 3 19:06:00 2019 -0700
KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST
API (#6791)
When Connect forwarded a REST request from one worker to another, the
Authorization header was not forwarded. This commit changes the Connect
framework to include the Authorization header when forwarding requests to
other workers.
Author: Hai-Dang Dam <[email protected]>
Reviewers: Robert Yokota <[email protected]>, Randall Hauch
<[email protected]>
# Conflicts:
#
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
#
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
# Conflicts:
#
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
---
.../basic/auth/extension/JaasBasicAuthFilter.java | 16 +--
.../auth/extension/JaasBasicAuthFilterTest.java | 34 +++++-
.../runtime/distributed/DistributedHerder.java | 11 +-
.../kafka/connect/runtime/rest/RestClient.java | 21 +++-
.../runtime/rest/resources/ConnectorsResource.java | 49 +++++---
.../resources/ConnectorPluginsResourceTest.java | 3 +-
.../rest/resources/ConnectorsResourceTest.java | 123 ++++++++++++++-------
7 files changed, 185 insertions(+), 72 deletions(-)
diff --git
a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
index 6167434..d5b15c6 100644
---
a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
+++
b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
+import java.util.regex.Pattern;
+import javax.ws.rs.HttpMethod;
import org.apache.kafka.common.config.ConfigException;
import java.io.IOException;
@@ -35,18 +37,18 @@ import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.Response;
public class JaasBasicAuthFilter implements ContainerRequestFilter {
-
private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
static final String AUTHORIZATION = "Authorization";
-
+ private static final Pattern TASK_REQUEST_PATTERN =
Pattern.compile("/?connectors/([^/]+)/tasks/?");
@Override
public void filter(ContainerRequestContext requestContext) throws
IOException {
-
try {
- LoginContext loginContext =
- new LoginContext(CONNECT_LOGIN_MODULE, new
BasicAuthCallBackHandler(
- requestContext.getHeaderString(AUTHORIZATION)));
- loginContext.login();
+ if (!(requestContext.getMethod().equals(HttpMethod.POST) &&
TASK_REQUEST_PATTERN.matcher(requestContext.getUriInfo().getPath()).matches()))
{
+ LoginContext loginContext =
+ new LoginContext(CONNECT_LOGIN_MODULE, new
BasicAuthCallBackHandler(
+ requestContext.getHeaderString(AUTHORIZATION)));
+ loginContext.login();
+ }
} catch (LoginException | ConfigException e) {
requestContext.abortWith(
Response.status(Response.Status.UNAUTHORIZED)
diff --git
a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index d61fc06..c81f8f6 100644
---
a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
+++
b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -17,12 +17,15 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.UriInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,6 +55,9 @@ public class JaasBasicAuthFilterTest {
private String previousJaasConfig;
private Configuration previousConfiguration;
+ @MockStrict
+ private UriInfo uriInfo;
+
@Before
public void setup() throws IOException {
EasyMock.reset(requestContext);
@@ -137,7 +143,34 @@ public class JaasBasicAuthFilterTest {
jaasBasicAuthFilter.filter(requestContext);
}
+ @Test
+ public void testPostWithoutAppropriateCredential() throws IOException {
+ EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+ EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+
EasyMock.expect(uriInfo.getPath()).andReturn("connectors/connName/tasks");
+
+ PowerMock.replayAll();
+ jaasBasicAuthFilter.filter(requestContext);
+ EasyMock.verify(requestContext);
+ }
+
+ @Test
+ public void testPostNotChangingConnectorTask() throws IOException {
+ EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+ EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+
EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
+ String authHeader = "Basic" +
Base64.getEncoder().encodeToString(("user" + ":" + "password").getBytes());
+
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+ .andReturn(authHeader);
+ requestContext.abortWith(EasyMock.anyObject(Response.class));
+ EasyMock.expectLastCall();
+ PowerMock.replayAll();
+ jaasBasicAuthFilter.filter(requestContext);
+ EasyMock.verify(requestContext);
+ }
+
private void setMock(String authorization, String username, String
password, boolean exceptionCase) {
+ EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.GET);
String authHeader = authorization + " " +
Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
.andReturn(authHeader);
@@ -152,7 +185,6 @@ public class JaasBasicAuthFilterTest {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
previousJaasConfig =
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
-
List<String> lines;
lines = new ArrayList<>();
lines.add(loginModule + " {
org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule
required ");
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7edc3b2..b229102 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1023,8 +1023,15 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
@Override
public void run() {
try {
- String reconfigUrl =
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
- RestClient.httpRequest(reconfigUrl, "POST",
rawTaskProps, null, config);
+ String leaderUrl = leaderUrl();
+ if (leaderUrl == null ||
leaderUrl.trim().isEmpty()) {
+ cb.onCompletion(new
ConnectException("Request to leader to " +
+ "reconfigure connector tasks
failed " +
+ "because the URL of the leader's
REST interface is empty!"), null);
+ return;
+ }
+ String reconfigUrl =
RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
+ RestClient.httpRequest(reconfigUrl, "POST",
null, rawTaskProps, null, config);
cb.onCompletion(null, null);
} catch (ConnectException e) {
log.error("Request to leader to reconfigure
connector tasks failed", e);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 15e8418..de11f26 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -50,12 +51,13 @@ public class RestClient {
*
* @param url HTTP connection will be established with this
url.
* @param method HTTP method ("GET", "POST", "PUT", etc.)
+ * @param headers HTTP headers from REST endpoint
* @param requestBodyData Object to serialize as JSON and send in the
request body.
* @param responseFormat Expected format of the response to the HTTP
request.
* @param <T> The type of the deserialized response to the
HTTP request.
* @return The deserialized response to the HTTP request, or null if no
data is expected.
*/
- public static <T> HttpResponse<T> httpRequest(String url, String method,
Object requestBodyData,
+ public static <T> HttpResponse<T> httpRequest(String url, String method,
HttpHeaders headers, Object requestBodyData,
TypeReference<T>
responseFormat, WorkerConfig config) {
HttpClient client;
@@ -82,6 +84,8 @@ public class RestClient {
req.method(method);
req.accept("application/json");
req.agent("kafka-connect");
+ addHeadersToRequest(headers, req);
+
if (serializedBody != null) {
req.content(new StringContentProvider(serializedBody,
StandardCharsets.UTF_8), "application/json");
}
@@ -116,6 +120,21 @@ public class RestClient {
}
}
+
+ /**
+ * Extract headers from REST call and add to client request
+ * @param headers Headers from REST endpoint
+ * @param req The client request to modify
+ */
+ private static void addHeadersToRequest(HttpHeaders headers, Request req) {
+ if (headers != null) {
+ String credentialAuthorization =
headers.getHeaderString(HttpHeaders.AUTHORIZATION);
+ if (credentialAuthorization != null) {
+ req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
+ }
+ }
+ }
+
/**
* Convert response parameters from Jetty format (HttpFields)
* @param httpFields
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4a04512..26a09ea 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -17,6 +17,8 @@
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+
+import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -44,6 +46,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
@@ -79,16 +82,18 @@ public class ConnectorsResource {
@GET
@Path("/")
- public Collection<String> listConnectors(final @QueryParam("forward")
Boolean forward) throws Throwable {
+ public Collection<String> listConnectors(final @QueryParam("forward")
Boolean forward,
+ final @Context HttpHeaders
headers) throws Throwable {
FutureCallback<Collection<String>> cb = new FutureCallback<>();
herder.connectors(cb);
- return completeOrForwardRequest(cb, "/connectors", "GET", null, new
TypeReference<Collection<String>>() {
+ return completeOrForwardRequest(cb, "/connectors", "GET", headers,
null, new TypeReference<Collection<String>>() {
}, forward);
}
@POST
@Path("/")
public Response createConnector(final @QueryParam("forward") Boolean
forward,
+ final @Context HttpHeaders headers,
final CreateConnectorRequest
createRequest) throws Throwable {
// Trim leading and trailing whitespaces from the connector name,
replace null with empty string
// if no name element present to keep validation within validator
(NonEmptyStringWithoutControlChars
@@ -100,7 +105,7 @@ public class ConnectorsResource {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>();
herder.putConnectorConfig(name, configs, false, cb);
- Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb,
"/connectors", "POST", createRequest,
+ Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb,
"/connectors", "POST", headers, createRequest,
new TypeReference<ConnectorInfo>() { }, new
CreatedConnectorInfoTranslator(), forward);
URI location = UriBuilder.fromUri("/connectors").path(name).build();
@@ -110,19 +115,21 @@ public class ConnectorsResource {
@GET
@Path("/{connector}")
public ConnectorInfo getConnector(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean
forward) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
herder.connectorInfo(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector, "GET",
null, forward);
+ return completeOrForwardRequest(cb, "/connectors/" + connector, "GET",
headers, null, forward);
}
@GET
@Path("/{connector}/config")
public Map<String, String> getConnectorConfig(final
@PathParam("connector") String connector,
+ final @Context HttpHeaders
headers,
final @QueryParam("forward")
Boolean forward) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
herder.connectorConfig(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector +
"/config", "GET", null, forward);
+ return completeOrForwardRequest(cb, "/connectors/" + connector +
"/config", "GET", headers, null, forward);
}
@GET
@@ -134,6 +141,7 @@ public class ConnectorsResource {
@PUT
@Path("/{connector}/config")
public Response putConnectorConfig(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean
forward,
final Map<String, String>
connectorConfig) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>();
@@ -141,7 +149,7 @@ public class ConnectorsResource {
herder.putConnectorConfig(connector, connectorConfig, true, cb);
Herder.Created<ConnectorInfo> createdInfo =
completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
- "PUT", connectorConfig, new TypeReference<ConnectorInfo>() {
}, new CreatedConnectorInfoTranslator(), forward);
+ "PUT", headers, connectorConfig, new
TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(),
forward);
Response.ResponseBuilder response;
if (createdInfo.created()) {
URI location =
UriBuilder.fromUri("/connectors").path(connector).build();
@@ -155,15 +163,16 @@ public class ConnectorsResource {
@POST
@Path("/{connector}/restart")
public void restartConnector(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward)
throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(connector, cb);
- completeOrForwardRequest(cb, "/connectors/" + connector + "/restart",
"POST", null, forward);
+ completeOrForwardRequest(cb, "/connectors/" + connector + "/restart",
"POST", headers, null, forward);
}
@PUT
@Path("/{connector}/pause")
- public Response pauseConnector(@PathParam("connector") String connector) {
+ public Response pauseConnector(@PathParam("connector") String connector,
final @Context HttpHeaders headers) {
herder.pauseConnector(connector);
return Response.accepted().build();
}
@@ -178,26 +187,29 @@ public class ConnectorsResource {
@GET
@Path("/{connector}/tasks")
public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean
forward) throws Throwable {
FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
herder.taskConfigs(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector +
"/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
+ return completeOrForwardRequest(cb, "/connectors/" + connector +
"/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
}, forward);
}
@POST
@Path("/{connector}/tasks")
public void putTaskConfigs(final @PathParam("connector") String connector,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
final List<Map<String, String>> taskConfigs)
throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
herder.putTaskConfigs(connector, taskConfigs, cb);
- completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks",
"POST", taskConfigs, forward);
+ completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks",
"POST", headers, taskConfigs, forward);
}
@GET
@Path("/{connector}/tasks/{task}/status")
public ConnectorStateInfo.TaskState getTaskStatus(final
@PathParam("connector") String connector,
+ final @Context
HttpHeaders headers,
final @PathParam("task")
Integer task) throws Throwable {
return herder.taskStatus(new ConnectorTaskId(connector, task));
}
@@ -206,20 +218,22 @@ public class ConnectorsResource {
@Path("/{connector}/tasks/{task}/restart")
public void restartTask(final @PathParam("connector") String connector,
final @PathParam("task") Integer task,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward)
throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
herder.restartTask(taskId, cb);
- completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" +
task + "/restart", "POST", null, forward);
+ completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" +
task + "/restart", "POST", headers, null, forward);
}
@DELETE
@Path("/{connector}")
public void destroyConnector(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward)
throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>();
herder.deleteConnectorConfig(connector, cb);
- completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE",
null, forward);
+ completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE",
headers, null, forward);
}
// Check whether the connector name from the url matches the one (if there
is one) provided in the connectorconfig
@@ -239,6 +253,7 @@ public class ConnectorsResource {
private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
String path,
String method,
+ HttpHeaders headers,
Object body,
TypeReference<U> resultType,
Translator<T, U> translator,
@@ -261,7 +276,7 @@ public class ConnectorsResource {
.build()
.toString();
log.debug("Forwarding request {} {} {}", forwardUrl,
method, body);
- return
translator.translate(RestClient.httpRequest(forwardUrl, method, body,
resultType, config));
+ return
translator.translate(RestClient.httpRequest(forwardUrl, method, headers, body,
resultType, config));
} else {
// we should find the right target for the query within
two hops, so if
// we don't, it probably means that a rebalance has taken
place.
@@ -283,14 +298,14 @@ public class ConnectorsResource {
}
}
- private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method, Object body,
+ private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method, HttpHeaders headers, Object body,
TypeReference<T> resultType,
Boolean forward) throws Throwable {
- return completeOrForwardRequest(cb, path, method, body, resultType,
new IdentityTranslator<T>(), forward);
+ return completeOrForwardRequest(cb, path, method, headers, body,
resultType, new IdentityTranslator<T>(), forward);
}
- private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method,
+ private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path,
String method, HttpHeaders headers,
Object body, Boolean forward)
throws Throwable {
- return completeOrForwardRequest(cb, path, method, body, null, new
IdentityTranslator<T>(), forward);
+ return completeOrForwardRequest(cb, path, method, headers, body, null,
new IdentityTranslator<T>(), forward);
}
private interface Translator<T, U> {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ad360b6..67cae67 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -179,7 +180,7 @@ public class ConnectorPluginsResourceTest {
@Before
public void setUp() throws Exception {
PowerMock.mockStatic(RestClient.class,
- RestClient.class.getMethod("httpRequest", String.class,
String.class, Object.class, TypeReference.class, WorkerConfig.class));
+ RestClient.class.getMethod("httpRequest", String.class,
String.class, HttpHeaders.class, Object.class, TypeReference.class,
WorkerConfig.class));
plugins = PowerMock.createMock(Plugins.class);
herder = PowerMock.createMock(AbstractHerder.class);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..ba5a2c3 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+import javax.ws.rs.core.HttpHeaders;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -75,6 +76,7 @@ public class ConnectorsResourceTest {
private static final String CONNECTOR_NAME_PADDING_WHITESPACES = " " +
CONNECTOR_NAME + " \n ";
private static final Boolean FORWARD = true;
private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS =
new HashMap<>();
+ private static final HttpHeaders NULL_HEADERS = null;
static {
CONNECTOR_CONFIG_SPECIAL_CHARS.put("name",
CONNECTOR_NAME_SPECIAL_CHARS);
CONNECTOR_CONFIG_SPECIAL_CHARS.put("sample_config", "test_config");
@@ -125,7 +127,7 @@ public class ConnectorsResourceTest {
@Before
public void setUp() throws NoSuchMethodException {
PowerMock.mockStatic(RestClient.class,
- RestClient.class.getMethod("httpRequest", String.class,
String.class, Object.class, TypeReference.class, WorkerConfig.class));
+ RestClient.class.getMethod("httpRequest", String.class,
String.class, HttpHeaders.class, Object.class, TypeReference.class,
WorkerConfig.class));
connectorsResource = new ConnectorsResource(herder, null);
}
@@ -142,7 +144,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- Collection<String> connectors =
connectorsResource.listConnectors(FORWARD);
+ Collection<String> connectors =
connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
// Ordering isn't guaranteed, compare sets
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME,
CONNECTOR2_NAME)), new HashSet<>(connectors));
@@ -156,15 +158,12 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
// Should forward request
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("GET"),
- EasyMock.isNull(), EasyMock.anyObject(TypeReference.class),
EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.isNull(), EasyMock.isNull(),
EasyMock.anyObject(TypeReference.class),
EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(200, new
HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
PowerMock.replayAll();
- Collection<String> connectors =
connectorsResource.listConnectors(FORWARD);
- // Ordering isn't guaranteed, compare sets
- assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME,
CONNECTOR2_NAME)), new HashSet<>(connectors));
-
+ Collection<String> connectors =
connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
PowerMock.verifyAll();
}
@@ -177,7 +176,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
// throws
- connectorsResource.listConnectors(FORWARD);
+ connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
}
@Test
@@ -191,7 +190,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, body);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
PowerMock.verifyAll();
}
@@ -204,19 +203,57 @@ public class ConnectorsResourceTest {
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
-
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(),
EasyMock.anyObject(WorkerConfig.class)))
+
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"),
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body),
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(201, new
HashMap<String, String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, body);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
PowerMock.verifyAll();
}
+ @Test
+ public void testCreateConnectorWithHeaderAuthorization() throws Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+ HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
+
EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn("Basic
YWxhZGRpbjpvcGVuc2VzYW1l").times(1);
+ EasyMock.replay(httpHeaders);
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+ PowerMock.verifyAll();
+ }
+
+
+
+ @Test
+ public void testCreateConnectorWithoutHeaderAuthorization() throws
Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+ HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
+
EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn(null).times(1);
+ EasyMock.replay(httpHeaders);
+ herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+ PowerMock.verifyAll();
+ }
+
@Test(expected = AlreadyExistsException.class)
public void testCreateConnectorExists() throws Throwable {
CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
@@ -227,7 +264,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, body);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
PowerMock.verifyAll();
}
@@ -246,7 +283,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, bodyIn);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
PowerMock.verifyAll();
}
@@ -265,7 +302,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, bodyIn);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
PowerMock.verifyAll();
}
@@ -284,7 +321,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, bodyIn);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
PowerMock.verifyAll();
}
@@ -297,7 +334,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+ connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -308,12 +345,12 @@ public class ConnectorsResourceTest {
herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.capture(cb));
expectAndCallbackNotLeaderException(cb);
// Should forward request
-
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" +
CONNECTOR_NAME + "?forward=false", "DELETE", null, null, null))
+
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" +
CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
.andReturn(new RestClient.HttpResponse<>(204, new
HashMap<String, String>(), null));
PowerMock.replayAll();
- connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+ connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -327,7 +364,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+ connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -341,7 +378,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- ConnectorInfo connInfo =
connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
+ ConnectorInfo connInfo =
connectorsResource.getConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE),
connInfo);
@@ -356,7 +393,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- Map<String, String> connConfig =
connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+ Map<String, String> connConfig =
connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
assertEquals(CONNECTOR_CONFIG, connConfig);
PowerMock.verifyAll();
@@ -370,7 +407,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+ connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -384,7 +421,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD,
CONNECTOR_CONFIG);
+ connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS,
FORWARD, CONNECTOR_CONFIG);
PowerMock.verifyAll();
}
@@ -400,7 +437,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- String rspLocation = connectorsResource.createConnector(FORWARD,
body).getLocation().toString();
+ String rspLocation = connectorsResource.createConnector(FORWARD,
NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS,
decoded);
@@ -418,7 +455,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- String rspLocation = connectorsResource.createConnector(FORWARD,
body).getLocation().toString();
+ String rspLocation = connectorsResource.createConnector(FORWARD,
NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" +
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
@@ -435,7 +472,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- String rspLocation =
connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, FORWARD,
CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
+ String rspLocation =
connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS,
NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS,
decoded);
@@ -452,7 +489,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- String rspLocation =
connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+ String rspLocation =
connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
NULL_HEADERS, FORWARD,
CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
Assert.assertEquals("/connectors/" +
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
@@ -463,7 +500,7 @@ public class ConnectorsResourceTest {
public void testPutConnectorConfigNameMismatch() throws Throwable {
Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
- connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD,
connConfig);
+ connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS,
FORWARD, connConfig);
}
@Test(expected = BadRequestException.class)
@@ -471,7 +508,7 @@ public class ConnectorsResourceTest {
Map<String, String> connConfig = new HashMap<>();
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
CreateConnectorRequest request = new
CreateConnectorRequest(CONNECTOR_NAME, connConfig);
- connectorsResource.createConnector(FORWARD, request);
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, request);
}
@Test
@@ -482,7 +519,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- List<TaskInfo> taskInfos =
connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+ List<TaskInfo> taskInfos =
connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
assertEquals(TASK_INFOS, taskInfos);
PowerMock.verifyAll();
@@ -496,7 +533,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+ connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -509,7 +546,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD,
TASK_CONFIGS);
+ connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS,
FORWARD, TASK_CONFIGS);
PowerMock.verifyAll();
}
@@ -522,7 +559,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD,
TASK_CONFIGS);
+ connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS,
FORWARD, TASK_CONFIGS);
PowerMock.verifyAll();
}
@@ -535,7 +572,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD);
+ connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -547,12 +584,12 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=true"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new
HashMap<String, String>(), null));
PowerMock.replayAll();
- connectorsResource.restartConnector(CONNECTOR_NAME, null);
+ connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS,
null);
PowerMock.verifyAll();
}
@@ -565,12 +602,12 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner
test", ownerUrl));
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
+ CONNECTOR_NAME + "/restart?forward=false"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new
HashMap<String, String>(), null));
PowerMock.replayAll();
- connectorsResource.restartConnector(CONNECTOR_NAME, true);
+ connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS,
true);
PowerMock.verifyAll();
}
@@ -584,7 +621,7 @@ public class ConnectorsResourceTest {
PowerMock.replayAll();
- connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD);
+ connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS,
FORWARD);
PowerMock.verifyAll();
}
@@ -598,12 +635,12 @@ public class ConnectorsResourceTest {
expectAndCallbackNotLeaderException(cb);
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/"
+ CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new
HashMap<String, String>(), null));
PowerMock.replayAll();
- connectorsResource.restartTask(CONNECTOR_NAME, 0, null);
+ connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
PowerMock.verifyAll();
}
@@ -618,12 +655,12 @@ public class ConnectorsResourceTest {
expectAndCallbackException(cb, new NotAssignedException("not owner
test", ownerUrl));
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/"
+ CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
- EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+ EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
.andReturn(new RestClient.HttpResponse<>(202, new
HashMap<String, String>(), null));
PowerMock.replayAll();
- connectorsResource.restartTask(CONNECTOR_NAME, 0, true);
+ connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
PowerMock.verifyAll();
}