Repository: kafka
Updated Branches:
  refs/heads/trunk c7f9bd2a6 -> bc5051565


KAFKA-3605: Return error if connector config includes mismatching connector name.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Jason Gustafson

Closes #1253 from ewencp/kafka-3605-connector-name-mismatch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc505156
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc505156
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc505156

Branch: refs/heads/trunk
Commit: bc5051565171cf65b4ed7dd4d9ef269d66a1021a
Parents: c7f9bd2
Author: Ewen Cheslack-Postava <[email protected]>
Authored: Fri Apr 22 07:09:14 2016 -0700
Committer: Gwen Shapira <[email protected]>
Committed: Fri Apr 22 07:09:14 2016 -0700

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java      | 16 +++-------------
 .../runtime/rest/resources/ConnectorsResource.java  |  9 +++++++++
 .../rest/resources/ConnectorsResourceTest.java      |  8 ++++++++
 3 files changed, 20 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc505156/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 3aa6c33..cbef186 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -435,16 +435,6 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     @Override
     public void putConnectorConfig(final String connName, final Map<String, 
String> config, final boolean allowReplace,
                                    final Callback<Created<ConnectorInfo>> 
callback) {
-        final Map<String, String> connConfig;
-        if (config == null) {
-            connConfig = null;
-        } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) {
-            connConfig = new HashMap<>(config);
-            connConfig.put(ConnectorConfig.NAME_CONFIG, connName);
-        } else {
-            connConfig = config;
-        }
-
         log.trace("Submitting connector config write request {}", connName);
 
         addRequest(
@@ -463,7 +453,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                             return null;
                         }
 
-                        if (connConfig == null) {
+                        if (config == null) {
                             if (!exists) {
                                 callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
                             } else {
@@ -475,11 +465,11 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                         }
 
                         log.trace("Submitting connector config {} {} {}", 
connName, allowReplace, configState.connectors());
-                        configBackingStore.putConnectorConfig(connName, 
connConfig);
+                        configBackingStore.putConnectorConfig(connName, 
config);
 
                         // Note that we use the updated connector config 
despite the fact that we don't have an updated
                         // snapshot yet. The existing task info should still 
be accurate.
-                        ConnectorInfo info = new ConnectorInfo(connName, 
connConfig, configState.tasks(connName));
+                        ConnectorInfo info = new ConnectorInfo(connName, 
config, configState.tasks(connName));
                         callback.onCompletion(null, new Created<>(!exists, 
info));
                         return null;
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc505156/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index fc713ca..2ec35f4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.servlet.ServletContext;
+import javax.ws.rs.BadRequestException;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -129,6 +130,14 @@ public class ConnectorsResource {
                                        final @QueryParam("forward") Boolean 
forward,
                                        final Map<String, String> 
connectorConfig) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
+        String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
+        if (includedName != null) {
+            if (!includedName.equals(connector))
+                throw new BadRequestException("Connector name configuration (" 
+ includedName + ") doesn't match connector name in the URL (" + connector + 
")");
+        } else {
+            connectorConfig.put(ConnectorConfig.NAME_CONFIG, connector);
+        }
+
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = 
completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
                 "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { 
}, new CreatedConnectorInfoTranslator(), forward);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bc505156/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index fa7d997..aa1b9a7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -44,6 +44,7 @@ import 
org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import javax.ws.rs.BadRequestException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -290,6 +291,13 @@ public class ConnectorsResourceTest {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = BadRequestException.class)
+    public void testPutConnectorConfigNameMismatch() throws Throwable {
+        Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
+        connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, 
connConfig);
+    }
+
     @Test
     public void testGetConnectorTaskConfigs() throws Throwable {
         final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();

Reply via email to