Repository: kafka Updated Branches: refs/heads/trunk c7f9bd2a6 -> bc5051565
KAFKA-3605: Return error if connector config includes mismatching connector name. Author: Ewen Cheslack-Postava <[email protected]> Reviewers: Jason Gustafson Closes #1253 from ewencp/kafka-3605-connector-name-mismatch Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc505156 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc505156 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc505156 Branch: refs/heads/trunk Commit: bc5051565171cf65b4ed7dd4d9ef269d66a1021a Parents: c7f9bd2 Author: Ewen Cheslack-Postava <[email protected]> Authored: Fri Apr 22 07:09:14 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Fri Apr 22 07:09:14 2016 -0700 ---------------------------------------------------------------------- .../runtime/distributed/DistributedHerder.java | 16 +++------------- .../runtime/rest/resources/ConnectorsResource.java | 9 +++++++++ .../rest/resources/ConnectorsResourceTest.java | 8 ++++++++ 3 files changed, 20 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bc505156/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 3aa6c33..cbef186 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -435,16 +435,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @Override public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) { - final Map<String, String> connConfig; - if (config == null) { - connConfig = null; - } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) { - connConfig = new HashMap<>(config); - connConfig.put(ConnectorConfig.NAME_CONFIG, connName); - } else { - connConfig = config; - } - log.trace("Submitting connector config write request {}", connName); addRequest( @@ -463,7 +453,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { return null; } - if (connConfig == null) { + if (config == null) { if (!exists) { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { @@ -475,11 +465,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); - configBackingStore.putConnectorConfig(connName, connConfig); + configBackingStore.putConnectorConfig(connName, config); // Note that we use the updated connector config despite the fact that we don't have an updated // snapshot yet. The existing task info should still be accurate. - ConnectorInfo info = new ConnectorInfo(connName, connConfig, configState.tasks(connName)); + ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName)); callback.onCompletion(null, new Created<>(!exists, info)); return null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/bc505156/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index fc713ca..2ec35f4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.ServletContext; +import javax.ws.rs.BadRequestException; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; @@ -129,6 +130,14 @@ public class ConnectorsResource { final @QueryParam("forward") Boolean forward, final Map<String, String> connectorConfig) throws Throwable { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); + String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG); + if (includedName != null) { + if (!includedName.equals(connector)) + throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connector + ")"); + } else { + connectorConfig.put(ConnectorConfig.NAME_CONFIG, connector); + } + herder.putConnectorConfig(connector, connectorConfig, true, cb); Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward); http://git-wip-us.apache.org/repos/asf/kafka/blob/bc505156/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index fa7d997..aa1b9a7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -44,6 +44,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import javax.ws.rs.BadRequestException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -290,6 +291,13 @@ public class ConnectorsResourceTest { PowerMock.verifyAll(); } + @Test(expected = BadRequestException.class) + public void testPutConnectorConfigNameMismatch() throws Throwable { + Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG); + connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name"); + connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig); + } + @Test public void testGetConnectorTaskConfigs() throws Throwable { final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
