Repository: kafka Updated Branches: refs/heads/trunk be2918b3a -> 5a2960f81
KAFKA-5563: Standardize validation and substitution of connector names in REST API connector configs …from config to own function and added check to create connector call. Author: Soenke Liebau <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #4230 from soenkeliebau/KAFKA-5563 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a2960f8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a2960f8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a2960f8 Branch: refs/heads/trunk Commit: 5a2960f811c27f59d78dfdb99c7c3c6eeed16c4b Parents: be2918b Author: Soenke Liebau <[email protected]> Authored: Sat Nov 25 17:50:17 2017 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Sat Nov 25 17:50:17 2017 -0800 ---------------------------------------------------------------------- .../rest/resources/ConnectorsResource.java | 23 ++++++++++++-------- .../rest/resources/ConnectorsResourceTest.java | 8 +++++++ 2 files changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2960f8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index e681a68..2c03124 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -92,8 +92,7 @@ public class ConnectorsResource { throw new BadRequestException("connector name should not contain '/'"); } Map<String, String> 
configs = createRequest.config(); - if (!configs.containsKey(ConnectorConfig.NAME_CONFIG)) - configs.put(ConnectorConfig.NAME_CONFIG, name); + checkAndPutConnectorConfigName(name, configs); FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); herder.putConnectorConfig(name, configs, false, cb); @@ -134,13 +133,7 @@ public class ConnectorsResource { final @QueryParam("forward") Boolean forward, final Map<String, String> connectorConfig) throws Throwable { FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(); - String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG); - if (includedName != null) { - if (!includedName.equals(connector)) - throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connector + ")"); - } else { - connectorConfig.put(ConnectorConfig.NAME_CONFIG, connector); - } + checkAndPutConnectorConfigName(connector, connectorConfig); herder.putConnectorConfig(connector, connectorConfig, true, cb); Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", @@ -225,6 +218,18 @@ public class ConnectorsResource { completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward); } + // Check whether the connector name from the url matches the one (if there is one) provided in the connectorconfig + // object. 
Throw BadRequestException on mismatch, otherwise put connectorname in config + private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) { + String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG); + if (includedName != null) { + if (!includedName.equals(connectorName)) + throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connectorName + ")"); + } else { + connectorConfig.put(ConnectorConfig.NAME_CONFIG, connectorName); + } + } + // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the // request to the leader. private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2960f8/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index cb86143..89a2218 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -365,6 +365,14 @@ public class ConnectorsResourceTest { connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig); } + @Test(expected = BadRequestException.class) + public void testCreateConnectorConfigNameMismatch() throws Throwable { + Map<String, String> connConfig = new HashMap<>(); + connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name"); + CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig); + 
connectorsResource.createConnector(FORWARD, request); + } + @Test public void testGetConnectorTaskConfigs() throws Throwable { final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
