This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e49a5a265fd KAFKA-14783 (KIP-875): New STOPPED state for connectors 
(#13424)
e49a5a265fd is described below

commit e49a5a265fd2d60d197b940b7c2a6867f7b90cb1
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Apr 11 06:37:26 2023 -0700

    KAFKA-14783 (KIP-875): New STOPPED state for connectors (#13424)
    
    Reviewers: Mickael Maison <[email protected]>, Yash Mayya 
<[email protected]>, Greg Harris <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      |   6 +
 .../kafka/connect/runtime/AbstractStatus.java      |   3 +-
 .../kafka/connect/runtime/ConnectorStatus.java     |   6 +
 .../org/apache/kafka/connect/runtime/Herder.java   |  14 ++
 .../apache/kafka/connect/runtime/StateTracker.java |  19 +-
 .../apache/kafka/connect/runtime/TargetState.java  |   6 +-
 .../kafka/connect/runtime/WorkerConnector.java     |  70 +++++--
 .../kafka/connect/runtime/WorkerMetricsGroup.java  |   5 +
 .../apache/kafka/connect/runtime/WorkerTask.java   |  26 ++-
 .../runtime/distributed/DistributedHerder.java     | 133 +++++++++-----
 .../runtime/rest/resources/ConnectorsResource.java |  13 ++
 .../runtime/standalone/StandaloneHerder.java       |  16 +-
 .../connect/storage/KafkaConfigBackingStore.java   |  33 +++-
 .../integration/ConnectWorkerIntegrationTest.java  | 175 ++++++++++++++++++
 .../ConnectorTopicsIntegrationTest.java            |   4 +-
 .../integration/ErrorHandlingIntegrationTest.java  |   4 +-
 .../integration/MonitorableSourceConnector.java    |   3 +
 .../RebalanceSourceConnectorsIntegrationTest.java  |   2 +-
 .../kafka/connect/runtime/WorkerConnectorTest.java | 170 +++++++++++++++--
 .../runtime/distributed/DistributedHerderTest.java | 203 +++++++++++++++++++++
 .../runtime/standalone/StandaloneHerderTest.java   |  55 ++++++
 .../storage/KafkaConfigBackingStoreTest.java       |  56 ++++--
 .../util/clusters/EmbeddedConnectCluster.java      |  65 +++++++
 .../clusters/EmbeddedConnectClusterAssertions.java |  67 ++++++-
 gradle/spotbugs-exclude.xml                        |   1 -
 25 files changed, 1037 insertions(+), 118 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 125b93dab73..84bd0f81108 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -171,6 +171,12 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 workerId, generation()));
     }
 
+    @Override
+    public void onStop(String connector) {
+        statusBackingStore.put(new ConnectorStatus(connector, 
AbstractStatus.State.STOPPED,
+                workerId, generation()));
+    }
+
     @Override
     public void onPause(String connector) {
         statusBackingStore.put(new ConnectorStatus(connector, 
ConnectorStatus.State.PAUSED,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
index c5e07029a42..76036d610d7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -25,8 +25,9 @@ public abstract class AbstractStatus<T> {
         RUNNING,
         PAUSED,
         FAILED,
-        DESTROYED,
+        DESTROYED, // Never visible to users; destroyed Connector and Task 
instances are not shown
         RESTARTING,
+        STOPPED, // Only ever visible to users for Connector instances; never 
for Task instances
     }
 
     private final T id;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
index 6da4a7db0fe..10ed188cdf8 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -45,6 +45,12 @@ public class ConnectorStatus extends AbstractStatus<String> {
          */
         void onFailure(String connector, Throwable cause);
 
+        /**
+         * Invoked when the connector is stopped through the REST API
+         * @param connector The connector name
+         */
+        void onStop(String connector);
+
         /**
          * Invoked when the connector is paused through the REST API
          * @param connector The connector name
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 007b0d190c2..099a012be3f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -246,9 +246,23 @@ public interface Herder {
      */
     void restartConnectorAndTasks(RestartRequest request, 
Callback<ConnectorStateInfo> cb);
 
+    /**
+     * Stop the connector. This call will asynchronously suspend processing by 
the connector and
+     * shut down all of its tasks.
+     * @param connector name of the connector
+     * @param cb callback to invoke upon completion
+     */
+    void stopConnector(String connector, Callback<Void> cb);
+
     /**
      * Pause the connector. This call will asynchronously suspend processing 
by the connector and all
      * of its tasks.
+     * <p>
+     * Note that, unlike {@link #stopConnector(String, Callback)}, tasks for 
this connector will not
+     * be shut down and none of their resources will be de-allocated. Instead, 
they will be left in an
+     * "idling" state where no data is polled from them (if source tasks) or 
given to them (if sink tasks),
+     * but all internal state kept by the tasks and their resources is left 
intact and ready to begin
+     * processing records again as soon as the connector is {@link 
#resumeConnector(String) resumed}.
      * @param connector name of the connector
      */
     void pauseConnector(String connector);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 297d47384c7..23ac3eac32f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -73,6 +73,7 @@ public class StateTracker {
         private final long unassignedTotalTimeMs;
         private final long runningTotalTimeMs;
         private final long pausedTotalTimeMs;
+        private final long stoppedTotalTimeMs;
         private final long failedTotalTimeMs;
         private final long destroyedTotalTimeMs;
         private final long restartingTotalTimeMs;
@@ -81,16 +82,17 @@ public class StateTracker {
          * The initial StateChange instance before any state has changed.
          */
         StateChange() {
-            this(null, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+            this(null, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
         }
 
-        StateChange(State state, long startTime, long unassignedTotalTimeMs, 
long runningTotalTimeMs,
-                            long pausedTotalTimeMs, long failedTotalTimeMs, 
long destroyedTotalTimeMs, long restartingTotalTimeMs) {
+        StateChange(State state, long startTime, long unassignedTotalTimeMs, 
long runningTotalTimeMs, long pausedTotalTimeMs,
+                            long stoppedTotalTimeMs, long failedTotalTimeMs, 
long destroyedTotalTimeMs, long restartingTotalTimeMs) {
             this.state = state;
             this.startTime = startTime;
             this.unassignedTotalTimeMs = unassignedTotalTimeMs;
             this.runningTotalTimeMs = runningTotalTimeMs;
             this.pausedTotalTimeMs = pausedTotalTimeMs;
+            this.stoppedTotalTimeMs  = stoppedTotalTimeMs;
             this.failedTotalTimeMs = failedTotalTimeMs;
             this.destroyedTotalTimeMs = destroyedTotalTimeMs;
             this.restartingTotalTimeMs = restartingTotalTimeMs;
@@ -106,7 +108,7 @@ public class StateTracker {
          */
         public StateChange newState(State state, long now) {
             if (this.state == null) {
-                return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L, 0L);
+                return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
             }
             if (state == this.state) {
                 return this;
@@ -114,6 +116,7 @@ public class StateTracker {
             long unassignedTime = this.unassignedTotalTimeMs;
             long runningTime = this.runningTotalTimeMs;
             long pausedTime = this.pausedTotalTimeMs;
+            long stoppedTime = this.stoppedTotalTimeMs;
             long failedTime = this.failedTotalTimeMs;
             long destroyedTime = this.destroyedTotalTimeMs;
             long restartingTime = this.restartingTotalTimeMs;
@@ -128,6 +131,9 @@ public class StateTracker {
                 case PAUSED:
                     pausedTime += duration;
                     break;
+                case STOPPED:
+                    stoppedTime += duration;
+                    break;
                 case FAILED:
                     failedTime += duration;
                     break;
@@ -138,7 +144,7 @@ public class StateTracker {
                     restartingTime += duration;
                     break;
             }
-            return new StateChange(state, now, unassignedTime, runningTime, 
pausedTime, failedTime, destroyedTime, restartingTime);
+            return new StateChange(state, now, unassignedTime, runningTime, 
pausedTime, stoppedTime, failedTime, destroyedTime, restartingTime);
         }
 
         /**
@@ -164,6 +170,9 @@ public class StateTracker {
                 case PAUSED:
                     durationDesired += pausedTotalTimeMs;
                     break;
+                case STOPPED:
+                    durationDesired += stoppedTotalTimeMs;
+                    break;
                 case FAILED:
                     durationDesired += failedTotalTimeMs;
                     break;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
index f2486a6a0eb..07f8372e969 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
@@ -22,7 +22,10 @@ package org.apache.kafka.connect.runtime;
  * target state is "STARTED." This does not mean it has actually started, just 
that
  * the Connect framework will attempt to start it after its tasks have been 
assigned.
  * After the connector has been paused, the target state will change to PAUSED,
- * and all the tasks will stop doing work.
+ * and all the tasks will stop doing work. A target state of STOPPED is 
similar to
+ * PAUSED, but is also accompanied by a full shutdown of the connector's tasks,
+ * including deallocation of any Kafka clients, SMTs, and other resources 
brought
+ * up for or by that task.
  * <p>
  * Target states are persisted in the config topic, which is read by all of the
  * workers in the group. When a worker sees a new target state for a connector 
which
@@ -33,4 +36,5 @@ package org.apache.kafka.connect.runtime;
 public enum TargetState {
     STARTED,
     PAUSED,
+    STOPPED,
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 50bf3e672c2..680cb7a6f29 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -57,7 +57,8 @@ public class WorkerConnector implements Runnable {
 
     private enum State {
         INIT,    // initial state before startup
-        STOPPED, // the connector has been stopped/paused.
+        PAUSED,  // the connector has been paused.
+        STOPPED, // the connector has been stopped.
         STARTED, // the connector has been started/resumed.
         FAILED,  // the connector has failed (no further transitions are 
possible after this state)
     }
@@ -186,6 +187,7 @@ public class WorkerConnector implements Runnable {
                     return false;
 
                 case INIT:
+                case PAUSED:
                 case STOPPED:
                     connector.start(config);
                     this.state = State.STARTED;
@@ -220,29 +222,40 @@ public class WorkerConnector implements Runnable {
         return state == State.STARTED;
     }
 
-    @SuppressWarnings("fallthrough")
-    private void pause() {
+    private void suspend(boolean paused) {
+        State newState = paused ? State.PAUSED : State.STOPPED;
         try {
-            switch (state) {
-                case STOPPED:
-                    return;
+            if (state == newState) {
+                // Already in the desired state
+                return;
+            }
 
-                case STARTED:
-                    connector.stop();
-                    // fall through
+            if (state == State.STARTED) {
+                connector.stop();
+            }
 
-                case INIT:
-                    statusListener.onPause(connName);
-                    this.state = State.STOPPED;
-                    break;
+            if (state == State.FAILED && newState != State.STOPPED) {
+                throw new IllegalArgumentException("Cannot transition to 
non-stopped state when connector has already failed");
+            }
 
-                default:
-                    throw new IllegalArgumentException("Cannot pause connector 
in state " + state);
+            if (paused) {
+                statusListener.onPause(connName);
+            } else {
+                statusListener.onStop(connName);
             }
+
+            this.state = newState;
         } catch (Throwable t) {
-            log.error("{} Error while shutting down connector", this, t);
-            statusListener.onFailure(connName, t);
-            this.state = State.FAILED;
+            log.error("{} Error while {} connector", this, paused ? "pausing" 
: "stopping", t);
+            if (paused) {
+                statusListener.onFailure(connName, t);
+                this.state = State.FAILED;
+            } else {
+                // We say the connector is STOPPED even if it fails at this 
point
+                this.state = State.STOPPED;
+                // One more try to make sure the status is updated correctly
+                statusListener.onStop(connName);
+            }
         }
     }
 
@@ -332,7 +345,8 @@ public class WorkerConnector implements Runnable {
     }
 
     void doTransitionTo(TargetState targetState, Callback<TargetState> 
stateChangeCallback) {
-        if (state == State.FAILED) {
+        // Edge case: we don't do transitions most of the time if we've 
already failed, but for the STOPPED state, it's fine
+        if (state == State.FAILED && targetState != TargetState.STOPPED) {
             stateChangeCallback.onCompletion(
                     new ConnectException(this + " Cannot transition connector 
to " + targetState + " since it has failed"),
                     null);
@@ -354,7 +368,9 @@ public class WorkerConnector implements Runnable {
     private void doTransitionTo(TargetState targetState) throws Throwable {
         log.debug("{} Transition connector to {}", this, targetState);
         if (targetState == TargetState.PAUSED) {
-            pause();
+            suspend(true);
+        } else if (targetState == TargetState.STOPPED) {
+            suspend(false);
         } else if (targetState == TargetState.STARTED) {
             if (state == State.INIT)
                 start();
@@ -448,6 +464,16 @@ public class WorkerConnector implements Runnable {
             }
         }
 
+        @Override
+        public void onStop(String connector) {
+            state = AbstractStatus.State.STOPPED;
+            synchronized (this) {
+                if (!cancelled) {
+                    delegate.onStop(connector);
+                }
+            }
+        }
+
         @Override
         public void onPause(String connector) {
             state = AbstractStatus.State.PAUSED;
@@ -502,6 +528,10 @@ public class WorkerConnector implements Runnable {
             return state == AbstractStatus.State.PAUSED;
         }
 
+        boolean isStopped() {
+            return state == AbstractStatus.State.STOPPED;
+        }
+
         boolean isFailed() {
             return state == AbstractStatus.State.FAILED;
         }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
index f03bc4f2b79..35f2f634059 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
@@ -128,6 +128,11 @@ class WorkerMetricsGroup {
             delegateListener.onStartup(connector);
         }
 
+        @Override
+        public void onStop(final String connector) {
+            delegateListener.onStop(connector);
+        }
+
         @Override
         public void onPause(final String connector) {
             delegateListener.onPause(connector);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index a669f6a040f..ce40814a575 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -169,7 +169,9 @@ abstract class WorkerTask implements Runnable {
     }
 
     protected boolean isStopping() {
-        return stopping;
+        // The target state should never be STOPPED, but if things go wrong 
and it somehow is,
+        // we handle that identically to a request to shut down the task
+        return stopping || targetState == TargetState.STOPPED;
     }
 
     protected boolean isCancelled() {
@@ -188,7 +190,7 @@ abstract class WorkerTask implements Runnable {
     private void doRun() throws InterruptedException {
         try {
             synchronized (this) {
-                if (stopping)
+                if (isStopping())
                     return;
 
                 if (targetState == TargetState.PAUSED) {
@@ -204,7 +206,7 @@ abstract class WorkerTask implements Runnable {
             failed = true;
             if (cancelled) {
                 log.warn("{} After being scheduled for shutdown, the orphan 
task threw an uncaught exception. A newer instance of this task might be 
already running", this, t);
-            } else if (stopping) {
+            } else if (isStopping()) {
                 log.warn("{} After being scheduled for shutdown, task threw an 
uncaught exception.", this, t);
             } else {
                 log.error("{} Task threw an uncaught and unrecoverable 
exception. Task is being killed and will not recover until manually restarted", 
this, t);
@@ -281,7 +283,7 @@ abstract class WorkerTask implements Runnable {
     protected boolean awaitUnpause() throws InterruptedException {
         synchronized (this) {
             while (targetState == TargetState.PAUSED) {
-                if (stopping)
+                if (isStopping())
                     return false;
                 this.wait();
             }
@@ -291,9 +293,21 @@ abstract class WorkerTask implements Runnable {
 
     public void transitionTo(TargetState state) {
         synchronized (this) {
-            // ignore the state change if we are stopping
-            if (stopping)
+            // Ignore the state change if we are stopping.
+            // This has the consequence that, if we ever transition to the 
STOPPED target state (which
+            // should never happen since the whole point of that state is that it 
comes with a complete
+            // shutdown of all the tasks for the connector), we will never be 
able to transition out of it.
+            // Since part of transitioning to the STOPPED state is that we 
shut down the task and all of
+            // its resources (Kafka clients, SMTs, etc.), this is a reasonable 
way to do things; otherwise,
+            // we'd have to re-instantiate all of those resources to be able 
to resume (or even just pause)
+            // the task.
+            if (isStopping()) {
+                log.debug("{} Ignoring request to transition stopped task {} 
to state {}", this, id, state);
                 return;
+            }
+
+            if (targetState == TargetState.STOPPED)
+                log.warn("{} Received unexpected request to transition task {} 
to state {}; will shut down in response", this, id, TargetState.STOPPED);
 
             this.targetState = state;
             this.notifyAll();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 89d2bbbefed..b5f92386ab0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1094,6 +1094,38 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> 
callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED 
state", connName);
+
+        addRequest(
+                () -> {
+                    if (!configState.contains(connName))
+                        throw new NotFoundException("Unknown connector " + 
connName);
+
+                    // We only allow the leader to handle this request since 
it involves writing task configs to the config topic
+                    if (!isLeader()) {
+                        callback.onCompletion(new NotLeaderException("Only the 
leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+                        return null;
+                    }
+
+                    // We write the task configs first since, if we fail 
between then and writing the target state, the
+                    // cluster is still kept in a healthy state. A RUNNING 
connector with zero tasks is acceptable (although,
+                    // if the connector is reassigned during the ensuing 
rebalance, it is likely that it will immediately generate
+                    // a non-empty set of task configs). A STOPPED connector 
with a non-empty set of tasks is less acceptable
+                    // and likely to confuse users.
+                    writeTaskConfigs(connName, Collections.emptyList());
+                    configBackingStore.putTargetState(connName, 
TargetState.STOPPED);
+                    // Force a read of the new target state for the connector
+                    refreshConfigSnapshot(workerSyncTimeoutMs);
+
+                    callback.onCompletion(null, null);
+                    return null;
+                },
+                forwardErrorCallback(callback)
+        );
+    }
+
     @Override
     public void requestTaskReconfiguration(final String connName) {
         log.trace("Submitting connector task reconfiguration request {}", 
connName);
@@ -1151,7 +1183,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                 else if (!configState.contains(connName))
                     callback.onCompletion(new NotFoundException("Connector " + 
connName + " not found"), null);
                 else {
-                    writeToConfigTopicAsLeader(() -> 
configBackingStore.putTaskConfigs(connName, configs));
+                    writeTaskConfigs(connName, configs);
                     callback.onCompletion(null, null);
                 }
                 return null;
@@ -1963,6 +1995,9 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             if (!worker.isRunning(connName)) {
                 log.info("Skipping reconfiguration of connector {} since it is 
not running", connName);
                 return;
+            } else if (configState.targetState(connName) != 
TargetState.STARTED) {
+                log.info("Skipping reconfiguration of connector {} since its 
target state is {}", connName, configState.targetState(connName));
+                return;
             }
 
             Map<String, String> configs = 
configState.connectorConfig(connName);
@@ -1975,52 +2010,66 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             }
 
             final List<Map<String, String>> taskProps = 
worker.connectorTaskConfigs(connName, connConfig);
-            if (taskConfigsChanged(configState, connName, taskProps)) {
-                List<Map<String, String>> rawTaskProps = 
reverseTransform(connName, configState, taskProps);
-                if (isLeader()) {
-                    writeToConfigTopicAsLeader(() -> 
configBackingStore.putTaskConfigs(connName, rawTaskProps));
-                    cb.onCompletion(null, null);
-                } else if (restClient == null) {
-                    throw new NotLeaderException("This worker is not able to 
communicate with the leader of the cluster, "
-                            + "which is required for dynamically-reconfiguring 
connectors. If running MirrorMaker 2 "
-                            + "in dedicated mode, consider enabling 
inter-worker communication via the "
-                            + "'dedicated.mode.enable.internal.rest' 
property.",
-                            leaderUrl()
-                    );
-                } else {
-                    // We cannot forward the request on the same thread 
because this reconfiguration can happen as a result of connector
-                    // addition or removal. If we blocked waiting for the 
response from leader, we may be kicked out of the worker group.
-                    forwardRequestExecutor.submit(() -> {
-                        try {
-                            String leaderUrl = leaderUrl();
-                            if (Utils.isBlank(leaderUrl)) {
-                                cb.onCompletion(new ConnectException("Request 
to leader to " +
-                                        "reconfigure connector tasks failed " +
-                                        "because the URL of the leader's REST 
interface is empty!"), null);
-                                return;
-                            }
-                            String reconfigUrl = namespacedUrl(leaderUrl)
-                                    .path("connectors")
-                                    .path(connName)
-                                    .path("tasks")
-                                    .build()
-                                    .toString();
-                            log.trace("Forwarding task configurations for 
connector {} to leader", connName);
-                            restClient.httpRequest(reconfigUrl, "POST", null, 
rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
-                            cb.onCompletion(null, null);
-                            log.trace("Request to leader to reconfigure 
connector tasks succeeded");
-                        } catch (ConnectException e) {
-                            log.error("Request to leader to reconfigure 
connector tasks failed", e);
-                            cb.onCompletion(e, null);
-                        }
-                    });
-                }
-            }
+            publishConnectorTaskConfigs(connName, taskProps, cb);
         } catch (Throwable t) {
             cb.onCompletion(t, null);
         }
     }
 
+    private void publishConnectorTaskConfigs(String connName, List<Map<String, 
String>> taskProps, Callback<Void> cb) {
+        if (!taskConfigsChanged(configState, connName, taskProps)) {
+            return;
+        }
+
+        List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
configState, taskProps);
+        if (isLeader()) {
+            writeTaskConfigs(connName, rawTaskProps);
+            cb.onCompletion(null, null);
+        } else if (restClient == null) {
+            throw new NotLeaderException("This worker is not able to 
communicate with the leader of the cluster, "
+                    + "which is required for dynamically-reconfiguring 
connectors. If running MirrorMaker 2 "
+                    + "in dedicated mode, consider enabling inter-worker 
communication via the "
+                    + "'dedicated.mode.enable.internal.rest' property.",
+                    leaderUrl()
+            );
+        } else {
+            // We cannot forward the request on the same thread because this 
reconfiguration can happen as a result of connector
+            // addition or removal. If we blocked waiting for the response 
from leader, we may be kicked out of the worker group.
+            forwardRequestExecutor.submit(() -> {
+                try {
+                    String leaderUrl = leaderUrl();
+                    if (Utils.isBlank(leaderUrl)) {
+                        cb.onCompletion(new ConnectException("Request to 
leader to " +
+                                "reconfigure connector tasks failed " +
+                                "because the URL of the leader's REST 
interface is empty!"), null);
+                        return;
+                    }
+                    String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+                            .path("connectors")
+                            .path(connName)
+                            .path("tasks")
+                            .build()
+                            .toString();
+                    log.trace("Forwarding task configurations for connector {} 
to leader", connName);
+                    restClient.httpRequest(reconfigUrl, "POST", null, 
rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
+                    cb.onCompletion(null, null);
+                } catch (ConnectException e) {
+                    log.error("Request to leader to reconfigure connector 
tasks failed", e);
+                    cb.onCompletion(e, null);
+                }
+            });
+        }
+    }
+
+    private void writeTaskConfigs(String connName, List<Map<String, String>> 
taskConfigs) {
+        if (!taskConfigs.isEmpty()) {
+            if (configState.targetState(connName) == TargetState.STOPPED)
+                throw new BadRequestException("Cannot submit non-empty set of 
task configs for stopped connector " + connName);
+        }
+
+        writeToConfigTopicAsLeader(() -> 
configBackingStore.putTaskConfigs(connName, taskConfigs));
+    }
+
     // Invoked by exactly-once worker source tasks after they have 
successfully initialized their transactional
     // producer to ensure that it is still safe to bring up the task
     private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int 
initialTaskGen) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 086b98dc653..fd53f5bb1de 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -266,6 +266,19 @@ public class ConnectorsResource implements ConnectResource {
         return Response.accepted().entity(stateInfo).build();
     }
 
+    @PUT
+    @Path("/{connector}/stop")
+    @Operation(summary = "Stop the specified connector",
+               description = "This operation is idempotent and has no effects if the connector is already stopped")
+    public void stopConnector(
+            @PathParam("connector") String connector,
+            final @Context HttpHeaders headers,
+            final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.stopConnector(connector, cb);
+        requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/stop", "PUT", headers, null, forward);
+    }
+
     @PUT
     @Path("/{connector}/pause")
     @Operation(summary = "Pause the specified connector",
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 3b3c3ced06f..b3c83e3dd78 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -238,6 +238,17 @@ public class StandaloneHerder extends AbstractHerder {
         }
     }
 
+    @Override
+    public synchronized void stopConnector(String connName, Callback<Void> callback) {
+        try {
+            removeConnectorTasks(connName);
+            configBackingStore.putTargetState(connName, TargetState.STOPPED);
+            callback.onCompletion(null, null);
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
     @Override
     public synchronized void requestTaskReconfiguration(String connName) {
         if (!worker.connectorNames().contains(connName)) {
@@ -429,7 +440,10 @@ public class StandaloneHerder extends AbstractHerder {
 
     private void updateConnectorTasks(String connName) {
         if (!worker.isRunning(connName)) {
-            log.info("Skipping update of connector {} since it is not running", connName);
+            log.info("Skipping update of tasks for connector {} since it is not running", connName);
+            return;
+        } else if (configState.targetState(connName) != TargetState.STARTED) {
+            log.info("Skipping update of tasks for connector {} since its target state is {}", connName, configState.targetState(connName));
             return;
         }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 9952ef9d66b..36b39bdb58c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -79,6 +79,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
+import static org.apache.kafka.connect.runtime.TargetState.PAUSED;
+import static org.apache.kafka.connect.runtime.TargetState.STOPPED;
 import static org.apache.kafka.connect.util.ConnectUtils.className;
 
 /**
@@ -240,6 +242,10 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
             .field("state", Schema.STRING_SCHEMA)
             .build();
+    public static final Schema TARGET_STATE_V1 = SchemaBuilder.struct()
+            .field("state", Schema.STRING_SCHEMA)
+            .field("state.v2", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
     public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
             .field("task-count", Schema.INT32_SCHEMA)
             .build();
@@ -633,9 +639,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      */
     @Override
     public void putTargetState(String connector, TargetState state) {
-        Struct connectTargetState = new Struct(TARGET_STATE_V0);
-        connectTargetState.put("state", state.name());
-        byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
+        Struct connectTargetState = new Struct(TARGET_STATE_V1);
+        // Older workers don't support the STOPPED state; fall back on PAUSED
+        connectTargetState.put("state", state == STOPPED ? PAUSED.name() : state.name());
+        connectTargetState.put("state.v2", state.name());
+        byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V1, connectTargetState);
         log.debug("Writing target state {} for connector {}", state, connector);
         try {
             configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -928,11 +936,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                     return;
                 }
                 @SuppressWarnings("unchecked")
-                Object targetState = ((Map<String, Object>) value.value()).get("state");
-                if (!(targetState instanceof String)) {
-                    log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
+                Map<String, Object> valueMap = (Map<String, Object>) value.value();
+                Object targetState = valueMap.get("state.v2");
+                if (targetState != null && !(targetState instanceof String)) {
+                    log.error("Invalid data for target state for connector '{}': 'state.v2' field should be a String but is {}",
                             connectorName, className(targetState));
-                    return;
+                    // We don't return here; it's still possible that there's a value we can use in the older state field
+                    targetState = null;
+                }
+                if (targetState == null) {
+                    // This record may have been written by an older worker; fall back on the older state field
+                    targetState = valueMap.get("state");
+                    if (!(targetState instanceof String)) {
+                        log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
+                                connectorName, className(targetState));
+                        return;
+                    }
                 }
 
                 try {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 5cd794e7c83..65ecaa2b2c8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -325,6 +325,181 @@ public class ConnectWorkerIntegrationTest {
         assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector can be updated, with
+     * an emphasis on ensuring that the transitions between each state are correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>
+     */
+    @Test
+    public void testPauseStopResume() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // Want to make sure to use multiple tasks
+        final int numTasks = 4;
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+
+        // Start with RUNNING
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not start in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector did not pause in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                0,
+                "Connector did not pause in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Delete the connector
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreNotRunning(
+                CONNECTOR_NAME,
+                "Connector tasks were not destroyed in time"
+        );
+    }
+
+    /**
+     * Test out the {@code STOPPED} state introduced in
+     * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED">KIP-875</a>,
+     * with an emphasis on correctly handling errors thrown from the connector.
+     */
+    @Test
+    public void testStoppedState() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        // Fail the connector on startup
+        props.put("connector.start.inject.error", "true");
+
+        // Start the connector (should fail immediately and generate no tasks)
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                0,
+                "Connector should have failed and not generated any tasks"
+        );
+
+        // Stopping a failed connector updates its state to STOPPED in the REST API
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Can resume a connector after its Connector has failed before shutdown after receiving a stop request
+        props.remove("connector.start.inject.error");
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or tasks did not start running healthily in time"
+        );
+
+        // Fail the connector on shutdown
+        props.put("connector.stop.inject.error", "true");
+        // Stopping a connector that fails during shutdown after receiving a stop request updates its state to STOPPED in the REST API
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Can resume a connector after its Connector has failed during shutdown after receiving a stop request
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or tasks did not start running healthily in time"
+        );
+
+        // Can delete a stopped connector
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreNotRunning(
+                CONNECTOR_NAME,
+                "Connector and all of its tasks should no longer be running"
+        );
+    }
+
     private Map<String, String> defaultSourceConnectorProps(String topic) {
         // setup up props for the source connector
         Map<String, String> props = new HashMap<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index 8c4e1566931..a8b812f8c31 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -150,7 +150,7 @@ public class ConnectorTopicsIntegrationTest {
         // deleting a connector resets its active topics
         connect.deleteConnector(BAR_CONNECTOR);
 
-        connect.assertions().assertConnectorAndTasksAreStopped(BAR_CONNECTOR,
+        connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
                 "Connector tasks did not stop in time.");
 
         connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
@@ -205,7 +205,7 @@ public class ConnectorTopicsIntegrationTest {
         // deleting a connector resets its active topics
         connect.deleteConnector(FOO_CONNECTOR);
 
-        connect.assertions().assertConnectorAndTasksAreStopped(FOO_CONNECTOR,
+        connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
                 "Connector tasks did not stop in time.");
 
         connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 5bc5fcdbd25..7fc03852363 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -178,7 +178,7 @@ public class ErrorHandlingIntegrationTest {
         }
 
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
                 "Connector tasks did not stop in time.");
 
     }
@@ -247,7 +247,7 @@ public class ErrorHandlingIntegrationTest {
         ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
 
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
             "Connector tasks did not stop in time.");
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 1bbff23e953..cf7a1f408f1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -104,6 +104,9 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
     public void stop() {
         log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
         connectorHandle.recordConnectorStop();
+        if (Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error", "false"))) {
+            throw new RuntimeException("Injecting errors during connector stop");
+        }
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index ebc53edee2d..04e12ea41e0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -206,7 +206,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // delete connector
         connect.deleteConnector(CONNECTOR_NAME + 3);
 
-        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
+        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
                 "Connector tasks did not stop in time.");
 
         waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index e716efc091d..cf07d5d2e29 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -103,7 +103,7 @@ public class WorkerConnectorTest {
         assertFailedMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(listener).onFailure(CONNECTOR, exception);
@@ -127,7 +127,7 @@ public class WorkerConnectorTest {
         assertFailedMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(listener).onFailure(CONNECTOR, exception);
@@ -153,7 +153,7 @@ public class WorkerConnectorTest {
         assertRunningMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(connector).start(CONFIG);
@@ -181,7 +181,7 @@ public class WorkerConnectorTest {
         assertPausedMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(connector).start(CONFIG);
@@ -195,6 +195,37 @@ public class WorkerConnectorTest {
         verifyNoMoreInteractions(onStateChange);
     }
 
+    @Test
+    public void testStartupAndStop() {
+        connector = sinkConnector;
+        when(connector.version()).thenReturn(VERSION);
+
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+
+        workerConnector.initialize();
+        assertInitializedSinkMetric(workerConnector);
+
+        workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
+        assertRunningMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+        assertStoppedMetric(workerConnector);
+        workerConnector.shutdown();
+        workerConnector.doShutdown();
+        assertDestroyedMetric(workerConnector);
+
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verify(listener).onStop(CONNECTOR);
+        verifyCleanShutdown(true);
+
+        InOrder inOrder = inOrder(onStateChange);
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
+        verifyNoMoreInteractions(onStateChange);
+    }
+
     @Test
     public void testOnResume() {
         connector = sourceConnector;
@@ -213,7 +244,7 @@ public class WorkerConnectorTest {
         assertRunningMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(listener).onPause(CONNECTOR);
@@ -241,7 +272,7 @@ public class WorkerConnectorTest {
         assertPausedMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         // connector never gets started
@@ -252,6 +283,31 @@ public class WorkerConnectorTest {
         verifyNoMoreInteractions(onStateChange);
     }
 
+    @Test
+    public void testStartupStopped() {
+        connector = sinkConnector;
+        when(connector.version()).thenReturn(VERSION);
+
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+
+        workerConnector.initialize();
+        assertInitializedSinkMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+        assertStoppedMetric(workerConnector);
+        workerConnector.shutdown();
+        workerConnector.doShutdown();
+        assertDestroyedMetric(workerConnector);
+
+        verifyInitialize();
+        // connector never gets started
+        verify(listener).onStop(CONNECTOR);
+        verifyCleanShutdown(false);
+
+        verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
+        verifyNoMoreInteractions(onStateChange);
+    }
+
     @Test
     public void testStartupFailure() {
         RuntimeException exception = new RuntimeException();
@@ -269,7 +325,7 @@ public class WorkerConnectorTest {
         assertFailedMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(connector).start(CONFIG);
@@ -280,6 +336,49 @@ public class WorkerConnectorTest {
         verifyNoMoreInteractions(onStateChange);
     }
 
+    @Test
+    public void testStopFailure() {
+        RuntimeException exception = new RuntimeException();
+        connector = sourceConnector;
+
+        when(connector.version()).thenReturn(VERSION);
+
+        // Fail during the first call to stop, then succeed for the next attempt
+        doThrow(exception).doNothing().when(connector).stop();
+
+        Callback<TargetState> onFirstStateChange = mockCallback();
+        Callback<TargetState> onSecondStateChange = mockCallback();
+        Callback<TargetState> onThirdStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
+
+        workerConnector.initialize();
+        assertInitializedSourceMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STARTED, onFirstStateChange);
+        assertRunningMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STOPPED, onSecondStateChange);
+        assertStoppedMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STARTED, onThirdStateChange);
+        assertRunningMetric(workerConnector);
+        workerConnector.shutdown();
+        workerConnector.doShutdown();
+        assertDestroyedMetric(workerConnector);
+
+        verifyInitialize();
+        verify(connector, times(2)).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verify(listener).onResume(CONNECTOR);
+        verify(listener).onStop(CONNECTOR);
+        verify(onFirstStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        verifyNoMoreInteractions(onFirstStateChange);
+        // We swallow failures when transitioning to the STOPPED state
+        verify(onSecondStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
+        verifyNoMoreInteractions(onSecondStateChange);
+        verify(onThirdStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        verifyNoMoreInteractions(onThirdStateChange);
+        verifyShutdown(2, true, true);
+        verifyNoMoreInteractions(listener);
+    }
+
     @Test
     public void testShutdownFailure() {
         RuntimeException exception = new RuntimeException();
@@ -327,7 +426,7 @@ public class WorkerConnectorTest {
         assertRunningMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(connector).start(CONFIG);
@@ -356,7 +455,7 @@ public class WorkerConnectorTest {
         assertPausedMetric(workerConnector);
         workerConnector.shutdown();
         workerConnector.doShutdown();
-        assertStoppedMetric(workerConnector);
+        assertDestroyedMetric(workerConnector);
 
         verifyInitialize();
         verify(connector).start(CONFIG);
@@ -370,6 +469,38 @@ public class WorkerConnectorTest {
         verifyNoMoreInteractions(onStateChange);
     }
 
+    @Test
+    public void testTransitionStoppedToStopped() {
+        connector = sourceConnector;
+        when(connector.version()).thenReturn(VERSION);
+
+        Callback<TargetState> onStateChange = mockCallback();
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, connectorConfig, ctx, metrics, listener, offsetStorageReader, offsetStore, classLoader);
+
+        workerConnector.initialize();
+        assertInitializedSourceMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
+        assertRunningMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+        assertStoppedMetric(workerConnector);
+        workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+        assertStoppedMetric(workerConnector);
+        workerConnector.shutdown();
+        workerConnector.doShutdown();
+        assertDestroyedMetric(workerConnector);
+
+        verifyInitialize();
+        verify(connector).start(CONFIG);
+        verify(listener).onStartup(CONNECTOR);
+        verify(listener).onStop(CONNECTOR);
+        verifyCleanShutdown(true);
+
+        InOrder inOrder = inOrder(onStateChange);
+        inOrder.verify(onStateChange).onCompletion(isNull(), eq(TargetState.STARTED));
+        inOrder.verify(onStateChange, times(2)).onCompletion(isNull(), eq(TargetState.STOPPED));
+        verifyNoMoreInteractions(onStateChange);
+    }
+
     @Test
     public void testFailConnectorThatIsNeitherSourceNorSink() {
         connector = mock(Connector.class);
@@ -389,13 +520,23 @@ public class WorkerConnectorTest {
     protected void assertFailedMetric(WorkerConnector workerConnector) {
         assertFalse(workerConnector.metrics().isUnassigned());
         assertTrue(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isStopped());
+        assertFalse(workerConnector.metrics().isPaused());
+        assertFalse(workerConnector.metrics().isRunning());
+    }
+
+    protected void assertStoppedMetric(WorkerConnector workerConnector) {
+        assertFalse(workerConnector.metrics().isUnassigned());
+        assertFalse(workerConnector.metrics().isFailed());
         assertFalse(workerConnector.metrics().isPaused());
+        assertTrue(workerConnector.metrics().isStopped());
         assertFalse(workerConnector.metrics().isRunning());
     }
 
     protected void assertPausedMetric(WorkerConnector workerConnector) {
         assertFalse(workerConnector.metrics().isUnassigned());
         assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isStopped());
         assertTrue(workerConnector.metrics().isPaused());
         assertFalse(workerConnector.metrics().isRunning());
     }
@@ -403,13 +544,15 @@ public class WorkerConnectorTest {
     protected void assertRunningMetric(WorkerConnector workerConnector) {
         assertFalse(workerConnector.metrics().isUnassigned());
         assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isStopped());
         assertFalse(workerConnector.metrics().isPaused());
         assertTrue(workerConnector.metrics().isRunning());
     }
 
-    protected void assertStoppedMetric(WorkerConnector workerConnector) {
+    protected void assertDestroyedMetric(WorkerConnector workerConnector) {
         assertTrue(workerConnector.metrics().isUnassigned());
         assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isStopped());
         assertFalse(workerConnector.metrics().isPaused());
         assertFalse(workerConnector.metrics().isRunning());
     }
@@ -425,6 +568,7 @@ public class WorkerConnectorTest {
     protected void assertInitializedMetric(WorkerConnector workerConnector, 
String expectedType) {
         assertTrue(workerConnector.metrics().isUnassigned());
         assertFalse(workerConnector.metrics().isFailed());
+        assertFalse(workerConnector.metrics().isStopped());
         assertFalse(workerConnector.metrics().isPaused());
         assertFalse(workerConnector.metrics().isRunning());
         MetricGroup metricGroup = workerConnector.metrics().metricGroup();
@@ -457,6 +601,10 @@ public class WorkerConnectorTest {
     }
 
     private void verifyShutdown(boolean clean, boolean started) {
+        verifyShutdown(1, clean, started);
+    }
+
+    private void verifyShutdown(int connectorStops, boolean clean, boolean 
started) {
         verify(ctx).close();
         if (connector instanceof SourceConnector) {
             verify(offsetStorageReader).close();
@@ -466,7 +614,7 @@ public class WorkerConnectorTest {
             verify(listener).onShutdown(CONNECTOR);
         }
         if (started) {
-            verify(connector).stop();
+            verify(connector, times(connectorStops)).stop();
         }
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7ad75ae0b1b..33b8f966e2e 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -217,6 +217,17 @@ public class DistributedHerderTest {
             Collections.emptyMap(),
             Collections.emptySet(),
             Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new 
ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STOPPED),
+            Collections.emptyMap(), // Stopped connectors should have an empty 
set of task configs
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = 
new ClusterConfigState(
             1,
             null,
@@ -2161,6 +2172,65 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testConnectorStopped() throws Exception {
+        // ensure that the STOPPED target state is propagated to the worker
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        Capture<Callback<TargetState>> onStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), 
EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), 
capture(onStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        member.wakeup();
+        PowerMock.expectLastCall();
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> 
TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1);
+
+        Capture<Callback<TargetState>> onStop = newCapture();
+        worker.setTargetState(EasyMock.eq(CONN1), 
EasyMock.eq(TargetState.STOPPED), capture(onStop));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onStop.getValue().onCompletion(null, TargetState.STOPPED);
+            return null;
+        });
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // These will occur just before/during the third tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to stopped
+        herder.tick(); // worker should apply the state change
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+
     @Test
     public void testUnknownConnectorPaused() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("member");
@@ -2197,6 +2267,139 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testStopConnector() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join as leader
+        expectRebalance(1, Collections.emptyList(), singletonList(TASK0), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle stop request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        configBackingStore.putTaskConfigs(CONN1, Collections.emptyList());
+        PowerMock.expectLastCall();
+        configBackingStore.putTargetState(CONN1, TargetState.STOPPED);
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+
+        herder.tick(); // join
+        herder.stopConnector(CONN1, cb); // external request
+        herder.tick(); // continue
+
+        assertTrue("Callback should already have been invoked by herder", 
cb.isDone());
+        cb.get(0, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testStopConnectorNotLeader() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join as a non-leader member
+        expectRebalance(1, Collections.emptyList(), singletonList(TASK0));
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle stop request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+
+        herder.tick(); // join
+        herder.stopConnector(CONN1, cb); // external request
+        herder.tick(); // continue
+
+        assertTrue("Callback should already have been invoked by herder", 
cb.isDone());
+        ExecutionException e = assertThrows(
+                "Should not be able to handle request to stop connector when 
not leader",
+                ExecutionException.class,
+                () -> cb.get(0, TimeUnit.SECONDS)
+        );
+        assertTrue(e.getCause() instanceof NotLeaderException);
+
+        PowerMock.verifyAll();
+    }
+
+
+    @Test
+    public void testStopConnectorFailToWriteTaskConfigs() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join as leader
+        expectRebalance(1, Collections.emptyList(), singletonList(TASK0), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        ConnectException taskConfigsWriteException = new 
ConnectException("Could not write task configs to config topic");
+
+        // handle stop request
+        member.wakeup();
+        PowerMock.expectLastCall();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        configBackingStore.putTaskConfigs(CONN1, Collections.emptyList());
+        // We do not expect configBackingStore::putTargetState to be invoked, 
which
+        // is intentional since that call should only take place if we are 
first able to
+        // successfully write the empty list of task configs
+        PowerMock.expectLastCall().andThrow(taskConfigsWriteException);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        FutureCallback<Void> cb = new FutureCallback<>();
+
+        herder.tick(); // join
+        herder.stopConnector(CONN1, cb); // external request
+        herder.tick(); // continue
+
+        assertTrue("Callback should already have been invoked by herder", 
cb.isDone());
+        ExecutionException e = assertThrows(
+                "Should not be able to stop connector when task configs 
cannot be written",
+                ExecutionException.class,
+                () -> cb.get(0, TimeUnit.SECONDS)
+        );
+        assertEquals(taskConfigsWriteException, e.getCause());
+
+        PowerMock.verifyAll();
+    }
+
+
     @Test
     public void testConnectorPausedRunningTaskOnly() throws Exception {
         // even if we don't own the connector, we should still propagate 
target state
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index b76d8f83824..9eb29d5de53 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -86,6 +86,7 @@ import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -733,6 +734,7 @@ public class StandaloneHerderTest {
         expectStop();
 
         statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
+        EasyMock.expectLastCall();
 
         statusBackingStore.stop();
         EasyMock.expectLastCall();
@@ -937,6 +939,50 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTargetStates() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        Map<String, String> connectorConfig = 
connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, connectorConfig);
+
+        // We pause, then stop, the connector
+        expectTargetState(CONNECTOR_NAME, TargetState.PAUSED);
+        expectTargetState(CONNECTOR_NAME, TargetState.STOPPED);
+
+        // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
+        expectStop();
+
+        statusBackingStore.put(new TaskStatus(new 
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID, 
0));
+        EasyMock.expectLastCall();
+
+        statusBackingStore.stop();
+        EasyMock.expectLastCall();
+        worker.stop();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        FutureCallback<Void> stopCallback = new FutureCallback<>();
+        FutureCallback<List<TaskInfo>> taskConfigsCallback = new 
FutureCallback<>();
+
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false, 
createCallback);
+        Herder.Created<ConnectorInfo> connectorInfo = 
createCallback.get(1000L, TimeUnit.SECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+        herder.pauseConnector(CONNECTOR_NAME);
+        herder.stopConnector(CONNECTOR_NAME, stopCallback);
+        stopCallback.get(10L, TimeUnit.SECONDS);
+        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback);
+        assertEquals(Collections.emptyList(), taskConfigsCallback.get(1, 
TimeUnit.SECONDS));
+
+        herder.stop();
+        assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
+
+        PowerMock.verifyAll();
+    }
+
     private void expectAdd(SourceSink sourceSink) {
         Map<String, String> connectorProps = connectorConfig(sourceSink);
         ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
@@ -996,6 +1042,15 @@ public class StandaloneHerderTest {
         PowerMock.expectLastCall().andReturn(sourceSink == 
SourceSink.SINK).anyTimes();
     }
 
+    private void expectTargetState(String connector, TargetState state) {
+        Capture<Callback<TargetState>> stateChangeCallback = 
Capture.newInstance();
+        worker.setTargetState(eq(connector), eq(state), 
capture(stateChangeCallback));
+        EasyMock.expectLastCall().andAnswer(() -> {
+            stateChangeCallback.getValue().onCompletion(null, state);
+            return null;
+        });
+    }
+
     private ConnectorInfo createdInfo(SourceSink sourceSink) {
         return new ConnectorInfo(CONNECTOR_NAME, connectorConfig(sourceSink),
             Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 63fe460685f..f06bdee9041 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -145,8 +146,15 @@ public class KafkaConfigBackingStoreTest {
             new 
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
             new 
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
     );
-    private static final Struct TARGET_STATE_PAUSED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
     private static final Struct TARGET_STATE_STARTED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
+    private static final Struct TARGET_STATE_PAUSED_LEGACY = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0)
+            .put("state", "PAUSED");
+    private static final Struct TARGET_STATE_PAUSED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
+            .put("state", "PAUSED")
+            .put("state.v2", "PAUSED");
+    private static final Struct TARGET_STATE_STOPPED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
+            .put("state", "PAUSED")
+            .put("state.v2", "STOPPED");
 
     private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
             = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -460,7 +468,7 @@ public class KafkaConfigBackingStoreTest {
         expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
 
         // In the meantime, write a target state (which doesn't require write 
privileges)
-        expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, 
TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
+        expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1, 
TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
         storeLog.send("target-state-" + CONNECTOR_IDS.get(1), 
CONFIGS_SERIALIZED.get(1));
         EasyMock.expectLastCall().andReturn(producerFuture);
         producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
@@ -811,14 +819,18 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
                         CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
-                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()));
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), 
Optional.empty()));
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED);
+        // A worker running an older version wrote this target state; make 
sure we can handle it correctly
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TARGET_STATE_PAUSED_LEGACY);
         deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        logOffset = 5;
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
+        logOffset = 6;
 
         expectStart(existingRecords, deserialized);
 
@@ -834,9 +846,10 @@ public class KafkaConfigBackingStoreTest {
 
         // Should see a single connector with initial state paused
         ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(5, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertEquals(6, configState.offset()); // Should always be next to be 
read, even if uncommitted
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
         assertEquals(TargetState.PAUSED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(TargetState.STOPPED, 
configState.targetState(CONNECTOR_IDS.get(1)));
 
         configStorage.stop();
 
@@ -857,18 +870,27 @@ public class KafkaConfigBackingStoreTest {
                         CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
                         CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        LinkedHashMap<byte[], Struct> deserializedOnStartup = new 
LinkedHashMap<>();
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
         logOffset = 5;
 
-        expectStart(existingRecords, deserialized);
+        expectStart(existingRecords, deserializedOnStartup);
+
+        LinkedHashMap<String, byte[]> serializedAfterStartup = new 
LinkedHashMap<>();
+        serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+        serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), 
CONFIGS_SERIALIZED.get(1));
 
-        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), 
TARGET_STATE_PAUSED);
+        Map<String, Struct> deserializedAfterStartup = new HashMap<>();
+        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), 
TARGET_STATE_PAUSED);
+        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), 
TARGET_STATE_STOPPED);
+
+        expectRead(serializedAfterStartup, deserializedAfterStartup);
 
         
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
+        
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
         expectPartitionCount(1);
@@ -879,11 +901,17 @@ public class KafkaConfigBackingStoreTest {
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
         configStorage.start();
 
-        // Should see a single connector with initial state paused
+        // Should see a single connector with initial state started
         ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), 
configStorage.connectorTargetStates.keySet());
         assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
 
+        // Should see two connectors now, one paused and one stopped
         configStorage.refresh(0, TimeUnit.SECONDS);
+        configState = configStorage.snapshot();
+        assertEquals(new HashSet<>(CONNECTOR_IDS), 
configStorage.connectorTargetStates.keySet());
+        assertEquals(TargetState.PAUSED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(TargetState.STOPPED, 
configState.targetState(CONNECTOR_IDS.get(1)));
 
         configStorage.stop();
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 97f8581e9d9..e849476f457 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -391,6 +392,22 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    /**
+     * Stop an existing connector.
+     *
+     * @param connName name of the connector to be stopped
+     * @throws ConnectRestException if the REST API returns error status
+     * @throws ConnectException for any other error.
+     */
+    public void stopConnector(String connName) {
+        String url = endpointForResource(String.format("connectors/%s/stop", 
connName));
+        Response response = requestPut(url, "");
+        if (response.getStatus() >= 
Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new ConnectRestException(response.getStatus(),
+                    "Could not execute PUT request. Error response: " + 
responseToString(response));
+        }
+    }
+
     /**
      * Pause an existing connector.
      *
@@ -554,6 +571,54 @@ public class EmbeddedConnectCluster {
                 "Could not read connector state. Error response: " + 
responseToString(response));
     }
 
+    /**
+     * Get the info of a connector running in this cluster (retrieved via the 
<code>GET /connectors/{connector}</code> endpoint).
+     *
+     * @param connectorName name of the connector
+     * @return an instance of {@link ConnectorInfo} deserialized from the 
body of the REST response
+     */
+    public ConnectorInfo connectorInfo(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s", 
connectorName));
+        Response response = requestGet(url);
+        try {
+            if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
+                return mapper.readValue(responseToString(response), 
ConnectorInfo.class);
+            }
+        } catch (IOException e) {
+            log.error("Could not read connector info from response: {}",
+                    responseToString(response), e);
+            throw new ConnectException("Could not parse connector info", 
e);
+        }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not read connector info. Error response: " + 
responseToString(response));
+    }
+
+    /**
+     * Get the task configs of a connector running in this cluster.
+     *
+     * @param connectorName name of the connector
+     * @return a map from task ID (connector name + "-" + task number) to task 
config
+     */
+    public Map<String, Map<String, String>> taskConfigs(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = 
endpointForResource(String.format("connectors/%s/tasks-config", connectorName));
+        Response response = requestGet(url);
+        try {
+            if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
+                // We use String instead of ConnectorTaskId as the key here 
since the latter can't be automatically
+                // deserialized by Jackson when used as a JSON object key 
(i.e., when it's serialized as a JSON string)
+                return mapper.readValue(responseToString(response), new 
TypeReference<Map<String, Map<String, String>>>() { });
+            }
+        } catch (IOException e) {
+            log.error("Could not read task configs from response: {}",
+                    responseToString(response), e);
+            throw new ConnectException("Could not parse task configs", e);
+        }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not read task configs. Error response: " + 
responseToString(response));
+    }
+
     /**
      * Reset the set of active topics of a connector running in this cluster.
      *
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index c026cb72903..d0518566220 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.core.Response;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
@@ -37,6 +38,7 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
 
 /**
  * A set of common assertions that can be applied to a Connect cluster during 
integration testing
@@ -301,7 +303,8 @@ public class EmbeddedConnectClusterAssertions {
     }
 
     /**
-     * Assert that a connector is running with at least the given number of 
tasks all in running state
+     * Assert that a connector is running, that it has a specific number of 
tasks, and that all of
+     * its tasks are in the RUNNING state.
      *
      * @param connectorName the connector name
      * @param numTasks the number of tasks
@@ -326,6 +329,33 @@ public class EmbeddedConnectClusterAssertions {
         }
     }
 
+    /**
+     * Assert that a connector is paused, that it has a specific number of 
tasks, and that all of
+     * its tasks are in the PAUSED state.
+     *
+     * @param connectorName the connector name
+     * @param numTasks the number of tasks
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndExactlyNumTasksArePaused(String 
connectorName, int numTasks, String detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                    () -> checkConnectorState(
+                            connectorName,
+                            AbstractStatus.State.PAUSED,
+                            numTasks,
+                            AbstractStatus.State.PAUSED,
+                            Integer::equals
+                    ).orElse(false),
+                    CONNECTOR_SHUTDOWN_DURATION_MS,
+                    "The connector or exactly " + numTasks + " tasks are not 
paused.");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
     /**
      * Assert that a connector is running, that it has a specific number of 
tasks, and that all of
      * its tasks are in the FAILED state.
@@ -415,11 +445,11 @@ public class EmbeddedConnectClusterAssertions {
      * @param detailMessage the assertion message
      * @throws InterruptedException
      */
-    public void assertConnectorAndTasksAreStopped(String connectorName, String detailMessage)
+    public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage)
             throws InterruptedException {
         try {
             waitForCondition(
-                () -> checkConnectorAndTasksAreStopped(connectorName),
+                () -> checkConnectorAndTasksAreNotRunning(connectorName),
                 CONNECTOR_SETUP_DURATION_MS,
                 "At least the connector or one of its tasks is still running");
         } catch (AssertionError e) {
@@ -433,7 +463,7 @@ public class EmbeddedConnectClusterAssertions {
      * @param connectorName the connector
      * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
      */
-    protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
+    protected boolean checkConnectorAndTasksAreNotRunning(String connectorName) {
         ConnectorStateInfo info;
         try {
             info = connect.connectorStatus(connectorName);
@@ -450,6 +480,35 @@ public class EmbeddedConnectClusterAssertions {
                 && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
     }
 
+
+    /**
+     * Assert that a connector is in the stopped state and has no tasks.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorIsStopped(String connectorName, String detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                    () -> checkConnectorState(
+                            connectorName,
+                            AbstractStatus.State.STOPPED,
+                            0,
+                            null,
+                            Integer::equals
+                    ).orElse(false),
+                    CONNECTOR_SHUTDOWN_DURATION_MS,
+                    "At least the connector or one of its tasks is still running");
+            // If the connector is truly stopped, we should also see an empty set of tasks and task configs
+            assertEquals(Collections.emptyList(), connect.connectorInfo(connectorName).tasks());
+            assertEquals(Collections.emptyMap(), connect.taskConfigs(connectorName));
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
     /**
      * Check whether the given connector state matches the current state of the connector and
      * whether it has at least the given number of tasks, with all the tasks matching the given
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e519909b1d0..a659240da41 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -303,7 +303,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
         <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
         <Or>
             <Method name="doStart"/>
-            <Method name="pause"/>
         </Or>
         <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
     </Match>

Reply via email to