This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 70761b4  KAFKA-8404: Add HttpHeader to RestClient HTTP Request and 
Connector REST API (#6791)
70761b4 is described below

commit 70761b4b93479e53c277c4d427d9fc13e5ef19a8
Author: Hai-Dang Dam <damquanghaid...@gmail.com>
AuthorDate: Mon Jun 3 19:06:00 2019 -0700

    KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST 
API (#6791)
    
    When Connect forwards a REST request from one worker to another, the 
Authorization header was not forwarded. This commit changes the Connect 
framework to add include the authorization header when forwarding requests to 
other workers.
    
    Author: Hai-Dang Dam <damquanghaid...@gmail.com>
    Reviewers: Robert Yokota <rayok...@gmail.com>, Randall Hauch 
<rha...@gmail.com>
    
    # Conflicts:
    #   
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
    #   
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
    
    # Conflicts:
    #   
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
---
 .../basic/auth/extension/JaasBasicAuthFilter.java  |  16 +--
 .../auth/extension/JaasBasicAuthFilterTest.java    |  34 +++++-
 .../runtime/distributed/DistributedHerder.java     |  11 +-
 .../kafka/connect/runtime/rest/RestClient.java     |  21 +++-
 .../runtime/rest/resources/ConnectorsResource.java |  49 +++++---
 .../resources/ConnectorPluginsResourceTest.java    |   3 +-
 .../rest/resources/ConnectorsResourceTest.java     | 123 ++++++++++++++-------
 7 files changed, 185 insertions(+), 72 deletions(-)

diff --git 
a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
 
b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
index 6167434..d5b15c6 100644
--- 
a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
+++ 
b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
+import java.util.regex.Pattern;
+import javax.ws.rs.HttpMethod;
 import org.apache.kafka.common.config.ConfigException;
 
 import java.io.IOException;
@@ -35,18 +37,18 @@ import javax.ws.rs.container.ContainerRequestFilter;
 import javax.ws.rs.core.Response;
 
 public class JaasBasicAuthFilter implements ContainerRequestFilter {
-
     private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
     static final String AUTHORIZATION = "Authorization";
-
+    private static final Pattern TASK_REQUEST_PATTERN = 
Pattern.compile("/?connectors/([^/]+)/tasks/?");
     @Override
     public void filter(ContainerRequestContext requestContext) throws 
IOException {
-
         try {
-            LoginContext loginContext =
-                new LoginContext(CONNECT_LOGIN_MODULE, new 
BasicAuthCallBackHandler(
-                    requestContext.getHeaderString(AUTHORIZATION)));
-            loginContext.login();
+            if (!(requestContext.getMethod().equals(HttpMethod.POST) && 
TASK_REQUEST_PATTERN.matcher(requestContext.getUriInfo().getPath()).matches())) 
{
+                LoginContext loginContext =
+                    new LoginContext(CONNECT_LOGIN_MODULE, new 
BasicAuthCallBackHandler(
+                        requestContext.getHeaderString(AUTHORIZATION)));
+                loginContext.login();
+            }
         } catch (LoginException | ConfigException e) {
             requestContext.abortWith(
                 Response.status(Response.Status.UNAUTHORIZED)
diff --git 
a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
 
b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index d61fc06..c81f8f6 100644
--- 
a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
+++ 
b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.UriInfo;
 import org.apache.kafka.common.security.JaasUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,6 +55,9 @@ public class JaasBasicAuthFilterTest {
     private String previousJaasConfig;
     private Configuration previousConfiguration;
 
+    @MockStrict
+    private UriInfo uriInfo;
+
     @Before
     public void setup() throws IOException {
         EasyMock.reset(requestContext);
@@ -137,7 +143,34 @@ public class JaasBasicAuthFilterTest {
         jaasBasicAuthFilter.filter(requestContext);
     }
 
+    @Test
+    public void testPostWithoutAppropriateCredential() throws IOException {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+        EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+        
EasyMock.expect(uriInfo.getPath()).andReturn("connectors/connName/tasks");
+
+        PowerMock.replayAll();
+        jaasBasicAuthFilter.filter(requestContext);
+        EasyMock.verify(requestContext);
+    }
+
+    @Test
+    public void testPostNotChangingConnectorTask() throws IOException {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+        EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+        
EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
+        String authHeader = "Basic" + 
Base64.getEncoder().encodeToString(("user" + ":" + "password").getBytes());
+        
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+            .andReturn(authHeader);
+        requestContext.abortWith(EasyMock.anyObject(Response.class));
+        EasyMock.expectLastCall();
+        PowerMock.replayAll();
+        jaasBasicAuthFilter.filter(requestContext);
+        EasyMock.verify(requestContext);
+    }
+
     private void setMock(String authorization, String username, String 
password, boolean exceptionCase) {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.GET);
         String authHeader = authorization + " " + 
Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
         
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
             .andReturn(authHeader);
@@ -152,7 +185,6 @@ public class JaasBasicAuthFilterTest {
         File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
         jaasConfigFile.deleteOnExit();
         previousJaasConfig = 
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
-
         List<String> lines;
         lines = new ArrayList<>();
         lines.add(loginModule + " { 
org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule 
required ");
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7edc3b2..b229102 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1023,8 +1023,15 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                         @Override
                         public void run() {
                             try {
-                                String reconfigUrl = 
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                                RestClient.httpRequest(reconfigUrl, "POST", 
rawTaskProps, null, config);
+                                String leaderUrl = leaderUrl();
+                                if (leaderUrl == null || 
leaderUrl.trim().isEmpty()) {
+                                    cb.onCompletion(new 
ConnectException("Request to leader to " +
+                                            "reconfigure connector tasks 
failed " +
+                                            "because the URL of the leader's 
REST interface is empty!"), null);
+                                    return;
+                                }
+                                String reconfigUrl = 
RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
+                                RestClient.httpRequest(reconfigUrl, "POST", 
null, rawTaskProps, null, config);
                                 cb.onCompletion(null, null);
                             } catch (ConnectException e) {
                                 log.error("Request to leader to reconfigure 
connector tasks failed", e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 15e8418..de11f26 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -50,12 +51,13 @@ public class RestClient {
      *
      * @param url             HTTP connection will be established with this 
url.
      * @param method          HTTP method ("GET", "POST", "PUT", etc.)
+     * @param headers         HTTP headers from REST endpoint
      * @param requestBodyData Object to serialize as JSON and send in the 
request body.
      * @param responseFormat  Expected format of the response to the HTTP 
request.
      * @param <T>             The type of the deserialized response to the 
HTTP request.
      * @return The deserialized response to the HTTP request, or null if no 
data is expected.
      */
-    public static <T> HttpResponse<T> httpRequest(String url, String method, 
Object requestBodyData,
+    public static <T> HttpResponse<T> httpRequest(String url, String method, 
HttpHeaders headers, Object requestBodyData,
                                                   TypeReference<T> 
responseFormat, WorkerConfig config) {
         HttpClient client;
 
@@ -82,6 +84,8 @@ public class RestClient {
             req.method(method);
             req.accept("application/json");
             req.agent("kafka-connect");
+            addHeadersToRequest(headers, req);
+
             if (serializedBody != null) {
                 req.content(new StringContentProvider(serializedBody, 
StandardCharsets.UTF_8), "application/json");
             }
@@ -116,6 +120,21 @@ public class RestClient {
         }
     }
 
+
+    /**
+     * Extract headers from REST call and add to client request
+     * @param headers         Headers from REST endpoint
+     * @param req             The client request to modify
+     */
+    private static void addHeadersToRequest(HttpHeaders headers, Request req) {
+        if (headers != null) {
+            String credentialAuthorization = 
headers.getHeaderString(HttpHeaders.AUTHORIZATION);
+            if (credentialAuthorization != null) {
+                req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
+            }
+        }
+    }
+
     /**
      * Convert response parameters from Jetty format (HttpFields)
      * @param httpFields
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4a04512..26a09ea 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -44,6 +46,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
@@ -79,16 +82,18 @@ public class ConnectorsResource {
 
     @GET
     @Path("/")
-    public Collection<String> listConnectors(final @QueryParam("forward") 
Boolean forward) throws Throwable {
+    public Collection<String> listConnectors(final @QueryParam("forward") 
Boolean forward,
+                                             final @Context HttpHeaders 
headers) throws Throwable {
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
         herder.connectors(cb);
-        return completeOrForwardRequest(cb, "/connectors", "GET", null, new 
TypeReference<Collection<String>>() {
+        return completeOrForwardRequest(cb, "/connectors", "GET", headers, 
null, new TypeReference<Collection<String>>() {
         }, forward);
     }
 
     @POST
     @Path("/")
     public Response createConnector(final @QueryParam("forward") Boolean 
forward,
+                                    final @Context HttpHeaders headers,
                                     final CreateConnectorRequest 
createRequest) throws Throwable {
         // Trim leading and trailing whitespaces from the connector name, 
replace null with empty string
         // if no name element present to keep validation within validator 
(NonEmptyStringWithoutControlChars
@@ -100,7 +105,7 @@ public class ConnectorsResource {
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
         herder.putConnectorConfig(name, configs, false, cb);
-        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, 
"/connectors", "POST", createRequest,
+        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, 
"/connectors", "POST", headers, createRequest,
                 new TypeReference<ConnectorInfo>() { }, new 
CreatedConnectorInfoTranslator(), forward);
 
         URI location = UriBuilder.fromUri("/connectors").path(name).build();
@@ -110,19 +115,21 @@ public class ConnectorsResource {
     @GET
     @Path("/{connector}")
     public ConnectorInfo getConnector(final @PathParam("connector") String 
connector,
+                                      final @Context HttpHeaders headers,
                                       final @QueryParam("forward") Boolean 
forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", 
null, forward);
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", 
headers, null, forward);
     }
 
     @GET
     @Path("/{connector}/config")
     public Map<String, String> getConnectorConfig(final 
@PathParam("connector") String connector,
+                                                  final @Context HttpHeaders 
headers,
                                                   final @QueryParam("forward") 
Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", null, forward);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", headers, null, forward);
     }
 
     @GET
@@ -134,6 +141,7 @@ public class ConnectorsResource {
     @PUT
     @Path("/{connector}/config")
     public Response putConnectorConfig(final @PathParam("connector") String 
connector,
+                                       final @Context HttpHeaders headers,
                                        final @QueryParam("forward") Boolean 
forward,
                                        final Map<String, String> 
connectorConfig) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
@@ -141,7 +149,7 @@ public class ConnectorsResource {
 
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = 
completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
-                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { 
}, new CreatedConnectorInfoTranslator(), forward);
+                "PUT", headers, connectorConfig, new 
TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), 
forward);
         Response.ResponseBuilder response;
         if (createdInfo.created()) {
             URI location = 
UriBuilder.fromUri("/connectors").path(connector).build();
@@ -155,15 +163,16 @@ public class ConnectorsResource {
     @POST
     @Path("/{connector}/restart")
     public void restartConnector(final @PathParam("connector") String 
connector,
+                                 final @Context HttpHeaders headers,
                                  final @QueryParam("forward") Boolean forward) 
throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", 
"POST", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", 
"POST", headers, null, forward);
     }
 
     @PUT
     @Path("/{connector}/pause")
-    public Response pauseConnector(@PathParam("connector") String connector) {
+    public Response pauseConnector(@PathParam("connector") String connector, 
final @Context HttpHeaders headers) {
         herder.pauseConnector(connector);
         return Response.accepted().build();
     }
@@ -178,26 +187,29 @@ public class ConnectorsResource {
     @GET
     @Path("/{connector}/tasks")
     public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String 
connector,
+                                         final @Context HttpHeaders headers,
                                          final @QueryParam("forward") Boolean 
forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
         herder.taskConfigs(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
+        return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
         }, forward);
     }
 
     @POST
     @Path("/{connector}/tasks")
     public void putTaskConfigs(final @PathParam("connector") String connector,
+                               final @Context HttpHeaders headers,
                                final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) 
throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.putTaskConfigs(connector, taskConfigs, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", taskConfigs, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", headers, taskConfigs, forward);
     }
 
     @GET
     @Path("/{connector}/tasks/{task}/status")
     public ConnectorStateInfo.TaskState getTaskStatus(final 
@PathParam("connector") String connector,
+                                                      final @Context 
HttpHeaders headers,
                                                       final @PathParam("task") 
Integer task) throws Throwable {
         return herder.taskStatus(new ConnectorTaskId(connector, task));
     }
@@ -206,20 +218,22 @@ public class ConnectorsResource {
     @Path("/{connector}/tasks/{task}/restart")
     public void restartTask(final @PathParam("connector") String connector,
                             final @PathParam("task") Integer task,
+                            final @Context HttpHeaders headers,
                             final @QueryParam("forward") Boolean forward) 
throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
         herder.restartTask(taskId, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + 
task + "/restart", "POST", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + 
task + "/restart", "POST", headers, null, forward);
     }
 
     @DELETE
     @Path("/{connector}")
     public void destroyConnector(final @PathParam("connector") String 
connector,
+                                 final @Context HttpHeaders headers,
                                  final @QueryParam("forward") Boolean forward) 
throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
         herder.deleteConnectorConfig(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", 
null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", 
headers, null, forward);
     }
 
     // Check whether the connector name from the url matches the one (if there 
is one) provided in the connectorconfig
@@ -239,6 +253,7 @@ public class ConnectorsResource {
     private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
                                               String path,
                                               String method,
+                                              HttpHeaders headers,
                                               Object body,
                                               TypeReference<U> resultType,
                                               Translator<T, U> translator,
@@ -261,7 +276,7 @@ public class ConnectorsResource {
                             .build()
                             .toString();
                     log.debug("Forwarding request {} {} {}", forwardUrl, 
method, body);
-                    return 
translator.translate(RestClient.httpRequest(forwardUrl, method, body, 
resultType, config));
+                    return 
translator.translate(RestClient.httpRequest(forwardUrl, method, headers, body, 
resultType, config));
                 } else {
                     // we should find the right target for the query within 
two hops, so if
                     // we don't, it probably means that a rebalance has taken 
place.
@@ -283,14 +298,14 @@ public class ConnectorsResource {
         }
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method, Object body,
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method, HttpHeaders headers, Object body,
                                            TypeReference<T> resultType, 
Boolean forward) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, resultType, 
new IdentityTranslator<T>(), forward);
+        return completeOrForwardRequest(cb, path, method, headers, body, 
resultType, new IdentityTranslator<T>(), forward);
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method,
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, 
String method, HttpHeaders headers,
                                            Object body, Boolean forward) 
throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, null, new 
IdentityTranslator<T>(), forward);
+        return completeOrForwardRequest(cb, path, method, headers, body, null, 
new IdentityTranslator<T>(), forward);
     }
 
     private interface Translator<T, U> {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ad360b6..67cae67 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -179,7 +180,7 @@ public class ConnectorPluginsResourceTest {
     @Before
     public void setUp() throws Exception {
         PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, 
String.class, Object.class, TypeReference.class, WorkerConfig.class));
+                RestClient.class.getMethod("httpRequest", String.class, 
String.class, HttpHeaders.class, Object.class, TypeReference.class, 
WorkerConfig.class));
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..ba5a2c3 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -75,6 +76,7 @@ public class ConnectorsResourceTest {
     private static final String CONNECTOR_NAME_PADDING_WHITESPACES = "   " + 
CONNECTOR_NAME + "  \n  ";
     private static final Boolean FORWARD = true;
     private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS = 
new HashMap<>();
+    private static final HttpHeaders NULL_HEADERS = null;
     static {
         CONNECTOR_CONFIG_SPECIAL_CHARS.put("name", 
CONNECTOR_NAME_SPECIAL_CHARS);
         CONNECTOR_CONFIG_SPECIAL_CHARS.put("sample_config", "test_config");
@@ -125,7 +127,7 @@ public class ConnectorsResourceTest {
     @Before
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, 
String.class, Object.class, TypeReference.class, WorkerConfig.class));
+                RestClient.class.getMethod("httpRequest", String.class, 
String.class, HttpHeaders.class, Object.class, TypeReference.class, 
WorkerConfig.class));
         connectorsResource = new ConnectorsResource(herder, null);
     }
 
@@ -142,7 +144,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD);
+        Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
         // Ordering isn't guaranteed, compare sets
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, 
CONNECTOR2_NAME)), new HashSet<>(connectors));
 
@@ -156,15 +158,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false";),
 EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), 
EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.anyObject(TypeReference.class), 
EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(200, new 
HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD);
-        // Ordering isn't guaranteed, compare sets
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, 
CONNECTOR2_NAME)), new HashSet<>(connectors));
-
+        Collection<String> connectors = 
connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
         PowerMock.verifyAll();
     }
 
@@ -177,7 +176,7 @@ public class ConnectorsResourceTest {
         PowerMock.replayAll();
 
         // throws
-        connectorsResource.listConnectors(FORWARD);
+        connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
     }
 
     @Test
@@ -191,7 +190,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
     }
@@ -204,19 +203,57 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false";),
 EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), 
EasyMock.anyObject(WorkerConfig.class)))
+        
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false";),
 EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(201, new 
HashMap<String, String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, 
CONNECTOR_TASK_NAMES,
                     ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
 
 
     }
 
+    @Test
+    public void testCreateConnectorWithHeaderAuthorization() throws Throwable {
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
+        HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
+        
EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn("Basic 
YWxhZGRpbjpvcGVuc2VzYW1l").times(1);
+        EasyMock.replay(httpHeaders);
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+        PowerMock.verifyAll();
+    }
+
+
+
+    @Test
+    public void testCreateConnectorWithoutHeaderAuthorization() throws 
Throwable {
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
+        HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class);
+        
EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn(null).times(1);
+        EasyMock.replay(httpHeaders);
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+        PowerMock.verifyAll();
+    }
+
     @Test(expected = AlreadyExistsException.class)
     public void testCreateConnectorExists() throws Throwable {
         CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
@@ -227,7 +264,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
     }
@@ -246,7 +283,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -265,7 +302,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -284,7 +321,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -297,7 +334,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -308,12 +345,12 @@ public class ConnectorsResourceTest {
         herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/"; + 
CONNECTOR_NAME + "?forward=false", "DELETE", null, null, null))
+        
EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/"; + 
CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
                 .andReturn(new RestClient.HttpResponse<>(204, new 
HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -327,7 +364,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -341,7 +378,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        ConnectorInfo connInfo = 
connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
+        ConnectorInfo connInfo = 
connectorsResource.getConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
         assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, 
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE),
             connInfo);
 
@@ -356,7 +393,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Map<String, String> connConfig = 
connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+        Map<String, String> connConfig = 
connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
         assertEquals(CONNECTOR_CONFIG, connConfig);
 
         PowerMock.verifyAll();
@@ -370,7 +407,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+        connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -384,7 +421,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, 
CONNECTOR_CONFIG);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD, CONNECTOR_CONFIG);
 
         PowerMock.verifyAll();
     }
@@ -400,7 +437,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.createConnector(FORWARD, 
body).getLocation().toString();
+        String rspLocation = connectorsResource.createConnector(FORWARD, 
NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, 
decoded);
 
@@ -418,7 +455,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.createConnector(FORWARD, 
body).getLocation().toString();
+        String rspLocation = connectorsResource.createConnector(FORWARD, 
NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + 
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
 
@@ -435,7 +472,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = 
connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, FORWARD, 
CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
+        String rspLocation = 
connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, 
NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, 
decoded);
 
@@ -452,7 +489,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = 
connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, 
FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+        String rspLocation = 
connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, 
NULL_HEADERS, FORWARD, 
CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + 
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
 
@@ -463,7 +500,7 @@ public class ConnectorsResourceTest {
     public void testPutConnectorConfigNameMismatch() throws Throwable {
         Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, 
connConfig);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD, connConfig);
     }
 
     @Test(expected = BadRequestException.class)
@@ -471,7 +508,7 @@ public class ConnectorsResourceTest {
         Map<String, String> connConfig = new HashMap<>();
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
         CreateConnectorRequest request = new 
CreateConnectorRequest(CONNECTOR_NAME, connConfig);
-        connectorsResource.createConnector(FORWARD, request);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, request);
     }
 
     @Test
@@ -482,7 +519,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        List<TaskInfo> taskInfos = 
connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+        List<TaskInfo> taskInfos = 
connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
         assertEquals(TASK_INFOS, taskInfos);
 
         PowerMock.verifyAll();
@@ -496,7 +533,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+        connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -509,7 +546,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, 
TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD, TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -522,7 +559,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, 
TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD, TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -535,7 +572,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -547,12 +584,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/";
 + CONNECTOR_NAME + "/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new 
HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, null);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, 
null);
 
         PowerMock.verifyAll();
     }
@@ -565,12 +602,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner 
test", ownerUrl));
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/";
 + CONNECTOR_NAME + "/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new 
HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, true);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, 
true);
 
         PowerMock.verifyAll();
     }
@@ -584,7 +621,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, 
FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -598,12 +635,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/";
 + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new 
HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, null);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
 
         PowerMock.verifyAll();
     }
@@ -618,12 +655,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner 
test", ownerUrl));
 
         
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/";
 + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), 
EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new 
HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, true);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
 
         PowerMock.verifyAll();
     }

Reply via email to