This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 530bc59 KAFKA-4930: Enforce set of legal characters for connector
names (KIP-212)
530bc59 is described below
commit 530bc59de2732ba71e82c0110c59e9f6162531c6
Author: Soenke Liebau <[email protected]>
AuthorDate: Wed Jan 31 08:49:23 2018 -0800
KAFKA-4930: Enforce set of legal characters for connector names (KIP-212)
Add a validator to check for an empty connector name and illegal characters
in the connector name. This also fixes KAFKA-4938 by removing the check for
slashes in the connector name from ConnectorsResource.
Author: Ewen Cheslack-Postava <[email protected]>
Author: Soenke Liebau <[email protected]>
Reviewers: Gwen Shapira <[email protected]>, Viktor Somogyi
<[email protected]>, Randall Hauch <[email protected]>, Ewen
Cheslack-Postava <[email protected]>
Closes #2755 from soenkeliebau/KAFKA-4930
---
.../org/apache/kafka/common/config/ConfigDef.java | 34 ++++++
.../apache/kafka/common/config/ConfigDefTest.java | 3 +
.../kafka/connect/runtime/ConnectorConfig.java | 3 +-
.../runtime/rest/resources/ConnectorsResource.java | 9 +-
.../kafka/connect/runtime/ConnectorConfigTest.java | 10 +-
.../rest/resources/ConnectorsResourceTest.java | 119 +++++++++++++++++++--
6 files changed, 163 insertions(+), 15 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3080298..bb199dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -982,6 +982,40 @@ public class ConfigDef {
}
}
+ public static class NonEmptyStringWithoutControlChars implements Validator
{
+
+ public static NonEmptyStringWithoutControlChars
nonEmptyStringWithoutControlChars() {
+ return new NonEmptyStringWithoutControlChars();
+ }
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ String s = (String) value;
+
+ if (s == null) {
+ // This can happen during creation of the config object due to
no default value being defined for the
+ // name configuration - a missing name parameter is caught
when checking for mandatory parameters,
+ // thus we can ok a null value here
+ return;
+ } else if (s.isEmpty()) {
+ throw new ConfigException(name, value, "String may not be
empty");
+ }
+
+ // Check name string for illegal characters
+ ArrayList<Integer> foundIllegalCharacters = new ArrayList<>();
+
+ for (int i = 0; i < s.length(); i++) {
+ if (Character.isISOControl(s.codePointAt(i))) {
+ foundIllegalCharacters.add(s.codePointAt(i));
+ }
+ }
+
+ if (!foundIllegalCharacters.isEmpty()) {
+ throw new ConfigException(name, value, "String may not contain
control sequences but had the following ASCII chars: " +
Utils.join(foundIllegalCharacters, ", "));
+ }
+ }
+ }
+
public static class ConfigKey {
public final String name;
public final Type type;
diff --git
a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 602147b..339c51a 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -160,6 +160,9 @@ public class ConfigDefTest {
testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1",
new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new
Object[]{"abb"}, new Object[] {null});
testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new
ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a",
"b"}, new Object[] {null, -1, "c"});
+ testValidators(Type.STRING, new
ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname",
+ new Object[]{"test", "name", "test/test", "test\u1234",
"\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test
\n\r", "\n hello \t"},
+ new Object[]{"nontrailing\nnotallowed", "as\u0001cii control
char", "tes\rt", "test\btest", "1\t2", ""});
}
@Test
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e63d100..aad12c3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
/**
* <p>
@@ -96,7 +97,7 @@ public class ConnectorConfig extends AbstractConfig {
public static ConfigDef configDef() {
return new ConfigDef()
- .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC,
COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
+ .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP,
1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH,
CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT,
atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3,
Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null,
Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT,
KEY_CONVERTER_CLASS_DISPLAY)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 7a01168..e966104 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -90,10 +90,11 @@ public class ConnectorsResource {
@Path("/")
public Response createConnector(final @QueryParam("forward") Boolean
forward,
final CreateConnectorRequest
createRequest) throws Throwable {
- String name = createRequest.name();
- if (name.contains("/")) {
- throw new BadRequestException("connector name should not contain
'/'");
- }
+ // Trim leading and trailing whitespaces from the connector name,
replace null with empty string
+ // if no name element present to keep validation within validator
(NonEmptyStringWithoutControlChars
+ // allows null values)
+ String name = createRequest.name() == null ? "" :
createRequest.name().trim();
+
Map<String, String> configs = createRequest.config();
checkAndPutConnectorConfigName(name, configs);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index f8c4fd6..fe1bf26 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -91,6 +91,14 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>>
{
}
@Test(expected = ConfigException.class)
+ public void emptyConnectorName() {
+ Map<String, String> props = new HashMap<>();
+ props.put("name", "");
+ props.put("connector.class", TestConnector.class.getName());
+ new ConnectorConfig(MOCK_PLUGINS, props);
+ }
+
+ @Test(expected = ConfigException.class)
public void wrongTransformationType() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
@@ -168,5 +176,5 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
assertEquals(42, ((SimpleTransformation)
transformations.get(0)).magicNumber);
assertEquals(84, ((SimpleTransformation)
transformations.get(1)).magicNumber);
}
-
+
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 6e17349..cc46080 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -68,8 +68,11 @@ public class ConnectorsResourceTest {
// URL construction properly, avoiding //, which will mess up routing in
the REST server
private static final String LEADER_URL = "http://leader:8083/";
private static final String CONNECTOR_NAME = "test";
- private static final String CONNECTOR_NAME_SPECIAL_CHARS =
"t?a=b&c=d\rx=1.1\n>< \t`'\" x%y+z!#$&'()*+,:;=?@[]";
+ private static final String CONNECTOR_NAME_SPECIAL_CHARS =
"ta/b&c=d//\\rx=1þ.1>< `'\" x%y+z!ሴ#$&'(æ)*+,:;=?ñ@[]ÿ";
+ private static final String CONNECTOR_NAME_CONTROL_SEQUENCES1 =
"ta/b&c=drx=1\n.1>< `'\" x%y+z!#$&'()*+,:;=?@[]";
private static final String CONNECTOR2_NAME = "test2";
+ private static final String CONNECTOR_NAME_ALL_WHITESPACES = " \t\n \b";
+ private static final String CONNECTOR_NAME_PADDING_WHITESPACES = " " +
CONNECTOR_NAME + " \n ";
private static final Boolean FORWARD = true;
private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS =
new HashMap<>();
static {
@@ -82,6 +85,24 @@ public class ConnectorsResourceTest {
CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
CONNECTOR_CONFIG.put("sample_config", "test_config");
}
+
+ private static final Map<String, String>
CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
+ static {
+ CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name",
CONNECTOR_NAME_CONTROL_SEQUENCES1);
+ CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("sample_config", "test_config");
+ }
+
+ private static final Map<String, String> CONNECTOR_CONFIG_WITHOUT_NAME =
new HashMap<>();
+ static {
+ CONNECTOR_CONFIG_WITHOUT_NAME.put("sample_config", "test_config");
+ }
+
+ private static final Map<String, String> CONNECTOR_CONFIG_WITH_EMPTY_NAME
= new HashMap<>();
+
+ static {
+ CONNECTOR_CONFIG_WITH_EMPTY_NAME.put(ConnectorConfig.NAME_CONFIG, "");
+ CONNECTOR_CONFIG_WITH_EMPTY_NAME.put("sample_config", "test_config");
+ }
private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES =
Arrays.asList(
new ConnectorTaskId(CONNECTOR_NAME, 0),
new ConnectorTaskId(CONNECTOR_NAME, 1)
@@ -108,6 +129,12 @@ public class ConnectorsResourceTest {
connectorsResource = new ConnectorsResource(herder, null);
}
+ private static final Map<String, String> getConnectorConfig(Map<String,
String> mapToClone) {
+ Map<String, String> result = new HashMap<>();
+ result.putAll(mapToClone);
+ return result;
+ }
+
@Test
public void testListConnectors() throws Throwable {
final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
@@ -206,20 +233,59 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
- @Test(expected = BadRequestException.class)
- public void testCreateConnectorWithASlashInItsName() throws Throwable {
- String badConnectorName = CONNECTOR_NAME + "/" + "test";
+ @Test
+ public void testCreateConnectorNameTrimWhitespaces() throws Throwable {
+ // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
+ // will affect later tests
+ Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+ final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig);
+ final CreateConnectorRequest bodyOut = new
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG);
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(bodyOut.name()),
EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(FORWARD, bodyIn);
- CreateConnectorRequest body = new
CreateConnectorRequest(badConnectorName,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, badConnectorName));
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateConnectorNameAllWhitespaces() throws Throwable {
+ // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
+ // will affect later tests
+ Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+ final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig);
+ final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
- herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
- expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
- ConnectorType.SOURCE)));
+ herder.putConnectorConfig(EasyMock.eq(bodyOut.name()),
EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
PowerMock.replayAll();
- connectorsResource.createConnector(FORWARD, body);
+ connectorsResource.createConnector(FORWARD, bodyIn);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateConnectorNoName() throws Throwable {
+ // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
+ // will affect later tests
+ Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
+ final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null,
inputConfig);
+ final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+ herder.putConnectorConfig(EasyMock.eq(bodyOut.name()),
EasyMock.eq(bodyOut.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(), CONNECTOR_TASK_NAMES,
ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ connectorsResource.createConnector(FORWARD, bodyIn);
PowerMock.verifyAll();
}
@@ -343,6 +409,24 @@ public class ConnectorsResourceTest {
}
@Test
+ public void testCreateConnectorWithControlSequenceInName() throws
Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME_CONTROL_SEQUENCES1));
+
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1),
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
+
+ PowerMock.replayAll();
+
+ String rspLocation = connectorsResource.createConnector(FORWARD,
body).getLocation().toString();
+ String decoded = new URI(rspLocation).getPath();
+ Assert.assertEquals("/connectors/" +
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testPutConnectorConfigWithSpecialCharsInName() throws
Throwable {
final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
@@ -359,6 +443,23 @@ public class ConnectorsResourceTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testPutConnectorConfigWithControlSequenceInName() throws
Throwable {
+ final Capture<Callback<Herder.Created<ConnectorInfo>>> cb =
Capture.newInstance();
+
+
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME_CONTROL_SEQUENCES1),
EasyMock.eq(CONNECTOR_CONFIG_CONTROL_SEQUENCES), EasyMock.eq(true),
EasyMock.capture(cb));
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1,
CONNECTOR_CONFIG_CONTROL_SEQUENCES, CONNECTOR_TASK_NAMES,
+ ConnectorType.SINK)));
+
+ PowerMock.replayAll();
+
+ String rspLocation =
connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1,
FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+ String decoded = new URI(rspLocation).getPath();
+ Assert.assertEquals("/connectors/" +
CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
+
+ PowerMock.verifyAll();
+ }
+
@Test(expected = BadRequestException.class)
public void testPutConnectorConfigNameMismatch() throws Throwable {
Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
--
To stop receiving notification emails like this one, please contact
[email protected].