This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1bc4de75a2f KAFKA-15470: Allow creating connectors in a stopped state
(#14704)
1bc4de75a2f is described below
commit 1bc4de75a2f870c31ad0025ff4feab063c5a5267
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Nov 15 06:07:50 2023 +0000
KAFKA-15470: Allow creating connectors in a stopped state (#14704)
Reviewers: Chris Egerton <[email protected]>
---
bin/connect-standalone.sh | 2 +-
checkstyle/import-control.xml | 1 +
.../kafka/connect/cli/ConnectStandalone.java | 84 +++++++-
.../org/apache/kafka/connect/runtime/Herder.java | 13 ++
.../runtime/distributed/DistributedHerder.java | 8 +-
.../rest/entities/CreateConnectorRequest.java | 49 ++++-
.../runtime/rest/resources/ConnectorsResource.java | 2 +-
.../runtime/standalone/StandaloneHerder.java | 11 +-
.../kafka/connect/storage/ConfigBackingStore.java | 4 +-
.../connect/storage/KafkaConfigBackingStore.java | 36 +++-
.../connect/storage/MemoryConfigBackingStore.java | 19 +-
.../kafka/connect/cli/ConnectStandaloneTest.java | 127 +++++++++++
.../integration/ConnectWorkerIntegrationTest.java | 233 ++++++++++++++++++++-
.../integration/RestForwardingIntegrationTest.java | 5 +-
.../StandaloneWorkerIntegrationTest.java | 53 +++++
.../runtime/distributed/DistributedHerderTest.java | 51 ++++-
.../rest/entities/CreateConnectorRequestTest.java | 53 +++++
.../rest/resources/ConnectorsResourceTest.java | 90 ++++++--
.../runtime/standalone/StandaloneHerderTest.java | 31 +++
.../storage/KafkaConfigBackingStoreTest.java | 71 ++++++-
.../storage/MemoryConfigBackingStoreTest.java | 32 ++-
.../connect/util/clusters/EmbeddedConnect.java | 33 ++-
docs/connect.html | 6 +-
23 files changed, 929 insertions(+), 85 deletions(-)
diff --git a/bin/connect-standalone.sh b/bin/connect-standalone.sh
index 441069fed31..bef78d658fd 100755
--- a/bin/connect-standalone.sh
+++ b/bin/connect-standalone.sh
@@ -16,7 +16,7 @@
if [ $# -lt 1 ];
then
- echo "USAGE: $0 [-daemon] connect-standalone.properties"
+ echo "USAGE: $0 [-daemon] connect-standalone.properties
[connector1.properties connector2.json ...]"
exit 1
fi
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0db1700c612..9e34d1e4029 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -534,6 +534,7 @@
</subpackage>
<subpackage name="cli">
+ <allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.connect.runtime" />
<allow pkg="org.apache.kafka.connect.storage" />
<allow pkg="org.apache.kafka.connect.util" />
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index c1977de3df9..7068377a900 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -16,20 +16,26 @@
*/
package org.apache.kafka.connect.cli;
+import com.fasterxml.jackson.core.exc.StreamReadException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DatabindException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
@@ -38,9 +44,14 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+
/**
* <p>
* Command line utility that runs Kafka Connect as a standalone process. In
this mode, work (connectors and tasks) is not
@@ -61,23 +72,24 @@ public class ConnectStandalone extends
AbstractConnectCli<StandaloneConfig> {
@Override
protected String usage() {
- return "ConnectStandalone worker.properties [connector1.properties
connector2.properties ...]";
+ return "ConnectStandalone worker.properties [connector1.properties
connector2.json ...]";
}
@Override
protected void processExtraArgs(Herder herder, Connect connect, String[]
extraArgs) {
try {
- for (final String connectorPropsFile : extraArgs) {
- Map<String, String> connectorProps =
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+ for (final String connectorConfigFile : extraArgs) {
+ CreateConnectorRequest createConnectorRequest =
parseConnectorConfigurationFile(connectorConfigFile);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>((error, info) -> {
if (error != null)
- log.error("Failed to create connector for {}",
connectorPropsFile);
+ log.error("Failed to create connector for {}",
connectorConfigFile);
else
log.info("Created connector {}", info.result().name());
});
herder.putConnectorConfig(
- connectorProps.get(ConnectorConfig.NAME_CONFIG),
- connectorProps, false, cb);
+ createConnectorRequest.name(),
createConnectorRequest.config(),
+ createConnectorRequest.initialTargetState(),
+ false, cb);
cb.get();
}
} catch (Throwable t) {
@@ -87,6 +99,64 @@ public class ConnectStandalone extends
AbstractConnectCli<StandaloneConfig> {
}
}
+ /**
+ * Parse a connector configuration file into a {@link
CreateConnectorRequest}. The file can have any one of the following formats
(note that
+ * we attempt to parse the file in this order):
+ * <ol>
+ * <li>A JSON file containing an Object with only String keys and
values that represent the connector configuration.</li>
+ * <li>A JSON file containing an Object that can be parsed directly
into a {@link CreateConnectorRequest}</li>
+ * <li>A valid Java Properties file (i.e. containing String key/value
pairs representing the connector configuration)</li>
+ * </ol>
+ * <p>
+ * Visible for testing.
+ *
+ * @param filePath the path of the connector configuration file
+ * @return the parsed connector configuration in the form of a {@link
CreateConnectorRequest}
+ */
+ CreateConnectorRequest parseConnectorConfigurationFile(String filePath)
throws IOException {
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ File connectorConfigurationFile = Paths.get(filePath).toFile();
+ try {
+ Map<String, String> connectorConfigs = objectMapper.readValue(
+ connectorConfigurationFile,
+ new TypeReference<Map<String, String>>() { });
+
+ if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+ throw new ConnectException("Connector configuration at '" +
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+ + "configuration");
+ }
+ return new
CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs,
null);
+ } catch (StreamReadException | DatabindException e) {
+ log.debug("Could not parse connector configuration file '{}' into
a Map with String keys and values", filePath);
+ }
+
+ try {
+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ CreateConnectorRequest createConnectorRequest =
objectMapper.readValue(connectorConfigurationFile,
+ new TypeReference<CreateConnectorRequest>() { });
+ if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+ if
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
{
+ throw new ConnectException("Connector name configuration
in 'config' doesn't match the one specified in 'name' at '" + filePath
+ + "'");
+ }
+ } else {
+ createConnectorRequest.config().put(NAME_CONFIG,
createConnectorRequest.name());
+ }
+ return createConnectorRequest;
+ } catch (StreamReadException | DatabindException e) {
+ log.debug("Could not parse connector configuration file '{}' into
an object of type {}",
+ filePath, CreateConnectorRequest.class.getSimpleName());
+ }
+
+ Map<String, String> connectorConfigs =
Utils.propsToStringMap(Utils.loadProps(filePath));
+ if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+ throw new ConnectException("Connector configuration at '" +
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+ + "configuration");
+ }
+ return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG),
connectorConfigs, null);
+ }
+
@Override
protected Herder createHerder(StandaloneConfig config, String workerId,
Plugins plugins,
ConnectorClientConfigOverridePolicy
connectorClientConfigOverridePolicy,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 566a5c4c096..a8a6e7858d8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -108,6 +108,19 @@ public interface Herder {
*/
void putConnectorConfig(String connName, Map<String, String> config,
boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
+ /**
+ * Set the configuration for a connector, along with a target state
optionally. This supports creation and updating.
+ * @param connName name of the connector
+ * @param config the connector's configuration
+ * @param targetState the desired target state for the connector; may be
{@code null} if no target state change is desired. Note that the default
+ * target state is {@link TargetState#STARTED} if no
target state exists previously
+ * @param allowReplace if true, allow overwriting previous configs; if
false, throw {@link AlreadyExistsException}
+ * if a connector with the same name already exists
+ * @param callback callback to invoke when the configuration has been
written
+ */
+ void putConnectorConfig(String connName, Map<String, String> config,
TargetState targetState, boolean allowReplace,
+ Callback<Created<ConnectorInfo>> callback);
+
/**
* Delete a connector and its configuration.
* @param connName name of the connector
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 48b5fad3423..8ae19dd3a7b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1051,6 +1051,12 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
@Override
public void putConnectorConfig(final String connName, final Map<String,
String> config, final boolean allowReplace,
final Callback<Created<ConnectorInfo>>
callback) {
+ putConnectorConfig(connName, config, null, allowReplace, callback);
+ }
+
+ @Override
+ public void putConnectorConfig(final String connName, final Map<String,
String> config, final TargetState targetState,
+ final boolean allowReplace, final
Callback<Created<ConnectorInfo>> callback) {
log.trace("Submitting connector config write request {}", connName);
addRequest(
() -> {
@@ -1081,7 +1087,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
log.trace("Submitting connector config {} {} {}",
connName, allowReplace, configState.connectors());
- writeToConfigTopicAsLeader(() ->
configBackingStore.putConnectorConfig(connName, config));
+ writeToConfigTopicAsLeader(() ->
configBackingStore.putConnectorConfig(connName, config, targetState));
// Note that we use the updated connector config
despite the fact that we don't have an updated
// snapshot yet. The existing task info should
still be accurate.
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
index 1c52d8db997..da8e235e424 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -18,18 +18,23 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.runtime.TargetState;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
public class CreateConnectorRequest {
private final String name;
private final Map<String, String> config;
+ private final InitialState initialState;
@JsonCreator
- public CreateConnectorRequest(@JsonProperty("name") String name,
@JsonProperty("config") Map<String, String> config) {
+ public CreateConnectorRequest(@JsonProperty("name") String name,
@JsonProperty("config") Map<String, String> config,
+ @JsonProperty("initial_state") InitialState
initialState) {
this.name = name;
this.config = config;
+ this.initialState = initialState;
}
@JsonProperty
@@ -42,17 +47,55 @@ public class CreateConnectorRequest {
return config;
}
+ @JsonProperty
+ public InitialState initialState() {
+ return initialState;
+ }
+
+ public TargetState initialTargetState() {
+ if (initialState != null) {
+ return initialState.toTargetState();
+ } else {
+ return null;
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateConnectorRequest that = (CreateConnectorRequest) o;
return Objects.equals(name, that.name) &&
- Objects.equals(config, that.config);
+ Objects.equals(config, that.config) &&
+ Objects.equals(initialState, that.initialState);
}
@Override
public int hashCode() {
- return Objects.hash(name, config);
+ return Objects.hash(name, config, initialState);
+ }
+
+ public enum InitialState {
+ RUNNING,
+ PAUSED,
+ STOPPED;
+
+ @JsonCreator
+ public static InitialState forValue(String value) {
+ return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
+ }
+
+ public TargetState toTargetState() {
+ switch (this) {
+ case RUNNING:
+ return TargetState.STARTED;
+ case PAUSED:
+ return TargetState.PAUSED;
+ case STOPPED:
+ return TargetState.STOPPED;
+ default:
+ throw new IllegalArgumentException("Unknown initial state:
" + this);
+ }
+ }
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4878b8df9e1..75e510ef9ad 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -145,7 +145,7 @@ public class ConnectorsResource implements ConnectResource {
checkAndPutConnectorConfigName(name, configs);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>();
- herder.putConnectorConfig(name, configs, false, cb);
+ herder.putConnectorConfig(name, configs,
createRequest.initialTargetState(), false, cb);
Herder.Created<ConnectorInfo> info =
requestHandler.completeOrForwardRequest(cb, "/connectors", "POST", headers,
createRequest,
new TypeReference<ConnectorInfo>() { }, new
CreatedConnectorInfoTranslator(), forward);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 0da89b2f668..40e19da19c7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -183,6 +183,12 @@ public class StandaloneHerder extends AbstractHerder {
final Map<String, String>
config,
boolean allowReplace,
final
Callback<Created<ConnectorInfo>> callback) {
+ putConnectorConfig(connName, config, null, allowReplace, callback);
+ }
+
+ @Override
+ public void putConnectorConfig(final String connName, final Map<String,
String> config, final TargetState targetState,
+ final boolean allowReplace, final
Callback<Created<ConnectorInfo>> callback) {
try {
validateConnectorConfig(config, (error, configInfos) -> {
if (error != null) {
@@ -191,7 +197,7 @@ public class StandaloneHerder extends AbstractHerder {
}
requestExecutorService.submit(
- () -> putConnectorConfig(connName, config, allowReplace,
callback, configInfos)
+ () -> putConnectorConfig(connName, config, targetState,
allowReplace, callback, configInfos)
);
});
} catch (Throwable t) {
@@ -201,6 +207,7 @@ public class StandaloneHerder extends AbstractHerder {
private synchronized void putConnectorConfig(String connName,
final Map<String, String>
config,
+ TargetState targetState,
boolean allowReplace,
final
Callback<Created<ConnectorInfo>> callback,
ConfigInfos configInfos) {
@@ -221,7 +228,7 @@ public class StandaloneHerder extends AbstractHerder {
created = true;
}
- configBackingStore.putConnectorConfig(connName, config);
+ configBackingStore.putConnectorConfig(connName, config,
targetState);
startConnector(connName, (error, result) -> {
if (error != null) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index c869c545f80..9f12ab0d3cf 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -56,8 +56,10 @@ public interface ConfigBackingStore {
* Update the configuration for a connector.
* @param connector name of the connector
* @param properties the connector configuration
+ * @param targetState the desired target state for the connector; may be
{@code null} if no target state change is desired. Note that the default
+ * target state is {@link TargetState#STARTED} if no
target state exists previously
*/
- void putConnectorConfig(String connector, Map<String, String> properties);
+ void putConnectorConfig(String connector, Map<String, String> properties,
TargetState targetState);
/**
* Remove configuration for a connector
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index a95c8249942..35d43ea3cca 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -497,26 +497,34 @@ public class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore impleme
}
/**
- * Write this connector configuration to persistent storage and wait until
it has been acknowledged and read back by
- * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()}
must be successfully invoked before calling
+ * Write this connector configuration (and optionally a target state) to
persistent storage and wait until it has been acknowledged and read
+ * back by tailing the Kafka log with a consumer. {@link
#claimWritePrivileges()} must be successfully invoked before calling
* this method if the worker is configured to use a fencable producer for
writes to the config topic.
*
* @param connector name of the connector to write data for
* @param properties the configuration to write
+ * @param targetState the desired target state for the connector; may be
{@code null} if no target state change is desired. Note that the default
+ * target state is {@link TargetState#STARTED} if no
target state exists previously
* @throws IllegalStateException if {@link #claimWritePrivileges()} is
required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a
fencable producer for writes to the config topic
* and the write fails
*/
@Override
- public void putConnectorConfig(String connector, Map<String, String>
properties) {
+ public void putConnectorConfig(String connector, Map<String, String>
properties, TargetState targetState) {
log.debug("Writing connector configuration for connector '{}'",
connector);
Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic,
CONNECTOR_CONFIGURATION_V0, connectConfig);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
- sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+ List<ProducerKeyValue> keyValues = new ArrayList<>();
+ if (targetState != null) {
+ log.debug("Writing target state {} for connector {}",
targetState, connector);
+ keyValues.add(new
ProducerKeyValue(TARGET_STATE_KEY(connector),
serializeTargetState(targetState)));
+ }
+ keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector),
serializedConfig));
+ sendPrivileged(keyValues, timer);
configLog.readToEnd().get(timer.remainingMs(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
log.error("Failed to write connector configuration to Kafka: ", e);
@@ -647,20 +655,24 @@ public class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore impleme
*/
@Override
public void putTargetState(String connector, TargetState state) {
- Struct connectTargetState = new Struct(TARGET_STATE_V1);
- // Older workers don't support the STOPPED state; fall back on PAUSED
- connectTargetState.put("state", state == STOPPED ? PAUSED.name() :
state.name());
- connectTargetState.put("state.v2", state.name());
- byte[] serializedTargetState = converter.fromConnectData(topic,
TARGET_STATE_V1, connectTargetState);
log.debug("Writing target state {} for connector {}", state,
connector);
try {
- configLog.sendWithReceipt(TARGET_STATE_KEY(connector),
serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ configLog.sendWithReceipt(TARGET_STATE_KEY(connector),
serializeTargetState(state))
+ .get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
log.error("Failed to write target state to Kafka", e);
throw new ConnectException("Error writing target state to Kafka",
e);
}
}
+ private byte[] serializeTargetState(TargetState state) {
+ Struct connectTargetState = new Struct(TARGET_STATE_V1);
+ // Older workers don't support the STOPPED state; fall back on PAUSED
+ connectTargetState.put("state", state == STOPPED ? PAUSED.name() :
state.name());
+ connectTargetState.put("state.v2", state.name());
+ return converter.fromConnectData(topic, TARGET_STATE_V1,
connectTargetState);
+ }
+
/**
* Write a task count record for a connector to persistent storage and
wait until it has been acknowledged and read back by
* tailing the Kafka log with a consumer. {@link #claimWritePrivileges()}
must be successfully invoked before calling this method
@@ -985,7 +997,9 @@ public class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore impleme
// Note that we do not notify the update listener if the target state
has been removed.
// Instead we depend on the removal callback of the connector config
itself to notify the worker.
- if (started && !removed && stateChanged)
+ // We also don't notify the update listener if the connector doesn't
exist yet - a scenario that can
+ // occur if an explicit initial target state is specified in the
connector creation request.
+ if (started && !removed && stateChanged &&
connectorConfigs.containsKey(connectorName))
updateListener.onConnectorTargetStateChange(connectorName);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 52c360c3b33..3b9ba966ca2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -92,12 +92,16 @@ public class MemoryConfigBackingStore implements
ConfigBackingStore {
}
@Override
- public synchronized void putConnectorConfig(String connector, Map<String,
String> properties) {
+ public synchronized void putConnectorConfig(String connector, Map<String,
String> properties, TargetState targetState) {
ConnectorState state = connectors.get(connector);
if (state == null)
- connectors.put(connector, new ConnectorState(properties));
- else
+ connectors.put(connector, new ConnectorState(properties,
targetState));
+ else {
state.connConfig = properties;
+ if (targetState != null) {
+ state.targetState = targetState;
+ }
+ }
if (updateListener != null)
updateListener.onConnectorConfigUpdate(connector);
@@ -184,8 +188,13 @@ public class MemoryConfigBackingStore implements
ConfigBackingStore {
private Map<String, String> connConfig;
private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
- public ConnectorState(Map<String, String> connConfig) {
- this.targetState = TargetState.STARTED;
+ /**
+ * @param connConfig the connector's configuration
+ * @param targetState the connector's initial {@link TargetState}; may
be {@code null} in which case the default initial target state
+ * {@link TargetState#STARTED} will be used
+ */
+ public ConnectorState(Map<String, String> connConfig, TargetState
targetState) {
+ this.targetState = targetState == null ? TargetState.STARTED :
targetState;
this.connConfig = connConfig;
this.taskConfigs = new HashMap<>();
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java
new file mode 100644
index 00000000000..9a762f1a394
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectStandaloneTest {
+
+ private static final String CONNECTOR_NAME = "test-connector";
+ private static final Map<String, String> CONNECTOR_CONFIG = new
HashMap<>();
+ static {
+ CONNECTOR_CONFIG.put(NAME_CONFIG, CONNECTOR_NAME);
+ CONNECTOR_CONFIG.put("key1", "val1");
+ CONNECTOR_CONFIG.put("key2", "val2");
+ }
+
+ private final ConnectStandalone connectStandalone = new
ConnectStandalone();
+ private File connectorConfigurationFile;
+
+ @Before
+ public void setUp() throws IOException {
+ connectorConfigurationFile = TestUtils.tempFile();
+ }
+
+ @Test
+ public void testParseJavaPropertiesFile() throws Exception {
+ Properties properties = new Properties();
+ CONNECTOR_CONFIG.forEach(properties::setProperty);
+
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ properties.store(writer, null);
+ }
+
+ CreateConnectorRequest request =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ assertEquals(CONNECTOR_NAME, request.name());
+ assertEquals(CONNECTOR_CONFIG, request.config());
+ assertNull(request.initialState());
+ }
+
+ @Test
+ public void testParseJsonFileWithConnectorConfiguration() throws Exception
{
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ writer.write(new
ObjectMapper().writeValueAsString(CONNECTOR_CONFIG));
+ }
+
+ CreateConnectorRequest request =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ assertEquals(CONNECTOR_NAME, request.name());
+ assertEquals(CONNECTOR_CONFIG, request.config());
+ assertNull(request.initialState());
+ }
+
+ @Test
+ public void testParseJsonFileWithCreateConnectorRequest() throws Exception
{
+ CreateConnectorRequest requestToWrite = new CreateConnectorRequest(
+ CONNECTOR_NAME,
+ CONNECTOR_CONFIG,
+ CreateConnectorRequest.InitialState.STOPPED
+ );
+
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ writer.write(new
ObjectMapper().writeValueAsString(requestToWrite));
+ }
+
+ CreateConnectorRequest parsedRequest =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ assertEquals(requestToWrite, parsedRequest);
+ }
+
+ @Test
+ public void
testParseJsonFileWithCreateConnectorRequestWithoutInitialState() throws
Exception {
+ Map<String, Object> requestToWrite = new HashMap<>();
+ requestToWrite.put("name", CONNECTOR_NAME);
+ requestToWrite.put("config", CONNECTOR_CONFIG);
+
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ writer.write(new
ObjectMapper().writeValueAsString(requestToWrite));
+ }
+
+ CreateConnectorRequest parsedRequest =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ CreateConnectorRequest expectedRequest = new
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
+ assertEquals(expectedRequest, parsedRequest);
+ }
+
+ @Test
+ public void testParseJsonFileWithCreateConnectorRequestWithUnknownField()
throws Exception {
+ Map<String, Object> requestToWrite = new HashMap<>();
+ requestToWrite.put("name", CONNECTOR_NAME);
+ requestToWrite.put("config", CONNECTOR_CONFIG);
+ requestToWrite.put("unknown-field", "random-value");
+
+ try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+ writer.write(new
ObjectMapper().writeValueAsString(requestToWrite));
+ }
+
+ CreateConnectorRequest parsedRequest =
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+ CreateConnectorRequest expectedRequest = new
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
+ assertEquals(expectedRequest, parsedRequest);
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index cd9b9c05171..8a271603eef 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.connect.integration;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -49,14 +52,15 @@ import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_
import static
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static
org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
-import static
org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -73,7 +77,7 @@ public class ConnectWorkerIntegrationTest {
private static final int NUM_WORKERS = 3;
private static final int NUM_TASKS = 4;
private static final int MESSAGES_PER_POLL = 10;
- private static final String CONNECTOR_NAME = "simple-source";
+ private static final String CONNECTOR_NAME = "simple-connector";
private static final String TOPIC_NAME = "test-topic";
private EmbeddedConnectCluster.Builder connectBuilder;
@@ -529,8 +533,10 @@ public class ConnectWorkerIntegrationTest {
// start the clusters
connect.start();
- connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
- "Initial group of workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(
+ NUM_WORKERS,
+ "Initial group of workers did not start in time."
+ );
connect.configureConnector(CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
@@ -546,10 +552,227 @@ public class ConnectWorkerIntegrationTest {
assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
assertThat(logEvents.get(0).getMessage(),
containsString("deprecated"));
}
+
+ }
+
+ @Test
+ public void testCreateConnectorWithPausedInitialState() throws Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
+ CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
+ CONNECTOR_NAME,
+ defaultSourceConnectorProps(TOPIC_NAME),
+ CreateConnectorRequest.InitialState.PAUSED
+ );
+ connect.configureConnector(createConnectorRequest);
+
+ // Verify that the connector's status is PAUSED and also that no tasks
were spawned for the connector
+ connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+ CONNECTOR_NAME,
+ 0,
+ "Connector was not created in a paused state"
+ );
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyList(),
connect.taskConfigs(CONNECTOR_NAME));
+
+ // Verify that a connector created in the PAUSED state can be resumed
successfully
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ NUM_TASKS,
+ "Connector or tasks did not start running healthily in time"
+ );
+ }
+
+ @Test
+ public void
testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() throws
Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
+ Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+
+ // Configure the connector to produce a maximum of 10 messages
+ props.put("max.messages", "10");
+ props.put(TASKS_MAX_CONFIG, "1");
+ CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
+ CONNECTOR_NAME,
+ props,
+ CreateConnectorRequest.InitialState.STOPPED
+ );
+ connect.configureConnector(createConnectorRequest);
+
+ // Verify that the connector's status is STOPPED and also that no
tasks were spawned for the connector
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector was not created in a stopped state"
+ );
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyList(),
connect.taskConfigs(CONNECTOR_NAME));
+
+ // Verify that the offsets can be modified for a source connector
created in the STOPPED state
+
+ // Alter the offsets so that only 5 messages are produced
+ connect.alterSourceConnectorOffset(
+ CONNECTOR_NAME,
+ Collections.singletonMap("task.id", CONNECTOR_NAME + "-0"),
+ Collections.singletonMap("saved", 5L)
+ );
+
+ // Verify that a connector created in the STOPPED state can be resumed
successfully
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ 1,
+ "Connector or tasks did not start running healthily in time"
+ );
+
+ // Verify that only 5 messages were produced. We verify this by
consuming all the messages from the topic after we've already ensured that at
+ // least 5 messages can be consumed.
+ long timeoutMs = TimeUnit.SECONDS.toMillis(10);
+ connect.kafka().consume(5, timeoutMs, TOPIC_NAME);
+ assertEquals(5, connect.kafka().consumeAll(timeoutMs,
TOPIC_NAME).count());
+ }
+
+ @Test
+ public void
testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() throws
Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
+ // Create topic and produce 10 messages
+ connect.kafka().createTopic(TOPIC_NAME);
+ for (int i = 0; i < 10; i++) {
+ connect.kafka().produce(TOPIC_NAME, "Message " + i);
+ }
+
+ Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
+ props.put(TASKS_MAX_CONFIG, "1");
+
+ CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
+ CONNECTOR_NAME,
+ props,
+ CreateConnectorRequest.InitialState.STOPPED
+ );
+ connect.configureConnector(createConnectorRequest);
+
+ // Verify that the connector's status is STOPPED and also that no
tasks were spawned for the connector
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector was not created in a stopped state"
+ );
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyList(),
connect.taskConfigs(CONNECTOR_NAME));
+
+ // Verify that the offsets can be modified for a sink connector
created in the STOPPED state
+
+ // Alter the offsets so that the first 5 messages in the topic are
skipped
+ connect.alterSinkConnectorOffset(CONNECTOR_NAME, new
TopicPartition(TOPIC_NAME, 0), 5L);
+
+ // This will cause the connector task to fail if it encounters a
record with offset < 5
+ TaskHandle taskHandle =
RuntimeHandles.get().connectorHandle(CONNECTOR_NAME).taskHandle(CONNECTOR_NAME
+ "-0",
+ sinkRecord -> {
+ if (sinkRecord.kafkaOffset() < 5L) {
+ throw new ConnectException("Unexpected record encountered:
" + sinkRecord);
+ }
+ });
+
+ // We produced 10 records and altered the connector offsets to skip
over the first 5, so we expect 5 records to be consumed
+ taskHandle.expectedRecords(5);
+
+ // Verify that a connector created in the STOPPED state can be resumed
successfully
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ 1,
+ "Connector or tasks did not start running healthily in time"
+ );
+
+ taskHandle.awaitRecords(TimeUnit.SECONDS.toMillis(10));
+
+ // Confirm that the task is still running (i.e. it didn't fail due to
encountering any records with offset < 5)
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ 1,
+ "Connector or tasks did not start running healthily in time"
+ );
+ }
+
+ @Test
+ public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState()
throws Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
+ // Create a connector with PAUSED initial state
+ CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
+ CONNECTOR_NAME,
+ defaultSourceConnectorProps(TOPIC_NAME),
+ CreateConnectorRequest.InitialState.PAUSED
+ );
+ connect.configureConnector(createConnectorRequest);
+
+ // Verify that the connector's status is PAUSED and also that no tasks
were spawned for the connector
+ connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+ CONNECTOR_NAME,
+ 0,
+ "Connector was not created in a paused state"
+ );
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyList(),
connect.taskConfigs(CONNECTOR_NAME));
+
+ // Verify that a connector created in the PAUSED state can be deleted
successfully
+ connect.deleteConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
"Connector wasn't deleted in time");
+
+
+ // Create a connector with STOPPED initial state
+ createConnectorRequest = new CreateConnectorRequest(
+ CONNECTOR_NAME,
+ defaultSourceConnectorProps(TOPIC_NAME),
+ CreateConnectorRequest.InitialState.STOPPED
+ );
+ connect.configureConnector(createConnectorRequest);
+
+ // Verify that the connector's status is STOPPED and also that no
tasks were spawned for the connector
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector was not created in a stopped state"
+ );
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyList(),
connect.taskConfigs(CONNECTOR_NAME));
+
+ // Verify that a connector created in the STOPPED state can be deleted
successfully
+ connect.deleteConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
"Connector wasn't deleted in time");
+ }
+
+ private Map<String, String> defaultSinkConnectorProps(String topics) {
+ // setup props for the sink connector
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSinkConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put(TOPICS_CONFIG, topics);
+
+ return props;
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
- // setup up props for the source connector
+ // setup props for the source connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
index 7c9e2f2c51e..0bbad10a57d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -187,7 +188,7 @@ public class RestForwardingIntegrationTest {
followerCallbackCaptor.getValue().onCompletion(forwardException,
null);
return null;
}).when(followerHerder)
- .putConnectorConfig(any(), any(), anyBoolean(),
followerCallbackCaptor.capture());
+ .putConnectorConfig(any(), any(), isNull(), anyBoolean(),
followerCallbackCaptor.capture());
// Leader will reply
ConnectorInfo connectorInfo = new ConnectorInfo("blah",
Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE);
@@ -197,7 +198,7 @@ public class RestForwardingIntegrationTest {
leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
return null;
}).when(leaderHerder)
- .putConnectorConfig(any(), any(), anyBoolean(),
leaderCallbackCaptor.capture());
+ .putConnectorConfig(any(), any(), isNull(), anyBoolean(),
leaderCallbackCaptor.capture());
// Client makes request to the follower
URI followerUrl = followerServer.advertisedUrl();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
index ea938f9a4f6..e47fe4304d3 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
@@ -17,7 +17,9 @@
package org.apache.kafka.connect.integration;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
@@ -26,12 +28,21 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -41,6 +52,10 @@ import static org.junit.Assert.assertTrue;
@Category(IntegrationTest.class)
public class StandaloneWorkerIntegrationTest {
+ private static final String CONNECTOR_NAME = "test-connector";
+ private static final int NUM_TASKS = 4;
+ private static final String TOPIC_NAME = "test-topic";
+
private EmbeddedConnectStandalone connect;
@Before
@@ -202,4 +217,42 @@ public class StandaloneWorkerIntegrationTest {
return entry.getValue().level();
}
+ @Test
+ public void testCreateConnectorWithStoppedInitialState() throws Exception {
+ CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest(
+ CONNECTOR_NAME,
+ defaultSourceConnectorProps(TOPIC_NAME),
+ CreateConnectorRequest.InitialState.STOPPED
+ );
+ connect.configureConnector(createConnectorRequest);
+
+ // Verify that the connector's status is STOPPED and also that no
tasks were spawned for the connector
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector was not created in a stopped state"
+ );
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(CONNECTOR_NAME).tasks());
+ assertEquals(Collections.emptyList(),
connect.taskConfigs(CONNECTOR_NAME));
+
+ // Verify that a connector created in the STOPPED state can be resumed
successfully
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ NUM_TASKS,
+ "Connector or tasks did not start running healthily in time"
+ );
+ }
+
+ private Map<String, String> defaultSourceConnectorProps(String topic) {
+ // setup props for the source connector
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG,
String.valueOf(1));
+ props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG,
String.valueOf(1));
+ return props;
+ }
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 4c08ea2a441..5a22e6c10f0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -690,7 +690,7 @@ public class DistributedHerderTest {
}).when(herder).validateConnectorConfig(eq(CONN2_CONFIG),
validateCallback.capture());
// CONN2 is new, should succeed
- doNothing().when(configBackingStore).putConnectorConfig(CONN2,
CONN2_CONFIG);
+ doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2),
eq(CONN2_CONFIG), isNull());
// This will occur just before/during the second tick
doNothing().when(member).ensureActive();
@@ -713,6 +713,51 @@ public class DistributedHerderTest {
verifyNoMoreInteractions(worker, member, configBackingStore,
statusBackingStore, putConnectorCallback);
}
+ @Test
+ public void testCreateConnectorWithInitialState() throws Exception {
+ when(member.memberId()).thenReturn("leader");
+ when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(),
true);
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+ doNothing().when(member).poll(anyLong());
+
+ // Initial rebalance where this member becomes the leader
+ herder.tick();
+
+ // mock the actual validation since its asynchronous nature is
difficult to test and should
+ // be covered sufficiently by the unit tests for the AbstractHerder
class
+ ArgumentCaptor<Callback<ConfigInfos>> validateCallback =
ArgumentCaptor.forClass(Callback.class);
+ doAnswer(invocation -> {
+ validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
+ return null;
+ }).when(herder).validateConnectorConfig(eq(CONN2_CONFIG),
validateCallback.capture());
+
+ // CONN2 is new, should succeed
+ doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2),
eq(CONN2_CONFIG), eq(TargetState.STOPPED));
+
+ // This will occur just before/during the second tick
+ doNothing().when(member).ensureActive();
+
+ // No immediate action besides this -- change will be picked up via
the config log
+
+ herder.putConnectorConfig(CONN2, CONN2_CONFIG, TargetState.STOPPED,
false, putConnectorCallback);
+ // This tick runs the initial herder request, which issues an
asynchronous request for
+ // connector validation
+ herder.tick();
+
+ // Once that validation is complete, another request is added to the
herder request queue
+ // for actually performing the config write; this tick is for that
request
+ herder.tick();
+ time.sleep(1000L);
+ assertStatistics(3, 1, 100, 1000L);
+
+ ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG,
Collections.emptyList(), ConnectorType.SOURCE);
+ verify(putConnectorCallback).onCompletion(isNull(), eq(new
Herder.Created<>(true, info)));
+ verifyNoMoreInteractions(worker, member, configBackingStore,
statusBackingStore, putConnectorCallback);
+ }
+
@Test
public void testCreateConnectorConfigBackingStoreError() {
when(member.memberId()).thenReturn("leader");
@@ -735,7 +780,7 @@ public class DistributedHerderTest {
}).when(herder).validateConnectorConfig(eq(CONN2_CONFIG),
validateCallback.capture());
doThrow(new ConnectException("Error writing connector configuration to
Kafka"))
- .when(configBackingStore).putConnectorConfig(CONN2,
CONN2_CONFIG);
+ .when(configBackingStore).putConnectorConfig(eq(CONN2),
eq(CONN2_CONFIG), isNull());
// This will occur just before/during the second tick
doNothing().when(member).ensureActive();
@@ -2184,7 +2229,7 @@ public class DistributedHerderTest {
// Simulate response to writing config + waiting until end of log
to be read
configUpdateListener.onConnectorConfigUpdate(CONN1);
return null;
- }).when(configBackingStore).putConnectorConfig(eq(CONN1),
eq(CONN1_CONFIG_UPDATED));
+ }).when(configBackingStore).putConnectorConfig(eq(CONN1),
eq(CONN1_CONFIG_UPDATED), isNull());
// As a result of reconfig, should need to update snapshot. With only
connector updates, we'll just restart
// connector without rebalance
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequestTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequestTest.java
new file mode 100644
index 00000000000..1d32479f82c
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequestTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.apache.kafka.connect.runtime.TargetState;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class CreateConnectorRequestTest {
+
+ @Test
+ public void testToTargetState() {
+ assertEquals(TargetState.STARTED,
CreateConnectorRequest.InitialState.RUNNING.toTargetState());
+ assertEquals(TargetState.PAUSED,
CreateConnectorRequest.InitialState.PAUSED.toTargetState());
+ assertEquals(TargetState.STOPPED,
CreateConnectorRequest.InitialState.STOPPED.toTargetState());
+
+ CreateConnectorRequest createConnectorRequest = new
CreateConnectorRequest("test-name", Collections.emptyMap(), null);
+ assertNull(createConnectorRequest.initialTargetState());
+ }
+
+ @Test
+ public void testForValue() {
+ assertEquals(CreateConnectorRequest.InitialState.RUNNING,
CreateConnectorRequest.InitialState.forValue("running"));
+ assertEquals(CreateConnectorRequest.InitialState.RUNNING,
CreateConnectorRequest.InitialState.forValue("Running"));
+ assertEquals(CreateConnectorRequest.InitialState.RUNNING,
CreateConnectorRequest.InitialState.forValue("RUNNING"));
+
+ assertEquals(CreateConnectorRequest.InitialState.PAUSED,
CreateConnectorRequest.InitialState.forValue("paused"));
+ assertEquals(CreateConnectorRequest.InitialState.PAUSED,
CreateConnectorRequest.InitialState.forValue("Paused"));
+ assertEquals(CreateConnectorRequest.InitialState.PAUSED,
CreateConnectorRequest.InitialState.forValue("PAUSED"));
+
+ assertEquals(CreateConnectorRequest.InitialState.STOPPED,
CreateConnectorRequest.InitialState.forValue("stopped"));
+ assertEquals(CreateConnectorRequest.InitialState.STOPPED,
CreateConnectorRequest.InitialState.forValue("Stopped"));
+ assertEquals(CreateConnectorRequest.InitialState.STOPPED,
CreateConnectorRequest.InitialState.forValue("STOPPED"));
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 0183e251600..aed081cf4d6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
+import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
@@ -272,23 +273,64 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnector() throws Throwable {
- CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
- ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), eq(false), cb.capture());
+ ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), isNull(), eq(false), cb.capture());
+
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+ }
+
+ @Test
+ public void testCreateConnectorWithPausedInitialState() throws Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), CreateConnectorRequest.InitialState.PAUSED);
+
+ final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
+ ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), eq(TargetState.PAUSED), eq(false), cb.capture());
+
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+ }
+
+ @Test
+ public void testCreateConnectorWithStoppedInitialState() throws Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), CreateConnectorRequest.InitialState.STOPPED);
+
+ final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
+ ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), eq(TargetState.STOPPED), eq(false), cb.capture());
+
+ connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+ }
+
+ @Test
+ public void testCreateConnectorWithRunningInitialState() throws Throwable {
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), CreateConnectorRequest.InitialState.RUNNING);
+
+ final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
+ expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+ CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
+ ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), eq(TargetState.STARTED), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
}
@Test
public void testCreateConnectorNotLeader() throws Throwable {
- CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackNotLeaderException(cb).when(herder)
- .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()),
eq(false), cb.capture());
+ .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()),
isNull(), eq(false), cb.capture());
when(restClient.httpRequest(eq(LEADER_URL +
"connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
.thenReturn(new RestClient.HttpResponse<>(201, new
HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
@@ -297,11 +339,12 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnectorWithHeaders() throws Throwable {
- CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
expectAndCallbackNotLeaderException(cb)
- .when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), eq(false), cb.capture());
+ .when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), isNull(), eq(false), cb.capture());
when(restClient.httpRequest(eq(LEADER_URL +
"connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new
HashMap<>(), null));
@@ -310,11 +353,12 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnectorExists() {
- CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new AlreadyExistsException("already
exists"))
- .when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), eq(false), cb.capture());
+ .when(herder).putConnectorConfig(eq(CONNECTOR_NAME),
eq(body.config()), isNull(), eq(false), cb.capture());
assertThrows(AlreadyExistsException.class, () ->
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body));
}
@@ -323,13 +367,13 @@ public class ConnectorsResourceTest {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
- final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig);
- final CreateConnectorRequest bodyOut = new
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG);
+ final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig, null);
+ final CreateConnectorRequest bodyOut = new
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
- ).when(herder).putConnectorConfig(eq(bodyOut.name()),
eq(bodyOut.config()), eq(false), cb.capture());
+ ).when(herder).putConnectorConfig(eq(bodyOut.name()),
eq(bodyOut.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
}
@@ -339,13 +383,13 @@ public class ConnectorsResourceTest {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
- final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig);
- final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+ final CreateConnectorRequest bodyIn = new
CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig, null);
+ final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
- ).when(herder).putConnectorConfig(eq(bodyOut.name()),
eq(bodyOut.config()), eq(false), cb.capture());
+ ).when(herder).putConnectorConfig(eq(bodyOut.name()),
eq(bodyOut.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
}
@@ -355,13 +399,13 @@ public class ConnectorsResourceTest {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes
it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig =
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
- final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null,
inputConfig);
- final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+ final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null,
inputConfig, null);
+ final CreateConnectorRequest bodyOut = new CreateConnectorRequest("",
CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
- ).when(herder).putConnectorConfig(eq(bodyOut.name()),
eq(bodyOut.config()), eq(false), cb.capture());
+ ).when(herder).putConnectorConfig(eq(bodyOut.name()),
eq(bodyOut.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
}
@@ -476,12 +520,13 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnectorWithSpecialCharsInName() throws Throwable {
- CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME_SPECIAL_CHARS,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME_SPECIAL_CHARS));
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME_SPECIAL_CHARS,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME_SPECIAL_CHARS), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME_SPECIAL_CHARS, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
- ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS),
eq(body.config()), eq(false), cb.capture());
+ ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS),
eq(body.config()), isNull(), eq(false), cb.capture());
String rspLocation = connectorsResource.createConnector(FORWARD,
NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
@@ -490,12 +535,13 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnectorWithControlSequenceInName() throws
Throwable {
- CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME_CONTROL_SEQUENCES1));
+ CreateConnectorRequest body = new
CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
+ Collections.singletonMap(ConnectorConfig.NAME_CONFIG,
CONNECTOR_NAME_CONTROL_SEQUENCES1), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb =
ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new
ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1),
eq(body.config()), eq(false), cb.capture());
+
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1),
eq(body.config()), isNull(), eq(false), cb.capture());
String rspLocation = connectorsResource.createConnector(FORWARD,
NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
@@ -540,7 +586,7 @@ public class ConnectorsResourceTest {
public void testCreateConnectorConfigNameMismatch() {
Map<String, String> connConfig = new HashMap<>();
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
- CreateConnectorRequest request = new
CreateConnectorRequest(CONNECTOR_NAME, connConfig);
+ CreateConnectorRequest request = new
CreateConnectorRequest(CONNECTOR_NAME, connConfig, null);
assertThrows(BadRequestException.class, () ->
connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index d9c64f5e36c..7e4126aa141 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -247,6 +247,37 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testCreateConnectorWithStoppedInitialState() throws Exception {
+ connector = PowerMock.createMock(BogusSinkConnector.class);
+ Map<String, String> config = connectorConfig(SourceSink.SINK);
+ Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+ expectConfigValidation(connectorMock, false, config);
+
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
+ // Only the connector should be created; we expect no tasks to be
spawned for a connector created with a paused or stopped initial state
+ Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
+ worker.startConnector(eq(CONNECTOR_NAME), eq(config),
EasyMock.anyObject(HerderConnectorContext.class),
+ eq(herder), eq(TargetState.STOPPED), EasyMock.capture(onStart));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ onStart.getValue().onCompletion(null, TargetState.STOPPED);
+ return true;
+ });
+
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes();
+
EasyMock.expect(herder.connectorType(anyObject())).andReturn(ConnectorType.SINK);
+
+ PowerMock.replayAll();
+
+ herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED,
false, createCallback);
+ Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
+ assertEquals(
+ new ConnectorInfo(CONNECTOR_NAME,
connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK),
+ connectorInfo.result()
+ );
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testDestroyConnector() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 59420e8faf0..a0322414951 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -24,11 +24,11 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Field;
@@ -77,12 +77,12 @@ import static
org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static
org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static
org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
import static
org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
import static
org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
-import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
@@ -177,6 +177,10 @@ public class KafkaConfigBackingStoreTest {
"config-bytes-7".getBytes(), "config-bytes-8".getBytes(),
"config-bytes-9".getBytes()
);
+ private static final List<byte[]> TARGET_STATES_SERIALIZED = Arrays.asList(
+ "started".getBytes(), "paused".getBytes(), "stopped".getBytes()
+ );
+
@Mock
private Converter converter;
@Mock
@@ -320,14 +324,14 @@ public class KafkaConfigBackingStoreTest {
assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
// Writing should block until it is written and read back from Kafka
- configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
configState = configStorage.snapshot();
assertEquals(1, configState.offset());
assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
// Second should also block and all configs should still be available
- configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(1));
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(1), null);
configState = configStorage.snapshot();
assertEquals(2, configState.offset());
assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
@@ -346,6 +350,55 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testPutConnectorConfigWithTargetState() throws Exception {
+ expectConfigure();
+ expectStart(Collections.emptyList(), Collections.emptyMap());
+
+ // We expect to write the target state first, followed by the config
write and then a read to end
+
+ expectConvertWriteRead(
+ TARGET_STATE_KEYS.get(0), KafkaConfigBackingStore.TARGET_STATE_V1,
TARGET_STATES_SERIALIZED.get(2),
+ "state.v2", TargetState.STOPPED.name());
+ // We don't expect the config update listener's
onConnectorTargetStateChange hook to be invoked
+
+ expectConvertWriteRead(
+ CONNECTOR_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
+ configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+ EasyMock.expectLastCall();
+
+ LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+ recordsToRead.put(TARGET_STATE_KEYS.get(0),
TARGET_STATES_SERIALIZED.get(2));
+ recordsToRead.put(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ expectReadToEnd(recordsToRead);
+
+ expectPartitionCount(1);
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ configStorage.start();
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
+
+ // Writing should block until it is written and read back from Kafka
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
+ configState = configStorage.snapshot();
+ assertEquals(2, configState.offset());
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testPutConnectorConfigProducerError() throws Exception {
expectConfigure();
@@ -373,7 +426,8 @@ public class KafkaConfigBackingStoreTest {
assertEquals(0, configState.connectors().size());
// verify that the producer exception from KafkaBasedLog::send is
propagated
- ConnectException e = assertThrows(ConnectException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+ ConnectException e = assertThrows(ConnectException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+ SAMPLE_CONFIGS.get(0), null));
assertTrue(e.getMessage().contains("Error writing connector
configuration to Kafka"));
configStorage.stop();
@@ -505,16 +559,16 @@ public class KafkaConfigBackingStoreTest {
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
// Should fail again when we get fenced out
- assertThrows(PrivilegedWriteException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0)));
+ assertThrows(PrivilegedWriteException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0),
null));
// Should fail if we retry without reclaiming write privileges
- assertThrows(IllegalStateException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0)));
+ assertThrows(IllegalStateException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0),
null));
// Should succeed even without write privileges (target states can be
written by anyone)
configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
// Should succeed if we re-claim write privileges
configStorage.claimWritePrivileges();
- configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(0));
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(0), null);
configStorage.stop();
@@ -891,7 +945,6 @@ public class KafkaConfigBackingStoreTest {
expectRead(serializedAfterStartup, deserializedAfterStartup);
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
-
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
EasyMock.expectLastCall();
expectPartitionCount(1);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
index 3e449b44cf9..185cb604cbf 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
@@ -40,6 +40,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -66,7 +67,7 @@ public class MemoryConfigBackingStoreTest {
@Test
public void testPutConnectorConfig() {
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
ClusterConfigState configState = configStore.snapshot();
assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
@@ -78,9 +79,24 @@ public class MemoryConfigBackingStoreTest {
verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
}
+ @Test
+ public void testPutConnectorConfigWithTargetState() {
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), TargetState.PAUSED);
+ ClusterConfigState configState = configStore.snapshot();
+
+ assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertEquals(1, configState.connectors().size());
+
+
verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+ // onConnectorTargetStateChange hook shouldn't be called when a
connector is created with a specific initial target state
+ verify(configUpdateListener,
never()).onConnectorTargetStateChange(eq(CONNECTOR_IDS.get(0)));
+ }
+
@Test
public void testPutConnectorConfigUpdateExisting() {
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
ClusterConfigState configState = configStore.snapshot();
assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
@@ -89,7 +105,7 @@ public class MemoryConfigBackingStoreTest {
assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertEquals(1, configState.connectors().size());
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(1));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(1), null);
configState = configStore.snapshot();
assertEquals(SAMPLE_CONFIGS.get(1),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
@@ -98,8 +114,8 @@ public class MemoryConfigBackingStoreTest {
@Test
public void testRemoveConnectorConfig() {
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
- configStore.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(1));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(1), null);
ClusterConfigState configState = configStore.snapshot();
Set<String> expectedConnectors = new HashSet<>();
@@ -124,7 +140,7 @@ public class MemoryConfigBackingStoreTest {
assertThrows(IllegalArgumentException.class,
() -> configStore.putTaskConfigs(CONNECTOR_IDS.get(0),
Collections.singletonList(SAMPLE_CONFIGS.get(1))));
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
configStore.putTaskConfigs(CONNECTOR_IDS.get(0),
Collections.singletonList(SAMPLE_CONFIGS.get(1)));
ClusterConfigState configState = configStore.snapshot();
@@ -151,7 +167,7 @@ public class MemoryConfigBackingStoreTest {
return null;
}).when(configUpdateListener).onTaskConfigUpdate(anySet());
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
configStore.putTaskConfigs(CONNECTOR_IDS.get(0),
Collections.singletonList(SAMPLE_CONFIGS.get(1)));
configStore.removeTaskConfigs(CONNECTOR_IDS.get(0));
ClusterConfigState configState = configStore.snapshot();
@@ -171,7 +187,7 @@ public class MemoryConfigBackingStoreTest {
// Can't write target state for non-existent connector
assertThrows(IllegalArgumentException.class, () ->
configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED));
- configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0));
+ configStore.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
// Ensure that
ConfigBackingStore.UpdateListener::onConnectorTargetStateChange is called only
once if the same state is written twice
configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 3b8c504e60e..147e435adf6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -187,7 +188,7 @@ abstract class EmbeddedConnect {
*
* @param connName the name of the connector
* @param connConfig the intended configuration
- * @throws ConnectRestException if the REST api returns error status
+ * @throws ConnectRestException if the REST API returns error status
* @throws ConnectException if the configuration fails to be serialized or
if the request could not be sent
*/
public String configureConnector(String connName, Map<String, String>
connConfig) {
@@ -195,6 +196,36 @@ abstract class EmbeddedConnect {
return putConnectorConfig(url, connConfig);
}
+ /**
+ * Configure a new connector using the <strong><em>POST
/connectors</em></strong> endpoint. If the connector already exists, a
+ * {@link ConnectRestException} will be thrown.
+ *
+ * @param createConnectorRequest the connector creation request
+ * @throws ConnectRestException if the REST API returns error status
+ * @throws ConnectException if the request could not be sent
+ */
+ public String configureConnector(CreateConnectorRequest
createConnectorRequest) {
+ String url = endpointForResource("connectors");
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ String requestBody;
+ try {
+ requestBody =
objectMapper.writeValueAsString(createConnectorRequest);
+ } catch (IOException e) {
+ throw new ConnectException("Failed to serialize connector creation
request: " + createConnectorRequest);
+ }
+
+ Response response = requestPost(url, requestBody,
Collections.emptyMap());
+ if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
+ return responseToString(response);
+ } else {
+ throw new ConnectRestException(
+ response.getStatus(),
+ "Could not execute 'POST /connectors' request. Error response:
" + responseToString(response)
+ );
+ }
+ }
+
/**
* Validate a given connector configuration. If the configuration
validates or
* has a configuration error, an instance of {@link ConfigInfos} is
returned. If the validation fails
diff --git a/docs/connect.html b/docs/connect.html
index 16f268e78b0..10b423aa380 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -41,7 +41,7 @@
<p>In standalone mode all work is performed in a single process. This
configuration is simpler to setup and get started with and may be useful in
situations where only one worker makes sense (e.g. collecting log files), but
it does not benefit from some of the features of Kafka Connect such as fault
tolerance. You can start a standalone process with the following command:</p>
<pre class="brush: bash;">
-> bin/connect-standalone.sh config/connect-standalone.properties
[connector1.properties connector2.properties ...]</pre>
+> bin/connect-standalone.sh config/connect-standalone.properties
[connector1.properties connector2.json ...]</pre>
<p>The first parameter is the configuration for the worker. This includes
settings such as the Kafka connection parameters, serialization format, and how
frequently to commit offsets. The provided example should work well with a
local cluster running with the default configuration provided by
<code>config/server.properties</code>. It will require tweaking to use with a
different configuration or production deployment. All workers (both standalone
and distributed) require a few configs:</p>
<ul>
@@ -60,7 +60,7 @@
<p>Starting with 2.3.0, client configuration overrides can be configured
individually per connector by using the prefixes
<code>producer.override.</code> and <code>consumer.override.</code> for Kafka
sources or Kafka sinks respectively. These overrides are included with the rest
of the connector's configuration properties.</p>
- <p>The remaining parameters are connector configuration files. You may
include as many as you want, but all will execute within the same process (on
different threads). You can also choose not to specify any connector
configuration files on the command line, and instead use the REST API to create
connectors at runtime after your standalone worker starts.</p>
+ <p>The remaining parameters are connector configuration files. Each file
may either be a Java Properties file or a JSON file containing an object with
the same structure as the request body of either the <code>POST
/connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code>
endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI
documentation</a>). You may include as many as you want, but all will execute
within the same process (on different th [...]
<p>Distributed mode handles automatic balancing of work, allows you to
scale up (or down) dynamically, and offers fault tolerance both in the active
tasks and for configuration and offset commit data. Execution is very similar
to standalone mode:</p>
@@ -293,7 +293,7 @@ listeners=http://localhost:8080,https://localhost:8443</pre>
<ul>
<li><code>GET /connectors</code> - return a list of active
connectors</li>
- <li><code>POST /connectors</code> - create a new connector; the
request body should be a JSON object containing a string <code>name</code>
field and an object <code>config</code> field with the connector configuration
parameters</li>
+ <li><code>POST /connectors</code> - create a new connector; the
request body should be a JSON object containing a string <code>name</code>
field and an object <code>config</code> field with the connector configuration
parameters. The JSON object may also optionally contain a string
<code>initial_state</code> field which can take the following values -
<code>STOPPED</code>, <code>PAUSED</code> or <code>RUNNING</code> (the default
value)</li>
<li><code>GET /connectors/{name}</code> - get information about a
specific connector</li>
<li><code>GET /connectors/{name}/config</code> - get the configuration
parameters for a specific connector</li>
<li><code>PUT /connectors/{name}/config</code> - update the
configuration parameters for a specific connector</li>