This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1bc4de75a2f KAFKA-15470: Allow creating connectors in a stopped state (#14704)
1bc4de75a2f is described below

commit 1bc4de75a2f870c31ad0025ff4feab063c5a5267
Author: Yash Mayya <[email protected]>
AuthorDate: Wed Nov 15 06:07:50 2023 +0000

    KAFKA-15470: Allow creating connectors in a stopped state (#14704)
    
    Reviewers: Chris Egerton <[email protected]>
---
 bin/connect-standalone.sh                          |   2 +-
 checkstyle/import-control.xml                      |   1 +
 .../kafka/connect/cli/ConnectStandalone.java       |  84 +++++++-
 .../org/apache/kafka/connect/runtime/Herder.java   |  13 ++
 .../runtime/distributed/DistributedHerder.java     |   8 +-
 .../rest/entities/CreateConnectorRequest.java      |  49 ++++-
 .../runtime/rest/resources/ConnectorsResource.java |   2 +-
 .../runtime/standalone/StandaloneHerder.java       |  11 +-
 .../kafka/connect/storage/ConfigBackingStore.java  |   4 +-
 .../connect/storage/KafkaConfigBackingStore.java   |  36 +++-
 .../connect/storage/MemoryConfigBackingStore.java  |  19 +-
 .../kafka/connect/cli/ConnectStandaloneTest.java   | 127 +++++++++++
 .../integration/ConnectWorkerIntegrationTest.java  | 233 ++++++++++++++++++++-
 .../integration/RestForwardingIntegrationTest.java |   5 +-
 .../StandaloneWorkerIntegrationTest.java           |  53 +++++
 .../runtime/distributed/DistributedHerderTest.java |  51 ++++-
 .../rest/entities/CreateConnectorRequestTest.java  |  53 +++++
 .../rest/resources/ConnectorsResourceTest.java     |  90 ++++++--
 .../runtime/standalone/StandaloneHerderTest.java   |  31 +++
 .../storage/KafkaConfigBackingStoreTest.java       |  71 ++++++-
 .../storage/MemoryConfigBackingStoreTest.java      |  32 ++-
 .../connect/util/clusters/EmbeddedConnect.java     |  33 ++-
 docs/connect.html                                  |   6 +-
 23 files changed, 929 insertions(+), 85 deletions(-)

diff --git a/bin/connect-standalone.sh b/bin/connect-standalone.sh
index 441069fed31..bef78d658fd 100755
--- a/bin/connect-standalone.sh
+++ b/bin/connect-standalone.sh
@@ -16,7 +16,7 @@
 
 if [ $# -lt 1 ];
 then
-        echo "USAGE: $0 [-daemon] connect-standalone.properties"
+        echo "USAGE: $0 [-daemon] connect-standalone.properties [connector1.properties connector2.json ...]"
         exit 1
 fi
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0db1700c612..9e34d1e4029 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -534,6 +534,7 @@
     </subpackage>
 
     <subpackage name="cli">
+      <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.connect.runtime" />
       <allow pkg="org.apache.kafka.connect.storage" />
       <allow pkg="org.apache.kafka.connect.util" />
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index c1977de3df9..7068377a900 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -16,20 +16,26 @@
  */
 package org.apache.kafka.connect.cli;
 
+import com.fasterxml.jackson.core.exc.StreamReadException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DatabindException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
@@ -38,9 +44,14 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+
 /**
  * <p>
  * Command line utility that runs Kafka Connect as a standalone process. In 
this mode, work (connectors and tasks) is not
@@ -61,23 +72,24 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
 
     @Override
     protected String usage() {
-        return "ConnectStandalone worker.properties [connector1.properties 
connector2.properties ...]";
+        return "ConnectStandalone worker.properties [connector1.properties 
connector2.json ...]";
     }
 
     @Override
     protected void processExtraArgs(Herder herder, Connect connect, String[] 
extraArgs) {
         try {
-            for (final String connectorPropsFile : extraArgs) {
-                Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+            for (final String connectorConfigFile : extraArgs) {
+                CreateConnectorRequest createConnectorRequest = 
parseConnectorConfigurationFile(connectorConfigFile);
                 FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>((error, info) -> {
                     if (error != null)
-                        log.error("Failed to create connector for {}", 
connectorPropsFile);
+                        log.error("Failed to create connector for {}", 
connectorConfigFile);
                     else
                         log.info("Created connector {}", info.result().name());
                 });
                 herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
+                    createConnectorRequest.name(), 
createConnectorRequest.config(),
+                    createConnectorRequest.initialTargetState(),
+                    false, cb);
                 cb.get();
             }
         } catch (Throwable t) {
@@ -87,6 +99,64 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
         }
     }
 
+    /**
+     * Parse a connector configuration file into a {@link 
CreateConnectorRequest}. The file can have any one of the following formats 
(note that
+     * we attempt to parse the file in this order):
+     * <ol>
+     *     <li>A JSON file containing an Object with only String keys and 
values that represent the connector configuration.</li>
+     *     <li>A JSON file containing an Object that can be parsed directly 
into a {@link CreateConnectorRequest}</li>
+     *     <li>A valid Java Properties file (i.e. containing String key/value 
pairs representing the connector configuration)</li>
+     * </ol>
+     * <p>
+     * Visible for testing.
+     *
+     * @param filePath the path of the connector configuration file
+     * @return the parsed connector configuration in the form of a {@link 
CreateConnectorRequest}
+     */
+    CreateConnectorRequest parseConnectorConfigurationFile(String filePath) 
throws IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        File connectorConfigurationFile = Paths.get(filePath).toFile();
+        try {
+            Map<String, String> connectorConfigs = objectMapper.readValue(
+                connectorConfigurationFile,
+                new TypeReference<Map<String, String>>() { });
+
+            if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+                throw new ConnectException("Connector configuration at '" + 
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+                    + "configuration");
+            }
+            return new 
CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, 
null);
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into 
a Map with String keys and values", filePath);
+        }
+
+        try {
+            
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+            CreateConnectorRequest createConnectorRequest = 
objectMapper.readValue(connectorConfigurationFile,
+                new TypeReference<CreateConnectorRequest>() { });
+            if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
+                if 
(!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name()))
 {
+                    throw new ConnectException("Connector name configuration 
in 'config' doesn't match the one specified in 'name' at '" + filePath
+                        + "'");
+                }
+            } else {
+                createConnectorRequest.config().put(NAME_CONFIG, 
createConnectorRequest.name());
+            }
+            return createConnectorRequest;
+        } catch (StreamReadException | DatabindException e) {
+            log.debug("Could not parse connector configuration file '{}' into 
an object of type {}",
+                filePath, CreateConnectorRequest.class.getSimpleName());
+        }
+
+        Map<String, String> connectorConfigs = 
Utils.propsToStringMap(Utils.loadProps(filePath));
+        if (!connectorConfigs.containsKey(NAME_CONFIG)) {
+            throw new ConnectException("Connector configuration at '" + 
filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+                + "configuration");
+        }
+        return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), 
connectorConfigs, null);
+    }
+
     @Override
     protected Herder createHerder(StandaloneConfig config, String workerId, 
Plugins plugins,
                                   ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 566a5c4c096..a8a6e7858d8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -108,6 +108,19 @@ public interface Herder {
      */
     void putConnectorConfig(String connName, Map<String, String> config, 
boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
 
+    /**
+     * Set the configuration for a connector, along with a target state 
optionally. This supports creation and updating.
+     * @param connName name of the connector
+     * @param config the connector's configuration
+     * @param targetState the desired target state for the connector; may be 
{@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no 
target state exists previously
+     * @param allowReplace if true, allow overwriting previous configs; if 
false, throw {@link AlreadyExistsException}
+     *                     if a connector with the same name already exists
+     * @param callback callback to invoke when the configuration has been 
written
+     */
+    void putConnectorConfig(String connName, Map<String, String> config, 
TargetState targetState, boolean allowReplace,
+                            Callback<Created<ConnectorInfo>> callback);
+
     /**
      * Delete a connector and its configuration.
      * @param connName name of the connector
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 48b5fad3423..8ae19dd3a7b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1051,6 +1051,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     @Override
     public void putConnectorConfig(final String connName, final Map<String, 
String> config, final boolean allowReplace,
                                    final Callback<Created<ConnectorInfo>> 
callback) {
+        putConnectorConfig(connName, config, null, allowReplace, callback);
+    }
+
+    @Override
+    public void putConnectorConfig(final String connName, final Map<String, 
String> config, final TargetState targetState,
+                                   final boolean allowReplace, final 
Callback<Created<ConnectorInfo>> callback) {
         log.trace("Submitting connector config write request {}", connName);
         addRequest(
             () -> {
@@ -1081,7 +1087,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                             }
 
                             log.trace("Submitting connector config {} {} {}", 
connName, allowReplace, configState.connectors());
-                            writeToConfigTopicAsLeader(() -> 
configBackingStore.putConnectorConfig(connName, config));
+                            writeToConfigTopicAsLeader(() -> 
configBackingStore.putConnectorConfig(connName, config, targetState));
 
                             // Note that we use the updated connector config 
despite the fact that we don't have an updated
                             // snapshot yet. The existing task info should 
still be accurate.
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
index 1c52d8db997..da8e235e424 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -18,18 +18,23 @@ package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.runtime.TargetState;
 
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
 public class CreateConnectorRequest {
     private final String name;
     private final Map<String, String> config;
+    private final InitialState initialState;
 
     @JsonCreator
-    public CreateConnectorRequest(@JsonProperty("name") String name, 
@JsonProperty("config") Map<String, String> config) {
+    public CreateConnectorRequest(@JsonProperty("name") String name, 
@JsonProperty("config") Map<String, String> config,
+                                  @JsonProperty("initial_state") InitialState 
initialState) {
         this.name = name;
         this.config = config;
+        this.initialState = initialState;
     }
 
     @JsonProperty
@@ -42,17 +47,55 @@ public class CreateConnectorRequest {
         return config;
     }
 
+    @JsonProperty
+    public InitialState initialState() {
+        return initialState;
+    }
+
+    public TargetState initialTargetState() {
+        if (initialState != null) {
+            return initialState.toTargetState();
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         CreateConnectorRequest that = (CreateConnectorRequest) o;
         return Objects.equals(name, that.name) &&
-                Objects.equals(config, that.config);
+            Objects.equals(config, that.config) &&
+            Objects.equals(initialState, that.initialState);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, config);
+        return Objects.hash(name, config, initialState);
+    }
+
+    public enum InitialState {
+        RUNNING,
+        PAUSED,
+        STOPPED;
+
+        @JsonCreator
+        public static InitialState forValue(String value) {
+            return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
+        }
+
+        public TargetState toTargetState() {
+            switch (this) {
+                case RUNNING:
+                    return TargetState.STARTED;
+                case PAUSED:
+                    return TargetState.PAUSED;
+                case STOPPED:
+                    return TargetState.STOPPED;
+                default:
+                    throw new IllegalArgumentException("Unknown initial state: 
" + this);
+            }
+        }
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4878b8df9e1..75e510ef9ad 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -145,7 +145,7 @@ public class ConnectorsResource implements ConnectResource {
         checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>();
-        herder.putConnectorConfig(name, configs, false, cb);
+        herder.putConnectorConfig(name, configs, 
createRequest.initialTargetState(), false, cb);
         Herder.Created<ConnectorInfo> info = 
requestHandler.completeOrForwardRequest(cb, "/connectors", "POST", headers, 
createRequest,
                 new TypeReference<ConnectorInfo>() { }, new 
CreatedConnectorInfoTranslator(), forward);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 0da89b2f668..40e19da19c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -183,6 +183,12 @@ public class StandaloneHerder extends AbstractHerder {
                                                 final Map<String, String> 
config,
                                                 boolean allowReplace,
                                                 final 
Callback<Created<ConnectorInfo>> callback) {
+        putConnectorConfig(connName, config, null, allowReplace, callback);
+    }
+
+    @Override
+    public void putConnectorConfig(final String connName, final Map<String, 
String> config, final TargetState targetState,
+                                   final boolean allowReplace, final 
Callback<Created<ConnectorInfo>> callback) {
         try {
             validateConnectorConfig(config, (error, configInfos) -> {
                 if (error != null) {
@@ -191,7 +197,7 @@ public class StandaloneHerder extends AbstractHerder {
                 }
 
                 requestExecutorService.submit(
-                    () -> putConnectorConfig(connName, config, allowReplace, 
callback, configInfos)
+                    () -> putConnectorConfig(connName, config, targetState, 
allowReplace, callback, configInfos)
                 );
             });
         } catch (Throwable t) {
@@ -201,6 +207,7 @@ public class StandaloneHerder extends AbstractHerder {
 
     private synchronized void putConnectorConfig(String connName,
                                                  final Map<String, String> 
config,
+                                                 TargetState targetState,
                                                  boolean allowReplace,
                                                  final 
Callback<Created<ConnectorInfo>> callback,
                                                  ConfigInfos configInfos) {
@@ -221,7 +228,7 @@ public class StandaloneHerder extends AbstractHerder {
                 created = true;
             }
 
-            configBackingStore.putConnectorConfig(connName, config);
+            configBackingStore.putConnectorConfig(connName, config, 
targetState);
 
             startConnector(connName, (error, result) -> {
                 if (error != null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index c869c545f80..9f12ab0d3cf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -56,8 +56,10 @@ public interface ConfigBackingStore {
      * Update the configuration for a connector.
      * @param connector name of the connector
      * @param properties the connector configuration
+     * @param targetState the desired target state for the connector; may be 
{@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no 
target state exists previously
      */
-    void putConnectorConfig(String connector, Map<String, String> properties);
+    void putConnectorConfig(String connector, Map<String, String> properties, 
TargetState targetState);
 
     /**
      * Remove configuration for a connector
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index a95c8249942..35d43ea3cca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -497,26 +497,34 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
     }
 
     /**
-     * Write this connector configuration to persistent storage and wait until 
it has been acknowledged and read back by
-     * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} 
must be successfully invoked before calling
+     * Write this connector configuration (and optionally a target state) to 
persistent storage and wait until it has been acknowledged and read
+     * back by tailing the Kafka log with a consumer. {@link 
#claimWritePrivileges()} must be successfully invoked before calling
      * this method if the worker is configured to use a fencable producer for 
writes to the config topic.
      *
      * @param connector  name of the connector to write data for
      * @param properties the configuration to write
+     * @param targetState the desired target state for the connector; may be 
{@code null} if no target state change is desired. Note that the default
+     *                    target state is {@link TargetState#STARTED} if no 
target state exists previously
      * @throws IllegalStateException if {@link #claimWritePrivileges()} is 
required, but was not successfully invoked before
      * this method was called
      * @throws PrivilegedWriteException if the worker is configured to use a 
fencable producer for writes to the config topic
      * and the write fails
      */
     @Override
-    public void putConnectorConfig(String connector, Map<String, String> 
properties) {
+    public void putConnectorConfig(String connector, Map<String, String> 
properties, TargetState targetState) {
         log.debug("Writing connector configuration for connector '{}'", 
connector);
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put("properties", properties);
         byte[] serializedConfig = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, connectConfig);
         try {
             Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
-            sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
+            List<ProducerKeyValue> keyValues = new ArrayList<>();
+            if (targetState != null) {
+                log.debug("Writing target state {} for connector {}", 
targetState, connector);
+                keyValues.add(new 
ProducerKeyValue(TARGET_STATE_KEY(connector), 
serializeTargetState(targetState)));
+            }
+            keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), 
serializedConfig));
+            sendPrivileged(keyValues, timer);
             configLog.readToEnd().get(timer.remainingMs(), 
TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write connector configuration to Kafka: ", e);
@@ -647,20 +655,24 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
      */
     @Override
     public void putTargetState(String connector, TargetState state) {
-        Struct connectTargetState = new Struct(TARGET_STATE_V1);
-        // Older workers don't support the STOPPED state; fall back on PAUSED
-        connectTargetState.put("state", state == STOPPED ? PAUSED.name() : 
state.name());
-        connectTargetState.put("state.v2", state.name());
-        byte[] serializedTargetState = converter.fromConnectData(topic, 
TARGET_STATE_V1, connectTargetState);
         log.debug("Writing target state {} for connector {}", state, 
connector);
         try {
-            configLog.sendWithReceipt(TARGET_STATE_KEY(connector), 
serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            configLog.sendWithReceipt(TARGET_STATE_KEY(connector), 
serializeTargetState(state))
+                .get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             log.error("Failed to write target state to Kafka", e);
             throw new ConnectException("Error writing target state to Kafka", 
e);
         }
     }
 
+    private byte[] serializeTargetState(TargetState state) {
+        Struct connectTargetState = new Struct(TARGET_STATE_V1);
+        // Older workers don't support the STOPPED state; fall back on PAUSED
+        connectTargetState.put("state", state == STOPPED ? PAUSED.name() : 
state.name());
+        connectTargetState.put("state.v2", state.name());
+        return converter.fromConnectData(topic, TARGET_STATE_V1, 
connectTargetState);
+    }
+
     /**
      * Write a task count record for a connector to persistent storage and 
wait until it has been acknowledged and read back by
      * tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} 
must be successfully invoked before calling this method
@@ -985,7 +997,9 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
 
         // Note that we do not notify the update listener if the target state 
has been removed.
         // Instead we depend on the removal callback of the connector config 
itself to notify the worker.
-        if (started && !removed && stateChanged)
+        // We also don't notify the update listener if the connector doesn't 
exist yet - a scenario that can
+        // occur if an explicit initial target state is specified in the 
connector creation request.
+        if (started && !removed && stateChanged && 
connectorConfigs.containsKey(connectorName))
             updateListener.onConnectorTargetStateChange(connectorName);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 52c360c3b33..3b9ba966ca2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -92,12 +92,16 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
     }
 
     @Override
-    public synchronized void putConnectorConfig(String connector, Map<String, 
String> properties) {
+    public synchronized void putConnectorConfig(String connector, Map<String, 
String> properties, TargetState targetState) {
         ConnectorState state = connectors.get(connector);
         if (state == null)
-            connectors.put(connector, new ConnectorState(properties));
-        else
+            connectors.put(connector, new ConnectorState(properties, 
targetState));
+        else {
             state.connConfig = properties;
+            if (targetState != null) {
+                state.targetState = targetState;
+            }
+        }
 
         if (updateListener != null)
             updateListener.onConnectorConfigUpdate(connector);
@@ -184,8 +188,13 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
         private Map<String, String> connConfig;
         private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
 
-        public ConnectorState(Map<String, String> connConfig) {
-            this.targetState = TargetState.STARTED;
+        /**
+         * @param connConfig the connector's configuration
+         * @param targetState the connector's initial {@link TargetState}; may 
be {@code null} in which case the default initial target state
+         * {@link TargetState#STARTED} will be used
+         */
+        public ConnectorState(Map<String, String> connConfig, TargetState 
targetState) {
+            this.targetState = targetState == null ? TargetState.STARTED : 
targetState;
             this.connConfig = connConfig;
             this.taskConfigs = new HashMap<>();
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java
new file mode 100644
index 00000000000..9a762f1a394
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/cli/ConnectStandaloneTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.cli;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectStandaloneTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final Map<String, String> CONNECTOR_CONFIG = new 
HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put(NAME_CONFIG, CONNECTOR_NAME);
+        CONNECTOR_CONFIG.put("key1", "val1");
+        CONNECTOR_CONFIG.put("key2", "val2");
+    }
+
+    private final ConnectStandalone connectStandalone = new 
ConnectStandalone();
+    private File connectorConfigurationFile;
+
+    @Before
+    public void setUp() throws IOException {
+        connectorConfigurationFile = TestUtils.tempFile();
+    }
+
+    @Test
+    public void testParseJavaPropertiesFile() throws Exception {
+        Properties properties = new Properties();
+        CONNECTOR_CONFIG.forEach(properties::setProperty);
+
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            properties.store(writer, null);
+        }
+
+        CreateConnectorRequest request = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(CONNECTOR_NAME, request.name());
+        assertEquals(CONNECTOR_CONFIG, request.config());
+        assertNull(request.initialState());
+    }
+
+    @Test
+    public void testParseJsonFileWithConnectorConfiguration() throws Exception 
{
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            writer.write(new 
ObjectMapper().writeValueAsString(CONNECTOR_CONFIG));
+        }
+
+        CreateConnectorRequest request = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(CONNECTOR_NAME, request.name());
+        assertEquals(CONNECTOR_CONFIG, request.config());
+        assertNull(request.initialState());
+    }
+
+    @Test
+    public void testParseJsonFileWithCreateConnectorRequest() throws Exception 
{
+        CreateConnectorRequest requestToWrite = new CreateConnectorRequest(
+            CONNECTOR_NAME,
+            CONNECTOR_CONFIG,
+            CreateConnectorRequest.InitialState.STOPPED
+        );
+
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            writer.write(new 
ObjectMapper().writeValueAsString(requestToWrite));
+        }
+
+        CreateConnectorRequest parsedRequest = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        assertEquals(requestToWrite, parsedRequest);
+    }
+
+    @Test
+    public void 
testParseJsonFileWithCreateConnectorRequestWithoutInitialState() throws 
Exception {
+        Map<String, Object> requestToWrite = new HashMap<>();
+        requestToWrite.put("name", CONNECTOR_NAME);
+        requestToWrite.put("config", CONNECTOR_CONFIG);
+
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            writer.write(new 
ObjectMapper().writeValueAsString(requestToWrite));
+        }
+
+        CreateConnectorRequest parsedRequest = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        CreateConnectorRequest expectedRequest = new 
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
+        assertEquals(expectedRequest, parsedRequest);
+    }
+
+    @Test
+    public void testParseJsonFileWithCreateConnectorRequestWithUnknownField() 
throws Exception {
+        Map<String, Object> requestToWrite = new HashMap<>();
+        requestToWrite.put("name", CONNECTOR_NAME);
+        requestToWrite.put("config", CONNECTOR_CONFIG);
+        requestToWrite.put("unknown-field", "random-value");
+
+        try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
+            writer.write(new 
ObjectMapper().writeValueAsString(requestToWrite));
+        }
+
+        CreateConnectorRequest parsedRequest = 
connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
+        CreateConnectorRequest expectedRequest = new 
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
+        assertEquals(expectedRequest, parsedRequest);
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index cd9b9c05171..8a271603eef 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -49,14 +52,15 @@ import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static 
org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -73,7 +77,7 @@ public class ConnectWorkerIntegrationTest {
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
     private static final int MESSAGES_PER_POLL = 10;
-    private static final String CONNECTOR_NAME = "simple-source";
+    private static final String CONNECTOR_NAME = "simple-connector";
     private static final String TOPIC_NAME = "test-topic";
 
     private EmbeddedConnectCluster.Builder connectBuilder;
@@ -529,8 +533,10 @@ public class ConnectWorkerIntegrationTest {
         // start the clusters
         connect.start();
 
-        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
-            "Initial group of workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+            NUM_WORKERS,
+            "Initial group of workers did not start in time."
+        );
 
         connect.configureConnector(CONNECTOR_NAME, 
defaultSourceConnectorProps(TOPIC_NAME));
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
@@ -546,10 +552,227 @@ public class ConnectWorkerIntegrationTest {
             assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
             assertThat(logEvents.get(0).getMessage(), 
containsString("deprecated"));
         }
+
+    }
+
+    @Test
+    public void testCreateConnectorWithPausedInitialState() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
+            CONNECTOR_NAME,
+            defaultSourceConnectorProps(TOPIC_NAME),
+            CreateConnectorRequest.InitialState.PAUSED
+        );
+        connect.configureConnector(createConnectorRequest);
+
+        // Verify that the connector's status is PAUSED and also that no tasks 
were spawned for the connector
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+            CONNECTOR_NAME,
+            0,
+            "Connector was not created in a paused state"
+        );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
+
+        // Verify that a connector created in the PAUSED state can be resumed 
successfully
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME,
+            NUM_TASKS,
+            "Connector or tasks did not start running healthily in time"
+        );
+    }
+
+    @Test
+    public void 
testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() throws 
Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+
+        // Configure the connector to produce a maximum of 10 messages
+        props.put("max.messages", "10");
+        props.put(TASKS_MAX_CONFIG, "1");
+        CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
+            CONNECTOR_NAME,
+            props,
+            CreateConnectorRequest.InitialState.STOPPED
+        );
+        connect.configureConnector(createConnectorRequest);
+
+        // Verify that the connector's status is STOPPED and also that no 
tasks were spawned for the connector
+        connect.assertions().assertConnectorIsStopped(
+            CONNECTOR_NAME,
+            "Connector was not created in a stopped state"
+        );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
+
+        // Verify that the offsets can be modified for a source connector 
created in the STOPPED state
+
+        // Alter the offsets so that only 5 messages are produced
+        connect.alterSourceConnectorOffset(
+            CONNECTOR_NAME,
+            Collections.singletonMap("task.id", CONNECTOR_NAME + "-0"),
+            Collections.singletonMap("saved", 5L)
+        );
+
+        // Verify that a connector created in the STOPPED state can be resumed 
successfully
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME,
+            1,
+            "Connector or tasks did not start running healthily in time"
+        );
+
+        // Verify that only 5 messages were produced. We verify this by 
consuming all the messages from the topic after we've already ensured that at
+        // least 5 messages can be consumed.
+        long timeoutMs = TimeUnit.SECONDS.toMillis(10);
+        connect.kafka().consume(5, timeoutMs, TOPIC_NAME);
+        assertEquals(5, connect.kafka().consumeAll(timeoutMs, 
TOPIC_NAME).count());
+    }
+
+    @Test
+    public void 
testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() throws 
Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        // Create topic and produce 10 messages
+        connect.kafka().createTopic(TOPIC_NAME);
+        for (int i = 0; i < 10; i++) {
+            connect.kafka().produce(TOPIC_NAME, "Message " + i);
+        }
+
+        Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, "1");
+
+        CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
+            CONNECTOR_NAME,
+            props,
+            CreateConnectorRequest.InitialState.STOPPED
+        );
+        connect.configureConnector(createConnectorRequest);
+
+        // Verify that the connector's status is STOPPED and also that no 
tasks were spawned for the connector
+        connect.assertions().assertConnectorIsStopped(
+            CONNECTOR_NAME,
+            "Connector was not created in a stopped state"
+        );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
+
+        // Verify that the offsets can be modified for a sink connector 
created in the STOPPED state
+
+        // Alter the offsets so that the first 5 messages in the topic are 
skipped
+        connect.alterSinkConnectorOffset(CONNECTOR_NAME, new 
TopicPartition(TOPIC_NAME, 0), 5L);
+
+        // This will cause the connector task to fail if it encounters a 
record with offset < 5
+        TaskHandle taskHandle = 
RuntimeHandles.get().connectorHandle(CONNECTOR_NAME).taskHandle(CONNECTOR_NAME 
+ "-0",
+            sinkRecord -> {
+                if (sinkRecord.kafkaOffset() < 5L) {
+                    throw new ConnectException("Unexpected record encountered: 
" + sinkRecord);
+                }
+            });
+
+        // We produced 10 records and altered the connector offsets to skip 
over the first 5, so we expect 5 records to be consumed
+        taskHandle.expectedRecords(5);
+
+        // Verify that a connector created in the STOPPED state can be resumed 
successfully
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME,
+            1,
+            "Connector or tasks did not start running healthily in time"
+        );
+
+        taskHandle.awaitRecords(TimeUnit.SECONDS.toMillis(10));
+
+        // Confirm that the task is still running (i.e. it didn't fail due to 
encountering any records with offset < 5)
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME,
+            1,
+            "Connector or tasks did not start running healthily in time"
+        );
+    }
+
+    @Test
+    public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() 
throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        // Create a connector with PAUSED initial state
+        CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
+            CONNECTOR_NAME,
+            defaultSourceConnectorProps(TOPIC_NAME),
+            CreateConnectorRequest.InitialState.PAUSED
+        );
+        connect.configureConnector(createConnectorRequest);
+
+        // Verify that the connector's status is PAUSED and also that no tasks 
were spawned for the connector
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+            CONNECTOR_NAME,
+            0,
+            "Connector was not created in a paused state"
+        );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
+
+        // Verify that a connector created in the PAUSED state can be deleted 
successfully
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, 
"Connector wasn't deleted in time");
+
+
+        // Create a connector with STOPPED initial state
+        createConnectorRequest = new CreateConnectorRequest(
+            CONNECTOR_NAME,
+            defaultSourceConnectorProps(TOPIC_NAME),
+            CreateConnectorRequest.InitialState.STOPPED
+        );
+        connect.configureConnector(createConnectorRequest);
+
+        // Verify that the connector's status is STOPPED and also that no 
tasks were spawned for the connector
+        connect.assertions().assertConnectorIsStopped(
+            CONNECTOR_NAME,
+            "Connector was not created in a stopped state"
+        );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
+
+        // Verify that a connector created in the STOPPED state can be deleted 
successfully
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, 
"Connector wasn't deleted in time");
+    }
+
+    private Map<String, String> defaultSinkConnectorProps(String topics) {
+        // setup props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSinkConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPICS_CONFIG, topics);
+
+        return props;
     }
 
     private Map<String, String> defaultSourceConnectorProps(String topic) {
-        // setup up props for the source connector
+        // setup props for the source connector
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
index 7c9e2f2c51e..0bbad10a57d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
@@ -187,7 +188,7 @@ public class RestForwardingIntegrationTest {
             followerCallbackCaptor.getValue().onCompletion(forwardException, 
null);
             return null;
         }).when(followerHerder)
-                .putConnectorConfig(any(), any(), anyBoolean(), 
followerCallbackCaptor.capture());
+                .putConnectorConfig(any(), any(), isNull(), anyBoolean(), 
followerCallbackCaptor.capture());
 
         // Leader will reply
         ConnectorInfo connectorInfo = new ConnectorInfo("blah", 
Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE);
@@ -197,7 +198,7 @@ public class RestForwardingIntegrationTest {
             leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
             return null;
         }).when(leaderHerder)
-                .putConnectorConfig(any(), any(), anyBoolean(), 
leaderCallbackCaptor.capture());
+                .putConnectorConfig(any(), any(), isNull(), anyBoolean(), 
leaderCallbackCaptor.capture());
 
         // Client makes request to the follower
         URI followerUrl = followerServer.advertisedUrl();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
index ea938f9a4f6..e47fe4304d3 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.connect.integration;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
@@ -26,12 +28,21 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -41,6 +52,10 @@ import static org.junit.Assert.assertTrue;
 @Category(IntegrationTest.class)
 public class StandaloneWorkerIntegrationTest {
 
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final int NUM_TASKS = 4;
+    private static final String TOPIC_NAME = "test-topic";
+
     private EmbeddedConnectStandalone connect;
 
     @Before
@@ -202,4 +217,42 @@ public class StandaloneWorkerIntegrationTest {
         return entry.getValue().level();
     }
 
+    @Test
+    public void testCreateConnectorWithStoppedInitialState() throws Exception {
+        CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest(
+            CONNECTOR_NAME,
+            defaultSourceConnectorProps(TOPIC_NAME),
+            CreateConnectorRequest.InitialState.STOPPED
+        );
+        connect.configureConnector(createConnectorRequest);
+
+        // Verify that the connector's status is STOPPED and also that no 
tasks were spawned for the connector
+        connect.assertions().assertConnectorIsStopped(
+            CONNECTOR_NAME,
+            "Connector was not created in a stopped state"
+        );
+        assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
+
+        // Verify that a connector created in the STOPPED state can be resumed 
successfully
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME,
+            NUM_TASKS,
+            "Connector or tasks did not start running healthily in time"
+        );
+    }
+
+    private Map<String, String> defaultSourceConnectorProps(String topic) {
+        // setup props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put(TOPIC_CONFIG, topic);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, 
String.valueOf(1));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, 
String.valueOf(1));
+        return props;
+    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 4c08ea2a441..5a22e6c10f0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -690,7 +690,7 @@ public class DistributedHerderTest {
         }).when(herder).validateConnectorConfig(eq(CONN2_CONFIG), 
validateCallback.capture());
 
         // CONN2 is new, should succeed
-        doNothing().when(configBackingStore).putConnectorConfig(CONN2, 
CONN2_CONFIG);
+        doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2), 
eq(CONN2_CONFIG), isNull());
 
         // This will occur just before/during the second tick
         doNothing().when(member).ensureActive();
@@ -713,6 +713,51 @@ public class DistributedHerderTest {
         verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore, putConnectorCallback);
     }
 
+    @Test
+    public void testCreateConnectorWithInitialState() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+        doNothing().when(member).poll(anyLong());
+
+        // Initial rebalance where this member becomes the leader
+        herder.tick();
+
+        // mock the actual validation since its asynchronous nature is 
difficult to test and should
+        // be covered sufficiently by the unit tests for the AbstractHerder 
class
+        ArgumentCaptor<Callback<ConfigInfos>> validateCallback = 
ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
+            return null;
+        }).when(herder).validateConnectorConfig(eq(CONN2_CONFIG), 
validateCallback.capture());
+
+        // CONN2 is new, should succeed
+        doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2), 
eq(CONN2_CONFIG), eq(TargetState.STOPPED));
+
+        // This will occur just before/during the second tick
+        doNothing().when(member).ensureActive();
+
+        // No immediate action besides this -- change will be picked up via 
the config log
+
+        herder.putConnectorConfig(CONN2, CONN2_CONFIG, TargetState.STOPPED, 
false, putConnectorCallback);
+        // This tick runs the initial herder request, which issues an 
asynchronous request for
+        // connector validation
+        herder.tick();
+
+        // Once that validation is complete, another request is added to the 
herder request queue
+        // for actually performing the config write; this tick is for that 
request
+        herder.tick();
+        time.sleep(1000L);
+        assertStatistics(3, 1, 100, 1000L);
+
+        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, 
Collections.emptyList(), ConnectorType.SOURCE);
+        verify(putConnectorCallback).onCompletion(isNull(), eq(new 
Herder.Created<>(true, info)));
+        verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore, putConnectorCallback);
+    }
+
     @Test
     public void testCreateConnectorConfigBackingStoreError() {
         when(member.memberId()).thenReturn("leader");
@@ -735,7 +780,7 @@ public class DistributedHerderTest {
         }).when(herder).validateConnectorConfig(eq(CONN2_CONFIG), 
validateCallback.capture());
 
         doThrow(new ConnectException("Error writing connector configuration to 
Kafka"))
-                .when(configBackingStore).putConnectorConfig(CONN2, 
CONN2_CONFIG);
+                .when(configBackingStore).putConnectorConfig(eq(CONN2), 
eq(CONN2_CONFIG), isNull());
 
         // This will occur just before/during the second tick
         doNothing().when(member).ensureActive();
@@ -2184,7 +2229,7 @@ public class DistributedHerderTest {
             // Simulate response to writing config + waiting until end of log 
to be read
             configUpdateListener.onConnectorConfigUpdate(CONN1);
             return null;
-        }).when(configBackingStore).putConnectorConfig(eq(CONN1), 
eq(CONN1_CONFIG_UPDATED));
+        }).when(configBackingStore).putConnectorConfig(eq(CONN1), 
eq(CONN1_CONFIG_UPDATED), isNull());
 
         // As a result of reconfig, should need to update snapshot. With only 
connector updates, we'll just restart
         // connector without rebalance
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequestTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequestTest.java
new file mode 100644
index 00000000000..1d32479f82c
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequestTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.apache.kafka.connect.runtime.TargetState;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class CreateConnectorRequestTest {
+
+    @Test
+    public void testToTargetState() {
+        assertEquals(TargetState.STARTED, 
CreateConnectorRequest.InitialState.RUNNING.toTargetState());
+        assertEquals(TargetState.PAUSED, 
CreateConnectorRequest.InitialState.PAUSED.toTargetState());
+        assertEquals(TargetState.STOPPED, 
CreateConnectorRequest.InitialState.STOPPED.toTargetState());
+
+        CreateConnectorRequest createConnectorRequest = new 
CreateConnectorRequest("test-name", Collections.emptyMap(), null);
+        assertNull(createConnectorRequest.initialTargetState());
+    }
+
+    @Test
+    public void testForValue() {
+        assertEquals(CreateConnectorRequest.InitialState.RUNNING, 
CreateConnectorRequest.InitialState.forValue("running"));
+        assertEquals(CreateConnectorRequest.InitialState.RUNNING, 
CreateConnectorRequest.InitialState.forValue("Running"));
+        assertEquals(CreateConnectorRequest.InitialState.RUNNING, 
CreateConnectorRequest.InitialState.forValue("RUNNING"));
+
+        assertEquals(CreateConnectorRequest.InitialState.PAUSED, 
CreateConnectorRequest.InitialState.forValue("paused"));
+        assertEquals(CreateConnectorRequest.InitialState.PAUSED, 
CreateConnectorRequest.InitialState.forValue("Paused"));
+        assertEquals(CreateConnectorRequest.InitialState.PAUSED, 
CreateConnectorRequest.InitialState.forValue("PAUSED"));
+
+        assertEquals(CreateConnectorRequest.InitialState.STOPPED, 
CreateConnectorRequest.InitialState.forValue("stopped"));
+        assertEquals(CreateConnectorRequest.InitialState.STOPPED, 
CreateConnectorRequest.InitialState.forValue("Stopped"));
+        assertEquals(CreateConnectorRequest.InitialState.STOPPED, 
CreateConnectorRequest.InitialState.forValue("STOPPED"));
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 0183e251600..aed081cf4d6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
+import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
@@ -272,23 +273,64 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testCreateConnector() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
             CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), eq(false), cb.capture());
+        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), isNull(), eq(false), cb.capture());
+
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+    }
+
+    @Test
+    public void testCreateConnectorWithPausedInitialState() throws Throwable {
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), CreateConnectorRequest.InitialState.PAUSED);
+
+        final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
+        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), eq(TargetState.PAUSED), eq(false), cb.capture());
+
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+    }
+
+    @Test
+    public void testCreateConnectorWithStoppedInitialState() throws Throwable {
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), CreateConnectorRequest.InitialState.STOPPED);
+
+        final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
+        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), eq(TargetState.STOPPED), eq(false), cb.capture());
+
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
+    }
+
+    @Test
+    public void testCreateConnectorWithRunningInitialState() throws Throwable {
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), CreateConnectorRequest.InitialState.RUNNING);
+
+        final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
+        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), eq(TargetState.STARTED), eq(false), cb.capture());
 
         connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
     }
 
     @Test
     public void testCreateConnectorNotLeader() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackNotLeaderException(cb).when(herder)
-            .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), 
eq(false), cb.capture());
+            .putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), 
isNull(), eq(false), cb.capture());
 
         when(restClient.httpRequest(eq(LEADER_URL + 
"connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
                 .thenReturn(new RestClient.HttpResponse<>(201, new 
HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, 
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
@@ -297,11 +339,12 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testCreateConnectorWithHeaders() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), null);
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         HttpHeaders httpHeaders = mock(HttpHeaders.class);
         expectAndCallbackNotLeaderException(cb)
-            .when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), eq(false), cb.capture());
+            .when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), isNull(), eq(false), cb.capture());
 
         when(restClient.httpRequest(eq(LEADER_URL + 
"connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any()))
                 .thenReturn(new RestClient.HttpResponse<>(202, new 
HashMap<>(), null));
@@ -310,11 +353,12 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testCreateConnectorExists() {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME), null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackException(cb, new AlreadyExistsException("already 
exists"))
-            .when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), eq(false), cb.capture());
+            .when(herder).putConnectorConfig(eq(CONNECTOR_NAME), 
eq(body.config()), isNull(), eq(false), cb.capture());
         assertThrows(AlreadyExistsException.class, () -> 
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body));
     }
 
@@ -323,13 +367,13 @@ public class ConnectorsResourceTest {
         // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes 
it (puts the name in it) and this
         // will affect later tests
         Map<String, String> inputConfig = 
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
-        final CreateConnectorRequest bodyIn = new 
CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig);
-        final CreateConnectorRequest bodyOut = new 
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG);
+        final CreateConnectorRequest bodyIn = new 
CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig, null);
+        final CreateConnectorRequest bodyOut = new 
CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(bodyOut.name(), bodyOut.config(),
             CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-        ).when(herder).putConnectorConfig(eq(bodyOut.name()), 
eq(bodyOut.config()), eq(false), cb.capture());
+        ).when(herder).putConnectorConfig(eq(bodyOut.name()), 
eq(bodyOut.config()), isNull(), eq(false), cb.capture());
 
         connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
     }
@@ -339,13 +383,13 @@ public class ConnectorsResourceTest {
         // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes 
it (puts the name in it) and this
         // will affect later tests
         Map<String, String> inputConfig = 
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
-        final CreateConnectorRequest bodyIn = new 
CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig);
-        final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", 
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+        final CreateConnectorRequest bodyIn = new 
CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig, null);
+        final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", 
CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(bodyOut.name(), bodyOut.config(),
             CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-        ).when(herder).putConnectorConfig(eq(bodyOut.name()), 
eq(bodyOut.config()), eq(false), cb.capture());
+        ).when(herder).putConnectorConfig(eq(bodyOut.name()), 
eq(bodyOut.config()), isNull(), eq(false), cb.capture());
 
         connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
     }
@@ -355,13 +399,13 @@ public class ConnectorsResourceTest {
         // Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes 
it (puts the name in it) and this
         // will affect later tests
         Map<String, String> inputConfig = 
getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
-        final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, 
inputConfig);
-        final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", 
CONNECTOR_CONFIG_WITH_EMPTY_NAME);
+        final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, 
inputConfig, null);
+        final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", 
CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(bodyOut.name(), bodyOut.config(),
             CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-        ).when(herder).putConnectorConfig(eq(bodyOut.name()), 
eq(bodyOut.config()), eq(false), cb.capture());
+        ).when(herder).putConnectorConfig(eq(bodyOut.name()), 
eq(bodyOut.config()), isNull(), eq(false), cb.capture());
 
         connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
     }
@@ -476,12 +520,13 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testCreateConnectorWithSpecialCharsInName() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME_SPECIAL_CHARS, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME_SPECIAL_CHARS));
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME_SPECIAL_CHARS,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME_SPECIAL_CHARS), null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME_SPECIAL_CHARS, CONNECTOR_CONFIG,
             CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), 
eq(body.config()), eq(false), cb.capture());
+        ).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), 
eq(body.config()), isNull(), eq(false), cb.capture());
 
         String rspLocation = connectorsResource.createConnector(FORWARD, 
NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
@@ -490,12 +535,13 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testCreateConnectorWithControlSequenceInName() throws 
Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME_CONTROL_SEQUENCES1));
+        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
+            Collections.singletonMap(ConnectorConfig.NAME_CONFIG, 
CONNECTOR_NAME_CONTROL_SEQUENCES1), null);
 
         final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = 
ArgumentCaptor.forClass(Callback.class);
         expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
             CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
-        
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), 
eq(body.config()), eq(false), cb.capture());
+        
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), 
eq(body.config()), isNull(), eq(false), cb.capture());
 
         String rspLocation = connectorsResource.createConnector(FORWARD, 
NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
@@ -540,7 +586,7 @@ public class ConnectorsResourceTest {
     public void testCreateConnectorConfigNameMismatch() {
         Map<String, String> connConfig = new HashMap<>();
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
-        CreateConnectorRequest request = new 
CreateConnectorRequest(CONNECTOR_NAME, connConfig);
+        CreateConnectorRequest request = new 
CreateConnectorRequest(CONNECTOR_NAME, connConfig, null);
         assertThrows(BadRequestException.class, () -> 
connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index d9c64f5e36c..7e4126aa141 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -247,6 +247,37 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCreateConnectorWithStoppedInitialState() throws Exception {
+        connector = PowerMock.createMock(BogusSinkConnector.class);
+        Map<String, String> config = connectorConfig(SourceSink.SINK);
+        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        expectConfigValidation(connectorMock, false, config);
+        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
+
+        // Only the connector should be created; we expect no tasks to be 
spawned for a connector created with a paused or stopped initial state
+        Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
+        worker.startConnector(eq(CONNECTOR_NAME), eq(config), 
EasyMock.anyObject(HerderConnectorContext.class),
+            eq(herder), eq(TargetState.STOPPED), EasyMock.capture(onStart));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STOPPED);
+            return true;
+        });
+        
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes();
+        
EasyMock.expect(herder.connectorType(anyObject())).andReturn(ConnectorType.SINK);
+
+        PowerMock.replayAll();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED, 
false, createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(
+            new ConnectorInfo(CONNECTOR_NAME, 
connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK),
+            connectorInfo.result()
+        );
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testDestroyConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 59420e8faf0..a0322414951 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -24,11 +24,11 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Field;
@@ -77,12 +77,12 @@ import static 
org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
 import static 
org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
-import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
@@ -177,6 +177,10 @@ public class KafkaConfigBackingStoreTest {
             "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), 
"config-bytes-9".getBytes()
     );
 
+    private static final List<byte[]> TARGET_STATES_SERIALIZED = Arrays.asList(
+        "started".getBytes(), "paused".getBytes(), "stopped".getBytes()
+    );
+
     @Mock
     private Converter converter;
     @Mock
@@ -320,14 +324,14 @@ public class KafkaConfigBackingStoreTest {
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
 
         // Writing should block until it is written and read back from Kafka
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
         configState = configStorage.snapshot();
         assertEquals(1, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
 
         // Second should also block and all configs should still be available
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1));
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1), null);
         configState = configStorage.snapshot();
         assertEquals(2, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
@@ -346,6 +350,55 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testPutConnectorConfigWithTargetState() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        // We expect to write the target state first, followed by the config 
write and then a read to end
+
+        expectConvertWriteRead(
+            TARGET_STATE_KEYS.get(0), KafkaConfigBackingStore.TARGET_STATE_V1, 
TARGET_STATES_SERIALIZED.get(2),
+            "state.v2", TargetState.STOPPED.name());
+        // We don't expect the config update listener's 
onConnectorTargetStateChange hook to be invoked
+
+        expectConvertWriteRead(
+            CONNECTOR_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+            "properties", SAMPLE_CONFIGS.get(0));
+        configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(TARGET_STATE_KEYS.get(0), 
TARGET_STATES_SERIALIZED.get(2));
+        recordsToRead.put(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+        expectReadToEnd(recordsToRead);
+
+        expectPartitionCount(1);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
+        configState = configStorage.snapshot();
+        assertEquals(2, configState.offset());
+        assertEquals(TargetState.STOPPED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testPutConnectorConfigProducerError() throws Exception {
         expectConfigure();
@@ -373,7 +426,8 @@ public class KafkaConfigBackingStoreTest {
         assertEquals(0, configState.connectors().size());
 
         // verify that the producer exception from KafkaBasedLog::send is 
propagated
-        ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+            SAMPLE_CONFIGS.get(0), null));
         assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));
         configStorage.stop();
 
@@ -505,16 +559,16 @@ public class KafkaConfigBackingStoreTest {
         configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
 
         // Should fail again when we get fenced out
-        assertThrows(PrivilegedWriteException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0)));
+        assertThrows(PrivilegedWriteException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), 
null));
         // Should fail if we retry without reclaiming write privileges
-        assertThrows(IllegalStateException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0)));
+        assertThrows(IllegalStateException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), 
null));
 
         // Should succeed even without write privileges (target states can be 
written by anyone)
         configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
 
         // Should succeed if we re-claim write privileges
         configStorage.claimWritePrivileges();
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(0));
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(0), null);
 
         configStorage.stop();
 
@@ -891,7 +945,6 @@ public class KafkaConfigBackingStoreTest {
         expectRead(serializedAfterStartup, deserializedAfterStartup);
 
         
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
-        
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
         expectPartitionCount(1);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
index 3e449b44cf9..185cb604cbf 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryConfigBackingStoreTest.java
@@ -40,6 +40,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -66,7 +67,7 @@ public class MemoryConfigBackingStoreTest {
 
     @Test
     public void testPutConnectorConfig() {
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
         ClusterConfigState configState = configStore.snapshot();
 
         assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
@@ -78,9 +79,24 @@ public class MemoryConfigBackingStoreTest {
         
verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
     }
 
+    @Test
+    public void testPutConnectorConfigWithTargetState() {
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), TargetState.PAUSED);
+        ClusterConfigState configState = configStore.snapshot();
+
+        assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
+        assertEquals(TargetState.PAUSED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(1, configState.connectors().size());
+
+        
verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
+        // onConnectorTargetStateChange hook shouldn't be called when a 
connector is created with a specific initial target state
+        verify(configUpdateListener, 
never()).onConnectorTargetStateChange(eq(CONNECTOR_IDS.get(0)));
+    }
+
     @Test
     public void testPutConnectorConfigUpdateExisting() {
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
         ClusterConfigState configState = configStore.snapshot();
 
         assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
@@ -89,7 +105,7 @@ public class MemoryConfigBackingStoreTest {
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertEquals(1, configState.connectors().size());
 
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(1));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(1), null);
         configState = configStore.snapshot();
         assertEquals(SAMPLE_CONFIGS.get(1), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
 
@@ -98,8 +114,8 @@ public class MemoryConfigBackingStoreTest {
 
     @Test
     public void testRemoveConnectorConfig() {
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1), null);
         ClusterConfigState configState = configStore.snapshot();
 
         Set<String> expectedConnectors = new HashSet<>();
@@ -124,7 +140,7 @@ public class MemoryConfigBackingStoreTest {
         assertThrows(IllegalArgumentException.class,
             () -> configStore.putTaskConfigs(CONNECTOR_IDS.get(0), 
Collections.singletonList(SAMPLE_CONFIGS.get(1))));
 
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
         configStore.putTaskConfigs(CONNECTOR_IDS.get(0), 
Collections.singletonList(SAMPLE_CONFIGS.get(1)));
         ClusterConfigState configState = configStore.snapshot();
 
@@ -151,7 +167,7 @@ public class MemoryConfigBackingStoreTest {
             return null;
         }).when(configUpdateListener).onTaskConfigUpdate(anySet());
 
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
         configStore.putTaskConfigs(CONNECTOR_IDS.get(0), 
Collections.singletonList(SAMPLE_CONFIGS.get(1)));
         configStore.removeTaskConfigs(CONNECTOR_IDS.get(0));
         ClusterConfigState configState = configStore.snapshot();
@@ -171,7 +187,7 @@ public class MemoryConfigBackingStoreTest {
         // Can't write target state for non-existent connector
         assertThrows(IllegalArgumentException.class, () -> 
configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED));
 
-        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
+        configStore.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
         configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
         // Ensure that 
ConfigBackingStore.UpdateListener::onConnectorTargetStateChange is called only 
once if the same state is written twice
         configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 3b8c504e60e..147e435adf6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -187,7 +188,7 @@ abstract class EmbeddedConnect {
      *
      * @param connName   the name of the connector
      * @param connConfig the intended configuration
-     * @throws ConnectRestException if the REST api returns error status
+     * @throws ConnectRestException if the REST API returns error status
      * @throws ConnectException if the configuration fails to be serialized or 
if the request could not be sent
      */
     public String configureConnector(String connName, Map<String, String> 
connConfig) {
@@ -195,6 +196,36 @@ abstract class EmbeddedConnect {
         return putConnectorConfig(url, connConfig);
     }
 
+    /**
+     * Configure a new connector using the <strong><em>POST /connectors</em></strong> endpoint. If the connector already exists, a
+     * {@link ConnectRestException} will be thrown.
+     *
+     * @param createConnectorRequest the connector creation request
+     * @return the body of the REST response, converted to a string
+     * @throws ConnectRestException if the REST API returns error status
+     * @throws ConnectException if the request fails to be serialized or if the request could not be sent
+     */
+    public String configureConnector(CreateConnectorRequest createConnectorRequest) {
+        String url = endpointForResource("connectors");
+        ObjectMapper objectMapper = new ObjectMapper();
+
+        String requestBody;
+        try {
+            requestBody = objectMapper.writeValueAsString(createConnectorRequest);
+        } catch (IOException e) {
+            // Keep the original exception as the cause so the underlying serialization failure isn't lost
+            throw new ConnectException("Failed to serialize connector creation request: " + createConnectorRequest, e);
+        }
+
+        Response response = requestPost(url, requestBody, Collections.emptyMap());
+        // Any status below 400 (Bad Request) is treated as success
+        if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+            return responseToString(response);
+        } else {
+            throw new ConnectRestException(
+                response.getStatus(),
+                "Could not execute 'POST /connectors' request. Error response: " + responseToString(response)
+            );
+        }
+    }
+
+
     /**
      * Validate a given connector configuration. If the configuration 
validates or
      * has a configuration error, an instance of {@link ConfigInfos} is 
returned. If the validation fails
diff --git a/docs/connect.html b/docs/connect.html
index 16f268e78b0..10b423aa380 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -41,7 +41,7 @@
     <p>In standalone mode all work is performed in a single process. This 
configuration is simpler to setup and get started with and may be useful in 
situations where only one worker makes sense (e.g. collecting log files), but 
it does not benefit from some of the features of Kafka Connect such as fault 
tolerance. You can start a standalone process with the following command:</p>
 
     <pre class="brush: bash;">
-&gt; bin/connect-standalone.sh config/connect-standalone.properties 
[connector1.properties connector2.properties ...]</pre>
+&gt; bin/connect-standalone.sh config/connect-standalone.properties 
[connector1.properties connector2.json ...]</pre>
 
     <p>The first parameter is the configuration for the worker. This includes 
settings such as the Kafka connection parameters, serialization format, and how 
frequently to commit offsets. The provided example should work well with a 
local cluster running with the default configuration provided by 
<code>config/server.properties</code>. It will require tweaking to use with a 
different configuration or production deployment. All workers (both standalone 
and distributed) require a few configs:</p>
     <ul>
@@ -60,7 +60,7 @@
     
     <p>Starting with 2.3.0, client configuration overrides can be configured 
individually per connector by using the prefixes 
<code>producer.override.</code> and <code>consumer.override.</code> for Kafka 
sources or Kafka sinks respectively. These overrides are included with the rest 
of the connector's configuration properties.</p>
 
-    <p>The remaining parameters are connector configuration files. You may 
include as many as you want, but all will execute within the same process (on 
different threads). You can also choose not to specify any connector 
configuration files on the command line, and instead use the REST API to create 
connectors at runtime after your standalone worker starts.</p>
+    <p>The remaining parameters are connector configuration files. Each file 
may either be a Java Properties file or a JSON file containing an object with 
the same structure as the request body of either the <code>POST 
/connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code> 
endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI 
documentation</a>). You may include as many as you want, but all will execute 
within the same process (on different threads). You can also choose not to 
specify any connector configuration files on the command line, and instead use 
the REST API to create connectors at runtime after your standalone worker 
starts.</p>
 
     <p>Distributed mode handles automatic balancing of work, allows you to 
scale up (or down) dynamically, and offers fault tolerance both in the active 
tasks and for configuration and offset commit data. Execution is very similar 
to standalone mode:</p>
 
@@ -293,7 +293,7 @@ listeners=http://localhost:8080,https://localhost:8443</pre>
 
     <ul>
         <li><code>GET /connectors</code> - return a list of active 
connectors</li>
-        <li><code>POST /connectors</code> - create a new connector; the 
request body should be a JSON object containing a string <code>name</code> 
field and an object <code>config</code> field with the connector configuration 
parameters</li>
+        <li><code>POST /connectors</code> - create a new connector; the 
request body should be a JSON object containing a string <code>name</code> 
field and an object <code>config</code> field with the connector configuration 
parameters. The JSON object may also optionally contain a string 
<code>initial_state</code> field which can take the following values - 
<code>STOPPED</code>, <code>PAUSED</code> or <code>RUNNING</code> (the default 
value)</li>
         <li><code>GET /connectors/{name}</code> - get information about a 
specific connector</li>
         <li><code>GET /connectors/{name}/config</code> - get the configuration 
parameters for a specific connector</li>
         <li><code>PUT /connectors/{name}/config</code> - update the 
configuration parameters for a specific connector</li>

Reply via email to