This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e49a5a265fd KAFKA-14783 (KIP-875): New STOPPED state for connectors
(#13424)
e49a5a265fd is described below
commit e49a5a265fd2d60d197b940b7c2a6867f7b90cb1
Author: Chris Egerton <[email protected]>
AuthorDate: Tue Apr 11 06:37:26 2023 -0700
KAFKA-14783 (KIP-875): New STOPPED state for connectors (#13424)
Reviewers: Mickael Maison <[email protected]>, Yash Mayya
<[email protected]>, Greg Harris <[email protected]>
---
.../kafka/connect/runtime/AbstractHerder.java | 6 +
.../kafka/connect/runtime/AbstractStatus.java | 3 +-
.../kafka/connect/runtime/ConnectorStatus.java | 6 +
.../org/apache/kafka/connect/runtime/Herder.java | 14 ++
.../apache/kafka/connect/runtime/StateTracker.java | 19 +-
.../apache/kafka/connect/runtime/TargetState.java | 6 +-
.../kafka/connect/runtime/WorkerConnector.java | 70 +++++--
.../kafka/connect/runtime/WorkerMetricsGroup.java | 5 +
.../apache/kafka/connect/runtime/WorkerTask.java | 26 ++-
.../runtime/distributed/DistributedHerder.java | 133 +++++++++-----
.../runtime/rest/resources/ConnectorsResource.java | 13 ++
.../runtime/standalone/StandaloneHerder.java | 16 +-
.../connect/storage/KafkaConfigBackingStore.java | 33 +++-
.../integration/ConnectWorkerIntegrationTest.java | 175 ++++++++++++++++++
.../ConnectorTopicsIntegrationTest.java | 4 +-
.../integration/ErrorHandlingIntegrationTest.java | 4 +-
.../integration/MonitorableSourceConnector.java | 3 +
.../RebalanceSourceConnectorsIntegrationTest.java | 2 +-
.../kafka/connect/runtime/WorkerConnectorTest.java | 170 +++++++++++++++--
.../runtime/distributed/DistributedHerderTest.java | 203 +++++++++++++++++++++
.../runtime/standalone/StandaloneHerderTest.java | 55 ++++++
.../storage/KafkaConfigBackingStoreTest.java | 56 ++++--
.../util/clusters/EmbeddedConnectCluster.java | 65 +++++++
.../clusters/EmbeddedConnectClusterAssertions.java | 67 ++++++-
gradle/spotbugs-exclude.xml | 1 -
25 files changed, 1037 insertions(+), 118 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 125b93dab73..84bd0f81108 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -171,6 +171,12 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
workerId, generation()));
}
+ @Override
+ public void onStop(String connector) {
+ statusBackingStore.put(new ConnectorStatus(connector,
AbstractStatus.State.STOPPED,
+ workerId, generation()));
+ }
+
@Override
public void onPause(String connector) {
statusBackingStore.put(new ConnectorStatus(connector,
ConnectorStatus.State.PAUSED,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
index c5e07029a42..76036d610d7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -25,8 +25,9 @@ public abstract class AbstractStatus<T> {
RUNNING,
PAUSED,
FAILED,
- DESTROYED,
+ DESTROYED, // Never visible to users; destroyed Connector and Task
instances are not shown
RESTARTING,
+ STOPPED, // Only ever visible to users for Connector instances; never
for Task instances
}
private final T id;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
index 6da4a7db0fe..10ed188cdf8 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -45,6 +45,12 @@ public class ConnectorStatus extends AbstractStatus<String> {
*/
void onFailure(String connector, Throwable cause);
+ /**
+ * Invoked when the connector is stopped through the REST API
+ * @param connector The connector name
+ */
+ void onStop(String connector);
+
/**
* Invoked when the connector is paused through the REST API
* @param connector The connector name
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 007b0d190c2..099a012be3f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -246,9 +246,23 @@ public interface Herder {
*/
void restartConnectorAndTasks(RestartRequest request,
Callback<ConnectorStateInfo> cb);
+ /**
+ * Stop the connector. This call will asynchronously suspend processing by
the connector and
+ * shut down all of its tasks.
+ * @param connector name of the connector
+ * @param cb callback to invoke upon completion
+ */
+ void stopConnector(String connector, Callback<Void> cb);
+
/**
* Pause the connector. This call will asynchronously suspend processing
by the connector and all
* of its tasks.
+ * <p>
+ * Note that, unlike {@link #stopConnector(String, Callback)}, tasks for
this connector will not
+ * be shut down and none of their resources will be de-allocated. Instead,
they will be left in an
+ * "idling" state where no data is polled from them (if source tasks) or
given to them (if sink tasks),
+ * but all internal state kept by the tasks and their resources is left
intact and ready to begin
+ * processing records again as soon as the connector is {@link
#resumeConnector(String) resumed}.
* @param connector name of the connector
*/
void pauseConnector(String connector);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
index 297d47384c7..23ac3eac32f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/StateTracker.java
@@ -73,6 +73,7 @@ public class StateTracker {
private final long unassignedTotalTimeMs;
private final long runningTotalTimeMs;
private final long pausedTotalTimeMs;
+ private final long stoppedTotalTimeMs;
private final long failedTotalTimeMs;
private final long destroyedTotalTimeMs;
private final long restartingTotalTimeMs;
@@ -81,16 +82,17 @@ public class StateTracker {
* The initial StateChange instance before any state has changed.
*/
StateChange() {
- this(null, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+ this(null, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
}
- StateChange(State state, long startTime, long unassignedTotalTimeMs,
long runningTotalTimeMs,
- long pausedTotalTimeMs, long failedTotalTimeMs,
long destroyedTotalTimeMs, long restartingTotalTimeMs) {
+ StateChange(State state, long startTime, long unassignedTotalTimeMs,
long runningTotalTimeMs, long pausedTotalTimeMs,
+ long stoppedTotalTimeMs, long failedTotalTimeMs,
long destroyedTotalTimeMs, long restartingTotalTimeMs) {
this.state = state;
this.startTime = startTime;
this.unassignedTotalTimeMs = unassignedTotalTimeMs;
this.runningTotalTimeMs = runningTotalTimeMs;
this.pausedTotalTimeMs = pausedTotalTimeMs;
+ this.stoppedTotalTimeMs = stoppedTotalTimeMs;
this.failedTotalTimeMs = failedTotalTimeMs;
this.destroyedTotalTimeMs = destroyedTotalTimeMs;
this.restartingTotalTimeMs = restartingTotalTimeMs;
@@ -106,7 +108,7 @@ public class StateTracker {
*/
public StateChange newState(State state, long now) {
if (this.state == null) {
- return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L, 0L);
+ return new StateChange(state, now, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
}
if (state == this.state) {
return this;
@@ -114,6 +116,7 @@ public class StateTracker {
long unassignedTime = this.unassignedTotalTimeMs;
long runningTime = this.runningTotalTimeMs;
long pausedTime = this.pausedTotalTimeMs;
+ long stoppedTime = this.stoppedTotalTimeMs;
long failedTime = this.failedTotalTimeMs;
long destroyedTime = this.destroyedTotalTimeMs;
long restartingTime = this.restartingTotalTimeMs;
@@ -128,6 +131,9 @@ public class StateTracker {
case PAUSED:
pausedTime += duration;
break;
+ case STOPPED:
+ stoppedTime += duration;
+ break;
case FAILED:
failedTime += duration;
break;
@@ -138,7 +144,7 @@ public class StateTracker {
restartingTime += duration;
break;
}
- return new StateChange(state, now, unassignedTime, runningTime,
pausedTime, failedTime, destroyedTime, restartingTime);
+ return new StateChange(state, now, unassignedTime, runningTime,
pausedTime, stoppedTime, failedTime, destroyedTime, restartingTime);
}
/**
@@ -164,6 +170,9 @@ public class StateTracker {
case PAUSED:
durationDesired += pausedTotalTimeMs;
break;
+ case STOPPED:
+ durationDesired += stoppedTotalTimeMs;
+ break;
case FAILED:
durationDesired += failedTotalTimeMs;
break;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
index f2486a6a0eb..07f8372e969 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
@@ -22,7 +22,10 @@ package org.apache.kafka.connect.runtime;
* target state is "STARTED." This does not mean it has actually started, just
that
* the Connect framework will attempt to start it after its tasks have been
assigned.
* After the connector has been paused, the target state will change to PAUSED,
- * and all the tasks will stop doing work.
+ * and all the tasks will stop doing work. A target state of STOPPED is
similar to
+ * PAUSED, but is also accompanied by a full shutdown of the connector's tasks,
+ * including deallocation of any Kafka clients, SMTs, and other resources
brought
+ * up for or by that task.
* <p>
* Target states are persisted in the config topic, which is read by all of the
* workers in the group. When a worker sees a new target state for a connector
which
@@ -33,4 +36,5 @@ package org.apache.kafka.connect.runtime;
public enum TargetState {
STARTED,
PAUSED,
+ STOPPED,
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 50bf3e672c2..680cb7a6f29 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -57,7 +57,8 @@ public class WorkerConnector implements Runnable {
private enum State {
INIT, // initial state before startup
- STOPPED, // the connector has been stopped/paused.
+ PAUSED, // The connector has been paused.
+ STOPPED, // the connector has been stopped.
STARTED, // the connector has been started/resumed.
FAILED, // the connector has failed (no further transitions are
possible after this state)
}
@@ -186,6 +187,7 @@ public class WorkerConnector implements Runnable {
return false;
case INIT:
+ case PAUSED:
case STOPPED:
connector.start(config);
this.state = State.STARTED;
@@ -220,29 +222,40 @@ public class WorkerConnector implements Runnable {
return state == State.STARTED;
}
- @SuppressWarnings("fallthrough")
- private void pause() {
+ private void suspend(boolean paused) {
+ State newState = paused ? State.PAUSED : State.STOPPED;
try {
- switch (state) {
- case STOPPED:
- return;
+ if (state == newState) {
+ // Already in the desired state
+ return;
+ }
- case STARTED:
- connector.stop();
- // fall through
+ if (state == State.STARTED) {
+ connector.stop();
+ }
- case INIT:
- statusListener.onPause(connName);
- this.state = State.STOPPED;
- break;
+ if (state == State.FAILED && newState != State.STOPPED) {
+ throw new IllegalArgumentException("Cannot transition to
non-stopped state when connector has already failed");
+ }
- default:
- throw new IllegalArgumentException("Cannot pause connector
in state " + state);
+ if (paused) {
+ statusListener.onPause(connName);
+ } else {
+ statusListener.onStop(connName);
}
+
+ this.state = newState;
} catch (Throwable t) {
- log.error("{} Error while shutting down connector", this, t);
- statusListener.onFailure(connName, t);
- this.state = State.FAILED;
+ log.error("{} Error while {} connector", this, paused ? "pausing"
: "stopping", t);
+ if (paused) {
+ statusListener.onFailure(connName, t);
+ this.state = State.FAILED;
+ } else {
+ // We say the connector is STOPPED even if it fails at this
point
+ this.state = State.STOPPED;
+ // One more try to make sure the status is updated correctly
+ statusListener.onStop(connName);
+ }
}
}
@@ -332,7 +345,8 @@ public class WorkerConnector implements Runnable {
}
void doTransitionTo(TargetState targetState, Callback<TargetState>
stateChangeCallback) {
- if (state == State.FAILED) {
+ // Edge case: we don't do transitions most of the time if we've
already failed, but for the STOPPED state, it's fine
+ if (state == State.FAILED && targetState != TargetState.STOPPED) {
stateChangeCallback.onCompletion(
new ConnectException(this + " Cannot transition connector
to " + targetState + " since it has failed"),
null);
@@ -354,7 +368,9 @@ public class WorkerConnector implements Runnable {
private void doTransitionTo(TargetState targetState) throws Throwable {
log.debug("{} Transition connector to {}", this, targetState);
if (targetState == TargetState.PAUSED) {
- pause();
+ suspend(true);
+ } else if (targetState == TargetState.STOPPED) {
+ suspend(false);
} else if (targetState == TargetState.STARTED) {
if (state == State.INIT)
start();
@@ -448,6 +464,16 @@ public class WorkerConnector implements Runnable {
}
}
+ @Override
+ public void onStop(String connector) {
+ state = AbstractStatus.State.STOPPED;
+ synchronized (this) {
+ if (!cancelled) {
+ delegate.onStop(connector);
+ }
+ }
+ }
+
@Override
public void onPause(String connector) {
state = AbstractStatus.State.PAUSED;
@@ -502,6 +528,10 @@ public class WorkerConnector implements Runnable {
return state == AbstractStatus.State.PAUSED;
}
+ boolean isStopped() {
+ return state == AbstractStatus.State.STOPPED;
+ }
+
boolean isFailed() {
return state == AbstractStatus.State.FAILED;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
index f03bc4f2b79..35f2f634059 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
@@ -128,6 +128,11 @@ class WorkerMetricsGroup {
delegateListener.onStartup(connector);
}
+ @Override
+ public void onStop(final String connector) {
+ delegateListener.onStop(connector);
+ }
+
@Override
public void onPause(final String connector) {
delegateListener.onPause(connector);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index a669f6a040f..ce40814a575 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -169,7 +169,9 @@ abstract class WorkerTask implements Runnable {
}
protected boolean isStopping() {
- return stopping;
+ // The target state should never be STOPPED, but if things go wrong
and it somehow is,
+ // we handle that identically to a request to shut down the task
+ return stopping || targetState == TargetState.STOPPED;
}
protected boolean isCancelled() {
@@ -188,7 +190,7 @@ abstract class WorkerTask implements Runnable {
private void doRun() throws InterruptedException {
try {
synchronized (this) {
- if (stopping)
+ if (isStopping())
return;
if (targetState == TargetState.PAUSED) {
@@ -204,7 +206,7 @@ abstract class WorkerTask implements Runnable {
failed = true;
if (cancelled) {
log.warn("{} After being scheduled for shutdown, the orphan
task threw an uncaught exception. A newer instance of this task might be
already running", this, t);
- } else if (stopping) {
+ } else if (isStopping()) {
log.warn("{} After being scheduled for shutdown, task threw an
uncaught exception.", this, t);
} else {
log.error("{} Task threw an uncaught and unrecoverable
exception. Task is being killed and will not recover until manually restarted",
this, t);
@@ -281,7 +283,7 @@ abstract class WorkerTask implements Runnable {
protected boolean awaitUnpause() throws InterruptedException {
synchronized (this) {
while (targetState == TargetState.PAUSED) {
- if (stopping)
+ if (isStopping())
return false;
this.wait();
}
@@ -291,9 +293,21 @@ abstract class WorkerTask implements Runnable {
public void transitionTo(TargetState state) {
synchronized (this) {
- // ignore the state change if we are stopping
- if (stopping)
+ // Ignore the state change if we are stopping.
+ // This has the consequence that, if we ever transition to the
STOPPED target state (which
+ should never happen since the whole point of that state is that it
comes with a complete
+ // shutdown of all the tasks for the connector), we will never be
able to transition out of it.
+ // Since part of transitioning to the STOPPED state is that we
shut down the task and all of
+ // its resources (Kafka clients, SMTs, etc.), this is a reasonable
way to do things; otherwise,
+ // we'd have to re-instantiate all of those resources to be able
to resume (or even just pause)
+ // the task.
+ if (isStopping()) {
+ log.debug("{} Ignoring request to transition stopped task {}
to state {}", this, id, state);
return;
+ }
+
+ if (targetState == TargetState.STOPPED)
+ log.warn("{} Received unexpected request to transition task {}
to state {}; will shut down in response", this, id, TargetState.STOPPED);
this.targetState = state;
this.notifyAll();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 89d2bbbefed..b5f92386ab0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1094,6 +1094,38 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
);
}
+ @Override
+ public void stopConnector(final String connName, final Callback<Void>
callback) {
+ log.trace("Submitting request to transition connector {} to STOPPED
state", connName);
+
+ addRequest(
+ () -> {
+ if (!configState.contains(connName))
+ throw new NotFoundException("Unknown connector " +
connName);
+
+ // We only allow the leader to handle this request since
it involves writing task configs to the config topic
+ if (!isLeader()) {
+ callback.onCompletion(new NotLeaderException("Only the
leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+ return null;
+ }
+
+ // We write the task configs first since, if we fail
between then and writing the target state, the
+ // cluster is still kept in a healthy state. A RUNNING
connector with zero tasks is acceptable (although,
+ // if the connector is reassigned during the ensuing
rebalance, it is likely that it will immediately generate
+ // a non-empty set of task configs). A STOPPED connector
with a non-empty set of tasks is less acceptable
+ // and likely to confuse users.
+ writeTaskConfigs(connName, Collections.emptyList());
+ configBackingStore.putTargetState(connName,
TargetState.STOPPED);
+ // Force a read of the new target state for the connector
+ refreshConfigSnapshot(workerSyncTimeoutMs);
+
+ callback.onCompletion(null, null);
+ return null;
+ },
+ forwardErrorCallback(callback)
+ );
+ }
+
@Override
public void requestTaskReconfiguration(final String connName) {
log.trace("Submitting connector task reconfiguration request {}",
connName);
@@ -1151,7 +1183,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
else if (!configState.contains(connName))
callback.onCompletion(new NotFoundException("Connector " +
connName + " not found"), null);
else {
- writeToConfigTopicAsLeader(() ->
configBackingStore.putTaskConfigs(connName, configs));
+ writeTaskConfigs(connName, configs);
callback.onCompletion(null, null);
}
return null;
@@ -1963,6 +1995,9 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
if (!worker.isRunning(connName)) {
log.info("Skipping reconfiguration of connector {} since it is
not running", connName);
return;
+ } else if (configState.targetState(connName) !=
TargetState.STARTED) {
+ log.info("Skipping reconfiguration of connector {} since its
target state is {}", connName, configState.targetState(connName));
+ return;
}
Map<String, String> configs =
configState.connectorConfig(connName);
@@ -1975,52 +2010,66 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
final List<Map<String, String>> taskProps =
worker.connectorTaskConfigs(connName, connConfig);
- if (taskConfigsChanged(configState, connName, taskProps)) {
- List<Map<String, String>> rawTaskProps =
reverseTransform(connName, configState, taskProps);
- if (isLeader()) {
- writeToConfigTopicAsLeader(() ->
configBackingStore.putTaskConfigs(connName, rawTaskProps));
- cb.onCompletion(null, null);
- } else if (restClient == null) {
- throw new NotLeaderException("This worker is not able to
communicate with the leader of the cluster, "
- + "which is required for dynamically-reconfiguring
connectors. If running MirrorMaker 2 "
- + "in dedicated mode, consider enabling
inter-worker communication via the "
- + "'dedicated.mode.enable.internal.rest'
property.",
- leaderUrl()
- );
- } else {
- // We cannot forward the request on the same thread
because this reconfiguration can happen as a result of connector
- // addition or removal. If we blocked waiting for the
response from leader, we may be kicked out of the worker group.
- forwardRequestExecutor.submit(() -> {
- try {
- String leaderUrl = leaderUrl();
- if (Utils.isBlank(leaderUrl)) {
- cb.onCompletion(new ConnectException("Request
to leader to " +
- "reconfigure connector tasks failed " +
- "because the URL of the leader's REST
interface is empty!"), null);
- return;
- }
- String reconfigUrl = namespacedUrl(leaderUrl)
- .path("connectors")
- .path(connName)
- .path("tasks")
- .build()
- .toString();
- log.trace("Forwarding task configurations for
connector {} to leader", connName);
- restClient.httpRequest(reconfigUrl, "POST", null,
rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
- cb.onCompletion(null, null);
- log.trace("Request to leader to reconfigure
connector tasks succeeded");
- } catch (ConnectException e) {
- log.error("Request to leader to reconfigure
connector tasks failed", e);
- cb.onCompletion(e, null);
- }
- });
- }
- }
+ publishConnectorTaskConfigs(connName, taskProps, cb);
} catch (Throwable t) {
cb.onCompletion(t, null);
}
}
+ private void publishConnectorTaskConfigs(String connName, List<Map<String,
String>> taskProps, Callback<Void> cb) {
+ if (!taskConfigsChanged(configState, connName, taskProps)) {
+ return;
+ }
+
+ List<Map<String, String>> rawTaskProps = reverseTransform(connName,
configState, taskProps);
+ if (isLeader()) {
+ writeTaskConfigs(connName, rawTaskProps);
+ cb.onCompletion(null, null);
+ } else if (restClient == null) {
+ throw new NotLeaderException("This worker is not able to
communicate with the leader of the cluster, "
+ + "which is required for dynamically-reconfiguring
connectors. If running MirrorMaker 2 "
+ + "in dedicated mode, consider enabling inter-worker
communication via the "
+ + "'dedicated.mode.enable.internal.rest' property.",
+ leaderUrl()
+ );
+ } else {
+ // We cannot forward the request on the same thread because this
reconfiguration can happen as a result of connector
+ // addition or removal. If we blocked waiting for the response
from leader, we may be kicked out of the worker group.
+ forwardRequestExecutor.submit(() -> {
+ try {
+ String leaderUrl = leaderUrl();
+ if (Utils.isBlank(leaderUrl)) {
+ cb.onCompletion(new ConnectException("Request to
leader to " +
+ "reconfigure connector tasks failed " +
+ "because the URL of the leader's REST
interface is empty!"), null);
+ return;
+ }
+ String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+ .path("connectors")
+ .path(connName)
+ .path("tasks")
+ .build()
+ .toString();
+ log.trace("Forwarding task configurations for connector {}
to leader", connName);
+ restClient.httpRequest(reconfigUrl, "POST", null,
rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
+ cb.onCompletion(null, null);
+ } catch (ConnectException e) {
+ log.error("Request to leader to reconfigure connector
tasks failed", e);
+ cb.onCompletion(e, null);
+ }
+ });
+ }
+ }
+
+ private void writeTaskConfigs(String connName, List<Map<String, String>>
taskConfigs) {
+ if (!taskConfigs.isEmpty()) {
+ if (configState.targetState(connName) == TargetState.STOPPED)
+ throw new BadRequestException("Cannot submit non-empty set of
task configs for stopped connector " + connName);
+ }
+
+ writeToConfigTopicAsLeader(() ->
configBackingStore.putTaskConfigs(connName, taskConfigs));
+ }
+
// Invoked by exactly-once worker source tasks after they have
successfully initialized their transactional
// producer to ensure that it is still safe to bring up the task
private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int
initialTaskGen) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 086b98dc653..fd53f5bb1de 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -266,6 +266,19 @@ public class ConnectorsResource implements ConnectResource
{
return Response.accepted().entity(stateInfo).build();
}
+ @PUT
+ @Path("/{connector}/stop")
+ @Operation(summary = "Stop the specified connector",
+ description = "This operation is idempotent and has no effects
if the connector is already stopped")
+ public void stopConnector(
+ @PathParam("connector") String connector,
+ final @Context HttpHeaders headers,
+ final @Parameter(hidden = true) @QueryParam("forward") Boolean
forward) throws Throwable {
+ FutureCallback<Void> cb = new FutureCallback<>();
+ herder.stopConnector(connector, cb);
+ requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector
+ "/stop", "PUT", headers, null, forward);
+ }
+
@PUT
@Path("/{connector}/pause")
@Operation(summary = "Pause the specified connector",
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 3b3c3ced06f..b3c83e3dd78 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -238,6 +238,17 @@ public class StandaloneHerder extends AbstractHerder {
}
}
+ @Override
+ public synchronized void stopConnector(String connName, Callback<Void>
callback) {
+ try {
+ removeConnectorTasks(connName);
+ configBackingStore.putTargetState(connName, TargetState.STOPPED);
+ callback.onCompletion(null, null);
+ } catch (Throwable t) {
+ callback.onCompletion(t, null);
+ }
+ }
+
@Override
public synchronized void requestTaskReconfiguration(String connName) {
if (!worker.connectorNames().contains(connName)) {
@@ -429,7 +440,10 @@ public class StandaloneHerder extends AbstractHerder {
private void updateConnectorTasks(String connName) {
if (!worker.isRunning(connName)) {
- log.info("Skipping update of connector {} since it is not
running", connName);
+ log.info("Skipping update of tasks for connector {} since it is
not running", connName);
+ return;
+ } else if (configState.targetState(connName) != TargetState.STARTED) {
+ log.info("Skipping update of tasks for connector {} since its
target state is {}", connName, configState.targetState(connName));
return;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 9952ef9d66b..36b39bdb58c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -79,6 +79,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
+import static org.apache.kafka.connect.runtime.TargetState.PAUSED;
+import static org.apache.kafka.connect.runtime.TargetState.STOPPED;
import static org.apache.kafka.connect.util.ConnectUtils.className;
/**
@@ -240,6 +242,10 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
.field("state", Schema.STRING_SCHEMA)
.build();
+ public static final Schema TARGET_STATE_V1 = SchemaBuilder.struct()
+ .field("state", Schema.STRING_SCHEMA)
+ .field("state.v2", Schema.OPTIONAL_STRING_SCHEMA)
+ .build();
public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
.field("task-count", Schema.INT32_SCHEMA)
.build();
@@ -633,9 +639,11 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
*/
@Override
public void putTargetState(String connector, TargetState state) {
- Struct connectTargetState = new Struct(TARGET_STATE_V0);
- connectTargetState.put("state", state.name());
- byte[] serializedTargetState = converter.fromConnectData(topic,
TARGET_STATE_V0, connectTargetState);
+ Struct connectTargetState = new Struct(TARGET_STATE_V1);
+ // Older workers don't support the STOPPED state; fall back on PAUSED
+ connectTargetState.put("state", state == STOPPED ? PAUSED.name() :
state.name());
+ connectTargetState.put("state.v2", state.name());
+ byte[] serializedTargetState = converter.fromConnectData(topic,
TARGET_STATE_V1, connectTargetState);
log.debug("Writing target state {} for connector {}", state,
connector);
try {
configLog.send(TARGET_STATE_KEY(connector),
serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -928,11 +936,22 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
return;
}
@SuppressWarnings("unchecked")
- Object targetState = ((Map<String, Object>)
value.value()).get("state");
- if (!(targetState instanceof String)) {
- log.error("Invalid data for target state for connector
'{}': 'state' field should be a String but is {}",
+ Map<String, Object> valueMap = (Map<String, Object>)
value.value();
+ Object targetState = valueMap.get("state.v2");
+ if (targetState != null && !(targetState instanceof String)) {
+ log.error("Invalid data for target state for connector
'{}': 'state.v2' field should be a String but is {}",
connectorName, className(targetState));
- return;
+ // We don't return here; it's still possible that there's
a value we can use in the older state field
+ targetState = null;
+ }
+ if (targetState == null) {
+ // This record may have been written by an older worker;
fall back on the older state field
+ targetState = valueMap.get("state");
+ if (!(targetState instanceof String)) {
+ log.error("Invalid data for target state for connector
'{}': 'state' field should be a String but is {}",
+ connectorName, className(targetState));
+ return;
+ }
}
try {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 5cd794e7c83..65ecaa2b2c8 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -325,6 +325,181 @@ public class ConnectWorkerIntegrationTest {
assertTrue("Connector and all tasks were not stopped in time",
stopCounter.await(1, TimeUnit.MINUTES));
}
+ /**
+ * Verify that the target state (started, paused, stopped) of a connector
can be updated, with
+ * an emphasis on ensuring that the transitions between each state are
correct.
+ * <p>
+ * The transitions we need to cover are:
+ * <ol>
+ * <li>RUNNING -> PAUSED</li>
+ * <li>RUNNING -> STOPPED</li>
+ * <li>PAUSED -> RUNNING</li>
+ * <li>PAUSED -> STOPPED</li>
+ * <li>STOPPED -> RUNNING</li>
+ * <li>STOPPED -> PAUSED</li>
+ * </ol>
+ * With some reordering, we can perform each transition just once:
+ * <ul>
+ * <li>Start with RUNNING</li>
+ * <li>Transition to STOPPED (2)</li>
+ * <li>Transition to RUNNING (5)</li>
+ * <li>Transition to PAUSED (1)</li>
+ * <li>Transition to STOPPED (4)</li>
+ * <li>Transition to PAUSED (6)</li>
+ * <li>Transition to RUNNING (3)</li>
+ * </ul>
+ */
+ @Test
+ public void testPauseStopResume() throws Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
+ // Want to make sure to use multiple tasks
+ final int numTasks = 4;
+ Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+ props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+
+ // Start with RUNNING
+ connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ numTasks,
+ "Connector tasks did not start in time"
+ );
+
+ // Transition to STOPPED
+ connect.stopConnector(CONNECTOR_NAME);
+ // Issue a second request to ensure that this operation is idempotent
+ connect.stopConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector did not stop in time"
+ );
+
+ // Transition to RUNNING
+ connect.resumeConnector(CONNECTOR_NAME);
+ // Issue a second request to ensure that this operation is idempotent
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ numTasks,
+ "Connector tasks did not resume in time"
+ );
+
+ // Transition to PAUSED
+ connect.pauseConnector(CONNECTOR_NAME);
+ // Issue a second request to ensure that this operation is idempotent
+ connect.pauseConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+ CONNECTOR_NAME,
+ numTasks,
+ "Connector did not pause in time"
+ );
+
+ // Transition to STOPPED
+ connect.stopConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector did not stop in time"
+ );
+
+ // Transition to PAUSED
+ connect.pauseConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+ CONNECTOR_NAME,
+ 0,
+ "Connector did not pause in time"
+ );
+
+ // Transition to RUNNING
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ numTasks,
+ "Connector tasks did not resume in time"
+ );
+
+ // Delete the connector
+ connect.deleteConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndTasksAreNotRunning(
+ CONNECTOR_NAME,
+ "Connector tasks were not destroyed in time"
+ );
+ }
+
+ /**
+ * Test out the {@code STOPPED} state introduced in
+ * <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED">KIP-875</a>,
+ * with an emphasis on correctly handling errors thrown from the connector.
+ */
+ @Test
+ public void testStoppedState() throws Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
+ Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+ // Fail the connector on startup
+ props.put("connector.start.inject.error", "true");
+
+ // Start the connector (should fail immediately and generate no tasks)
+ connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+ CONNECTOR_NAME,
+ 0,
+ "Connector should have failed and not generated any tasks"
+ );
+
+ // Stopping a failed connector updates its state to STOPPED in the
REST API
+ connect.stopConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector did not stop in time"
+ );
+
+ // Can resume a connector after its Connector has failed before
shutdown after receiving a stop request
+ props.remove("connector.start.inject.error");
+ connect.configureConnector(CONNECTOR_NAME, props);
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ NUM_TASKS,
+ "Connector or tasks did not start running healthily in time"
+ );
+
+ // Fail the connector on shutdown
+ props.put("connector.stop.inject.error", "true");
+ // Stopping a connector that fails during shutdown after receiving a
stop request updates its state to STOPPED in the REST API
+ connect.configureConnector(CONNECTOR_NAME, props);
+ connect.stopConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorIsStopped(
+ CONNECTOR_NAME,
+ "Connector did not stop in time"
+ );
+
+ // Can resume a connector after its Connector has failed during
shutdown after receiving a stop request
+ connect.resumeConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ NUM_TASKS,
+ "Connector or tasks did not start running healthily in time"
+ );
+
+ // Can delete a stopped connector
+ connect.deleteConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndTasksAreNotRunning(
+ CONNECTOR_NAME,
+ "Connector and all of its tasks should no longer be running"
+ );
+ }
+
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index 8c4e1566931..a8b812f8c31 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -150,7 +150,7 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(BAR_CONNECTOR);
- connect.assertions().assertConnectorAndTasksAreStopped(BAR_CONNECTOR,
+
connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
"Connector tasks did not stop in time.");
connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR,
Collections.emptyList(),
@@ -205,7 +205,7 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(FOO_CONNECTOR);
- connect.assertions().assertConnectorAndTasksAreStopped(FOO_CONNECTOR,
+
connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
"Connector tasks did not stop in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR,
Collections.emptyList(),
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 5bc5fcdbd25..7fc03852363 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -178,7 +178,7 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+
connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
"Connector tasks did not stop in time.");
}
@@ -247,7 +247,7 @@ public class ErrorHandlingIntegrationTest {
ConsumerRecords<byte[], byte[]> messages =
connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS,
DLQ_TOPIC);
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+
connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
"Connector tasks did not stop in time.");
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 1bbff23e953..cf7a1f408f1 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -104,6 +104,9 @@ public class MonitorableSourceConnector extends
SampleSourceConnector {
public void stop() {
log.info("Stopped {} connector {}", this.getClass().getSimpleName(),
connectorName);
connectorHandle.recordConnectorStop();
+ if
(Boolean.parseBoolean(commonConfigs.getOrDefault("connector.stop.inject.error",
"false"))) {
+ throw new RuntimeException("Injecting errors during connector
stop");
+ }
}
@Override
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index ebc53edee2d..04e12ea41e0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -206,7 +206,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
// delete connector
connect.deleteConnector(CONNECTOR_NAME + 3);
- connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME
+ 3,
+
connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
"Connector tasks did not stop in time.");
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
index e716efc091d..cf07d5d2e29 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -103,7 +103,7 @@ public class WorkerConnectorTest {
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
@@ -127,7 +127,7 @@ public class WorkerConnectorTest {
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onFailure(CONNECTOR, exception);
@@ -153,7 +153,7 @@ public class WorkerConnectorTest {
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -181,7 +181,7 @@ public class WorkerConnectorTest {
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -195,6 +195,37 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
+ @Test
+ public void testStartupAndStop() {
+ connector = sinkConnector;
+ when(connector.version()).thenReturn(VERSION);
+
+ Callback<TargetState> onStateChange = mockCallback();
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+
+ workerConnector.initialize();
+ assertInitializedSinkMetric(workerConnector);
+
+ workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
+ assertRunningMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+ assertStoppedMetric(workerConnector);
+ workerConnector.shutdown();
+ workerConnector.doShutdown();
+ assertDestroyedMetric(workerConnector);
+
+ verifyInitialize();
+ verify(connector).start(CONFIG);
+ verify(listener).onStartup(CONNECTOR);
+ verify(listener).onStop(CONNECTOR);
+ verifyCleanShutdown(true);
+
+ InOrder inOrder = inOrder(onStateChange);
+ inOrder.verify(onStateChange).onCompletion(isNull(),
eq(TargetState.STARTED));
+ inOrder.verify(onStateChange).onCompletion(isNull(),
eq(TargetState.STOPPED));
+ verifyNoMoreInteractions(onStateChange);
+ }
+
@Test
public void testOnResume() {
connector = sourceConnector;
@@ -213,7 +244,7 @@ public class WorkerConnectorTest {
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(listener).onPause(CONNECTOR);
@@ -241,7 +272,7 @@ public class WorkerConnectorTest {
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
// connector never gets started
@@ -252,6 +283,31 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
+ @Test
+ public void testStartupStopped() {
+ connector = sinkConnector;
+ when(connector.version()).thenReturn(VERSION);
+
+ Callback<TargetState> onStateChange = mockCallback();
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, null, null, classLoader);
+
+ workerConnector.initialize();
+ assertInitializedSinkMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+ assertStoppedMetric(workerConnector);
+ workerConnector.shutdown();
+ workerConnector.doShutdown();
+ assertDestroyedMetric(workerConnector);
+
+ verifyInitialize();
+ // connector never gets started
+ verify(listener).onStop(CONNECTOR);
+ verifyCleanShutdown(false);
+
+ verify(onStateChange).onCompletion(isNull(), eq(TargetState.STOPPED));
+ verifyNoMoreInteractions(onStateChange);
+ }
+
@Test
public void testStartupFailure() {
RuntimeException exception = new RuntimeException();
@@ -269,7 +325,7 @@ public class WorkerConnectorTest {
assertFailedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -280,6 +336,49 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
+ @Test
+ public void testStopFailure() {
+ RuntimeException exception = new RuntimeException();
+ connector = sourceConnector;
+
+ when(connector.version()).thenReturn(VERSION);
+
+ // Fail during the first call to stop, then succeed for the next
attempt
+ doThrow(exception).doNothing().when(connector).stop();
+
+ Callback<TargetState> onFirstStateChange = mockCallback();
+ Callback<TargetState> onSecondStateChange = mockCallback();
+ Callback<TargetState> onThirdStateChange = mockCallback();
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
+
+ workerConnector.initialize();
+ assertInitializedSourceMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STARTED,
onFirstStateChange);
+ assertRunningMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STOPPED,
onSecondStateChange);
+ assertStoppedMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STARTED,
onThirdStateChange);
+ assertRunningMetric(workerConnector);
+ workerConnector.shutdown();
+ workerConnector.doShutdown();
+ assertDestroyedMetric(workerConnector);
+
+ verifyInitialize();
+ verify(connector, times(2)).start(CONFIG);
+ verify(listener).onStartup(CONNECTOR);
+ verify(listener).onResume(CONNECTOR);
+ verify(listener).onStop(CONNECTOR);
+ verify(onFirstStateChange).onCompletion(isNull(),
eq(TargetState.STARTED));
+ verifyNoMoreInteractions(onFirstStateChange);
+ // We swallow failures when transitioning to the STOPPED state
+ verify(onSecondStateChange).onCompletion(isNull(),
eq(TargetState.STOPPED));
+ verifyNoMoreInteractions(onSecondStateChange);
+ verify(onThirdStateChange).onCompletion(isNull(),
eq(TargetState.STARTED));
+ verifyNoMoreInteractions(onThirdStateChange);
+ verifyShutdown(2, true, true);
+ verifyNoMoreInteractions(listener);
+ }
+
@Test
public void testShutdownFailure() {
RuntimeException exception = new RuntimeException();
@@ -327,7 +426,7 @@ public class WorkerConnectorTest {
assertRunningMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -356,7 +455,7 @@ public class WorkerConnectorTest {
assertPausedMetric(workerConnector);
workerConnector.shutdown();
workerConnector.doShutdown();
- assertStoppedMetric(workerConnector);
+ assertDestroyedMetric(workerConnector);
verifyInitialize();
verify(connector).start(CONFIG);
@@ -370,6 +469,38 @@ public class WorkerConnectorTest {
verifyNoMoreInteractions(onStateChange);
}
+ @Test
+ public void testTransitionStoppedToStopped() {
+ connector = sourceConnector;
+ when(connector.version()).thenReturn(VERSION);
+
+ Callback<TargetState> onStateChange = mockCallback();
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR,
connector, connectorConfig, ctx, metrics, listener, offsetStorageReader,
offsetStore, classLoader);
+
+ workerConnector.initialize();
+ assertInitializedSourceMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STARTED, onStateChange);
+ assertRunningMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+ assertStoppedMetric(workerConnector);
+ workerConnector.doTransitionTo(TargetState.STOPPED, onStateChange);
+ assertStoppedMetric(workerConnector);
+ workerConnector.shutdown();
+ workerConnector.doShutdown();
+ assertDestroyedMetric(workerConnector);
+
+ verifyInitialize();
+ verify(connector).start(CONFIG);
+ verify(listener).onStartup(CONNECTOR);
+ verify(listener).onStop(CONNECTOR);
+ verifyCleanShutdown(true);
+
+ InOrder inOrder = inOrder(onStateChange);
+ inOrder.verify(onStateChange).onCompletion(isNull(),
eq(TargetState.STARTED));
+ inOrder.verify(onStateChange, times(2)).onCompletion(isNull(),
eq(TargetState.STOPPED));
+ verifyNoMoreInteractions(onStateChange);
+ }
+
@Test
public void testFailConnectorThatIsNeitherSourceNorSink() {
connector = mock(Connector.class);
@@ -389,13 +520,23 @@ public class WorkerConnectorTest {
protected void assertFailedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertTrue(workerConnector.metrics().isFailed());
+ assertFalse(workerConnector.metrics().isStopped());
+ assertFalse(workerConnector.metrics().isPaused());
+ assertFalse(workerConnector.metrics().isRunning());
+ }
+
+ protected void assertStoppedMetric(WorkerConnector workerConnector) {
+ assertFalse(workerConnector.metrics().isUnassigned());
+ assertFalse(workerConnector.metrics().isFailed());
assertFalse(workerConnector.metrics().isPaused());
+ assertTrue(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isRunning());
}
protected void assertPausedMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
+ assertFalse(workerConnector.metrics().isStopped());
assertTrue(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
@@ -403,13 +544,15 @@ public class WorkerConnectorTest {
protected void assertRunningMetric(WorkerConnector workerConnector) {
assertFalse(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
+ assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertTrue(workerConnector.metrics().isRunning());
}
- protected void assertStoppedMetric(WorkerConnector workerConnector) {
+ protected void assertDestroyedMetric(WorkerConnector workerConnector) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
+ assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
}
@@ -425,6 +568,7 @@ public class WorkerConnectorTest {
protected void assertInitializedMetric(WorkerConnector workerConnector,
String expectedType) {
assertTrue(workerConnector.metrics().isUnassigned());
assertFalse(workerConnector.metrics().isFailed());
+ assertFalse(workerConnector.metrics().isStopped());
assertFalse(workerConnector.metrics().isPaused());
assertFalse(workerConnector.metrics().isRunning());
MetricGroup metricGroup = workerConnector.metrics().metricGroup();
@@ -457,6 +601,10 @@ public class WorkerConnectorTest {
}
private void verifyShutdown(boolean clean, boolean started) {
+ verifyShutdown(1, clean, started);
+ }
+
+ private void verifyShutdown(int connectorStops, boolean clean, boolean
started) {
verify(ctx).close();
if (connector instanceof SourceConnector) {
verify(offsetStorageReader).close();
@@ -466,7 +614,7 @@ public class WorkerConnectorTest {
verify(listener).onShutdown(CONNECTOR);
}
if (started) {
- verify(connector).stop();
+ verify(connector, times(connectorStops)).stop();
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 7ad75ae0b1b..33b8f966e2e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -217,6 +217,17 @@ public class DistributedHerderTest {
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
+ private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new
ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 3),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
+ Collections.singletonMap(CONN1, TargetState.STOPPED),
+ Collections.emptyMap(), // Stopped connectors should have an empty
set of task configs
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG =
new ClusterConfigState(
1,
null,
@@ -2161,6 +2172,65 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testConnectorStopped() throws Exception {
+ // ensure that target state changes are propagated to the worker
+
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+ // join
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+ Capture<Callback<TargetState>> onStart = newCapture();
+ worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(),
EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED),
capture(onStart));
+ PowerMock.expectLastCall().andAnswer(() -> {
+ onStart.getValue().onCompletion(null, TargetState.STARTED);
+ return true;
+ });
+ member.wakeup();
+ PowerMock.expectLastCall();
+ expectExecuteTaskReconfiguration(true, conn1SinkConfig, () ->
TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // handle the state change
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+
+
EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_STOPPED_CONN1);
+
+ Capture<Callback<TargetState>> onStop = newCapture();
+ worker.setTargetState(EasyMock.eq(CONN1),
EasyMock.eq(TargetState.STOPPED), capture(onStop));
+ PowerMock.expectLastCall().andAnswer(() -> {
+ onStop.getValue().onCompletion(null, TargetState.STOPPED);
+ return null;
+ });
+
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // These will occur just before/during the third tick
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick(); // join
+ configUpdateListener.onConnectorTargetStateChange(CONN1); // state
changes to stopped
+ herder.tick(); // worker should apply the state change
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testUnknownConnectorPaused() throws Exception {
EasyMock.expect(member.memberId()).andStubReturn("member");
@@ -2197,6 +2267,139 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testStopConnector() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+ // join as leader
+ expectRebalance(1, Collections.emptyList(), singletonList(TASK0),
true);
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ PowerMock.expectLastCall().andReturn(true);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // handle stop request
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+ configBackingStore.putTaskConfigs(CONN1, Collections.emptyList());
+ PowerMock.expectLastCall();
+ configBackingStore.putTargetState(CONN1, TargetState.STOPPED);
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ FutureCallback<Void> cb = new FutureCallback<>();
+
+ herder.tick(); // join
+ herder.stopConnector(CONN1, cb); // external request
+ herder.tick(); // continue
+
+ assertTrue("Callback should already have been invoked by herder",
cb.isDone());
+ cb.get(0, TimeUnit.MILLISECONDS);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testStopConnectorNotLeader() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+ // join as an ordinary member (not the leader)
+ expectRebalance(1, Collections.emptyList(), singletonList(TASK0));
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ PowerMock.expectLastCall().andReturn(true);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // handle stop request
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ FutureCallback<Void> cb = new FutureCallback<>();
+
+ herder.tick(); // join
+ herder.stopConnector(CONN1, cb); // external request
+ herder.tick(); // continue
+
+ assertTrue("Callback should already have been invoked by herder",
cb.isDone());
+ ExecutionException e = assertThrows(
+ "Should not be able to handle request to stop connector when
not leader",
+ ExecutionException.class,
+ () -> cb.get(0, TimeUnit.SECONDS)
+ );
+ assertTrue(e.getCause() instanceof NotLeaderException);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testStopConnectorFailToWriteTaskConfigs() throws Exception {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+ // join as leader
+ expectRebalance(1, Collections.emptyList(), singletonList(TASK0),
true);
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+ worker.startSourceTask(EasyMock.eq(TASK0), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ PowerMock.expectLastCall().andReturn(true);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ ConnectException taskConfigsWriteException = new
ConnectException("Could not write task configs to config topic");
+
+ // handle stop request
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+ configBackingStore.putTaskConfigs(CONN1, Collections.emptyList());
+ // We do not expect configBackingStore::putTargetState to be invoked,
which
+ // is intentional since that call should only take place if we are
first able to
+ // successfully write the empty list of task configs
+ PowerMock.expectLastCall().andThrow(taskConfigsWriteException);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ FutureCallback<Void> cb = new FutureCallback<>();
+
+ herder.tick(); // join
+ herder.stopConnector(CONN1, cb); // external request
+ herder.tick(); // continue
+
+ assertTrue("Callback should already have been invoked by herder",
cb.isDone());
+ ExecutionException e = assertThrows(
+ "Should not be able to stop connector when
writing empty task configs to the config topic fails",
+ ExecutionException.class,
+ () -> cb.get(0, TimeUnit.SECONDS)
+ );
+ assertEquals(taskConfigsWriteException, e.getCause());
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testConnectorPausedRunningTaskOnly() throws Exception {
// even if we don't own the connector, we should still propagate
target state
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index b76d8f83824..9eb29d5de53 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -86,6 +86,7 @@ import static java.util.Collections.singletonMap;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.eq;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -733,6 +734,7 @@ public class StandaloneHerderTest {
expectStop();
statusBackingStore.put(new TaskStatus(new
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID,
0));
+ EasyMock.expectLastCall();
statusBackingStore.stop();
EasyMock.expectLastCall();
@@ -937,6 +939,50 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testTargetStates() throws Exception {
+ connector = PowerMock.createMock(BogusSourceConnector.class);
+ expectAdd(SourceSink.SOURCE);
+
+ Map<String, String> connectorConfig =
connectorConfig(SourceSink.SOURCE);
+ Connector connectorMock = PowerMock.createMock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, connectorConfig);
+
+ // We pause, then stop, the connector
+ expectTargetState(CONNECTOR_NAME, TargetState.PAUSED);
+ expectTargetState(CONNECTOR_NAME, TargetState.STOPPED);
+
+ // herder.stop() should stop any running connectors and tasks even if
destroyConnector was not invoked
+ expectStop();
+
+ statusBackingStore.put(new TaskStatus(new
ConnectorTaskId(CONNECTOR_NAME, 0), AbstractStatus.State.DESTROYED, WORKER_ID,
0));
+ EasyMock.expectLastCall();
+
+ statusBackingStore.stop();
+ EasyMock.expectLastCall();
+ worker.stop();
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ FutureCallback<Void> stopCallback = new FutureCallback<>();
+ FutureCallback<List<TaskInfo>> taskConfigsCallback = new
FutureCallback<>();
+
+ herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig, false,
createCallback);
+ Herder.Created<ConnectorInfo> connectorInfo =
createCallback.get(1000L, TimeUnit.SECONDS);
+ assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+ herder.pauseConnector(CONNECTOR_NAME);
+ herder.stopConnector(CONNECTOR_NAME, stopCallback);
+ stopCallback.get(10L, TimeUnit.SECONDS);
+ herder.taskConfigs(CONNECTOR_NAME, taskConfigsCallback);
+ assertEquals(Collections.emptyList(), taskConfigsCallback.get(1,
TimeUnit.SECONDS));
+
+ herder.stop();
+ assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
+
+ PowerMock.verifyAll();
+ }
+
private void expectAdd(SourceSink sourceSink) {
Map<String, String> connectorProps = connectorConfig(sourceSink);
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
@@ -996,6 +1042,15 @@ public class StandaloneHerderTest {
PowerMock.expectLastCall().andReturn(sourceSink ==
SourceSink.SINK).anyTimes();
}
+ private void expectTargetState(String connector, TargetState state) {
+ Capture<Callback<TargetState>> stateChangeCallback =
Capture.newInstance();
+ worker.setTargetState(eq(connector), eq(state),
capture(stateChangeCallback));
+ EasyMock.expectLastCall().andAnswer(() -> {
+ stateChangeCallback.getValue().onCompletion(null, state);
+ return null;
+ });
+ }
+
private ConnectorInfo createdInfo(SourceSink sourceSink) {
return new ConnectorInfo(CONNECTOR_NAME, connectorConfig(sourceSink),
Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)),
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 63fe460685f..f06bdee9041 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
@@ -145,8 +146,15 @@ public class KafkaConfigBackingStoreTest {
new
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 6),
new
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 9)
);
- private static final Struct TARGET_STATE_PAUSED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
private static final Struct TARGET_STATE_STARTED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "STARTED");
+ private static final Struct TARGET_STATE_PAUSED_LEGACY = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V0)
+ .put("state", "PAUSED");
+ private static final Struct TARGET_STATE_PAUSED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
+ .put("state", "PAUSED")
+ .put("state.v2", "PAUSED");
+ private static final Struct TARGET_STATE_STOPPED = new
Struct(KafkaConfigBackingStore.TARGET_STATE_V1)
+ .put("state", "PAUSED")
+ .put("state.v2", "STOPPED");
private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
= new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -460,7 +468,7 @@ public class KafkaConfigBackingStoreTest {
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(1));
// In the meantime, write a target state (which doesn't require write
privileges)
- expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0,
TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
+ expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1,
TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
storeLog.send("target-state-" + CONNECTOR_IDS.get(1),
CONFIGS_SERIALIZED.get(1));
EasyMock.expectLastCall().andReturn(producerFuture);
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
@@ -811,14 +819,18 @@ public class KafkaConfigBackingStoreTest {
new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
- CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()));
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()));
LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
- deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED);
+ // A worker running an older version wrote this target state; make
sure we can handle it correctly
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TARGET_STATE_PAUSED_LEGACY);
deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
- logOffset = 5;
+ deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
+ logOffset = 6;
expectStart(existingRecords, deserialized);
@@ -834,9 +846,10 @@ public class KafkaConfigBackingStoreTest {
// Should see a single connector with initial state paused
ClusterConfigState configState = configStorage.snapshot();
- assertEquals(5, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(6, configState.offset()); // Should always be next to be
read, even if uncommitted
assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();
@@ -857,18 +870,27 @@ public class KafkaConfigBackingStoreTest {
CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
- LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
- deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
- deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
- deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
- deserialized.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ LinkedHashMap<byte[], Struct> deserializedOnStartup = new
LinkedHashMap<>();
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
logOffset = 5;
- expectStart(existingRecords, deserialized);
+ expectStart(existingRecords, deserializedOnStartup);
+
+ LinkedHashMap<String, byte[]> serializedAfterStartup = new
LinkedHashMap<>();
+ serializedAfterStartup.put(TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ serializedAfterStartup.put(TARGET_STATE_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
- expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0),
TARGET_STATE_PAUSED);
+ Map<String, Struct> deserializedAfterStartup = new HashMap<>();
+ deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0),
TARGET_STATE_PAUSED);
+ deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1),
TARGET_STATE_STOPPED);
+
+ expectRead(serializedAfterStartup, deserializedAfterStartup);
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
+
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
EasyMock.expectLastCall();
expectPartitionCount(1);
@@ -879,11 +901,17 @@ public class KafkaConfigBackingStoreTest {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
- // Should see a single connector with initial state paused
+ // Should see a single connector with initial state started
ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)),
configStorage.connectorTargetStates.keySet());
assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ // Should see two connectors now, one paused and one stopped
configStorage.refresh(0, TimeUnit.SECONDS);
+ configState = configStorage.snapshot();
+ assertEquals(new HashSet<>(CONNECTOR_IDS),
configStorage.connectorTargetStates.keySet());
+ assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(1)));
configStorage.stop();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 97f8581e9d9..e849476f457 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -391,6 +392,22 @@ public class EmbeddedConnectCluster {
}
}
+ /**
+ * Stop an existing connector.
+ *
+ * @param connName name of the connector to be stopped
+ * @throws ConnectRestException if the REST API returns error status
+ * @throws ConnectException for any other error.
+ */
+ public void stopConnector(String connName) {
+ String url = endpointForResource(String.format("connectors/%s/stop",
connName));
+ Response response = requestPut(url, "");
+ if (response.getStatus() >=
Response.Status.BAD_REQUEST.getStatusCode()) {
+ throw new ConnectRestException(response.getStatus(),
+ "Could not execute PUT request. Error response: " +
responseToString(response));
+ }
+ }
+
/**
* Pause an existing connector.
*
@@ -554,6 +571,54 @@ public class EmbeddedConnectCluster {
"Could not read connector state. Error response: " +
responseToString(response));
}
+ /**
+ * Get the info of a connector running in this cluster (retrieved via the
<code>GET /connectors/{connector}</code> endpoint).
+ *
+ * @param connectorName name of the connector
+ * @return an instance of {@link ConnectorInfo} populated with state
information of the connector and its tasks.
+ */
+ public ConnectorInfo connectorInfo(String connectorName) {
+ ObjectMapper mapper = new ObjectMapper();
+ String url = endpointForResource(String.format("connectors/%s",
connectorName));
+ Response response = requestGet(url);
+ try {
+ if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
+ return mapper.readValue(responseToString(response),
ConnectorInfo.class);
+ }
+ } catch (IOException e) {
+ log.error("Could not read connector info from response: {}",
+ responseToString(response), e);
+ throw new ConnectException("Could not parse connector info", e);
+ }
+ throw new ConnectRestException(response.getStatus(),
+ "Could not read connector info. Error response: " +
responseToString(response));
+ }
+
+ /**
+ * Get the task configs of a connector running in this cluster.
+ *
+ * @param connectorName name of the connector
+ * @return a map from task ID (connector name + "-" + task number) to task
config
+ */
+ public Map<String, Map<String, String>> taskConfigs(String connectorName) {
+ ObjectMapper mapper = new ObjectMapper();
+ String url =
endpointForResource(String.format("connectors/%s/tasks-config", connectorName));
+ Response response = requestGet(url);
+ try {
+ if (response.getStatus() <
Response.Status.BAD_REQUEST.getStatusCode()) {
+ // We use String instead of ConnectorTaskId as the key here
since the latter can't be automatically
+ // deserialized by Jackson when used as a JSON object key
(i.e., when it's serialized as a JSON string)
+ return mapper.readValue(responseToString(response), new
TypeReference<Map<String, Map<String, String>>>() { });
+ }
+ } catch (IOException e) {
+ log.error("Could not read task configs from response: {}",
+ responseToString(response), e);
+ throw new ConnectException("Could not parse task configs", e);
+ }
+ throw new ConnectRestException(response.getStatus(),
+ "Could not read task configs. Error response: " +
responseToString(response));
+ }
+
/**
* Reset the set of active topics of a connector running in this cluster.
*
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index c026cb72903..d0518566220 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@@ -37,6 +38,7 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
/**
* A set of common assertions that can be applied to a Connect cluster during
integration testing
@@ -301,7 +303,8 @@ public class EmbeddedConnectClusterAssertions {
}
/**
- * Assert that a connector is running with at least the given number of
tasks all in running state
+ * Assert that a connector is running, that it has a specific number of
tasks, and that all of
+ * its tasks are in the RUNNING state.
*
* @param connectorName the connector name
* @param numTasks the number of tasks
@@ -326,6 +329,33 @@ public class EmbeddedConnectClusterAssertions {
}
}
+ /**
+ * Assert that a connector is paused, that it has a specific number of
tasks, and that all of
+ * its tasks are in the PAUSED state.
+ *
+ * @param connectorName the connector name
+ * @param numTasks the number of tasks
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+ public void assertConnectorAndExactlyNumTasksArePaused(String
connectorName, int numTasks, String detailMessage)
+ throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkConnectorState(
+ connectorName,
+ AbstractStatus.State.PAUSED,
+ numTasks,
+ AbstractStatus.State.PAUSED,
+ Integer::equals
+ ).orElse(false),
+ CONNECTOR_SHUTDOWN_DURATION_MS,
+ "The connector or exactly " + numTasks + " tasks are not
paused.");
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
/**
* Assert that a connector is running, that it has a specific number of
tasks, and that all of
* its tasks are in the FAILED state.
@@ -415,11 +445,11 @@ public class EmbeddedConnectClusterAssertions {
* @param detailMessage the assertion message
* @throws InterruptedException
*/
- public void assertConnectorAndTasksAreStopped(String connectorName, String
detailMessage)
+ public void assertConnectorAndTasksAreNotRunning(String connectorName,
String detailMessage)
throws InterruptedException {
try {
waitForCondition(
- () -> checkConnectorAndTasksAreStopped(connectorName),
+ () -> checkConnectorAndTasksAreNotRunning(connectorName),
CONNECTOR_SETUP_DURATION_MS,
"At least the connector or one of its tasks is still running");
} catch (AssertionError e) {
@@ -433,7 +463,7 @@ public class EmbeddedConnectClusterAssertions {
* @param connectorName the connector
* @return true if the connector and all the tasks are not in RUNNING
state; false otherwise
*/
- protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
+ protected boolean checkConnectorAndTasksAreNotRunning(String
connectorName) {
ConnectorStateInfo info;
try {
info = connect.connectorStatus(connectorName);
@@ -450,6 +480,35 @@ public class EmbeddedConnectClusterAssertions {
&& info.tasks().stream().noneMatch(s ->
s.state().equals(AbstractStatus.State.RUNNING.toString()));
}
+
+ /**
+ * Assert that a connector is in the stopped state and has no tasks.
+ *
+ * @param connectorName the connector name
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+ public void assertConnectorIsStopped(String connectorName, String
detailMessage)
+ throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkConnectorState(
+ connectorName,
+ AbstractStatus.State.STOPPED,
+ 0,
+ null,
+ Integer::equals
+ ).orElse(false),
+ CONNECTOR_SHUTDOWN_DURATION_MS,
+ "The connector is not in the STOPPED state or still has tasks");
+ // If the connector is truly stopped, we should also see an empty
set of tasks and task configs
+ assertEquals(Collections.emptyList(),
connect.connectorInfo(connectorName).tasks());
+ assertEquals(Collections.emptyMap(),
connect.taskConfigs(connectorName));
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
/**
* Check whether the given connector state matches the current state of
the connector and
* whether it has at least the given number of tasks, with all the tasks
matching the given
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e519909b1d0..a659240da41 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -303,7 +303,6 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
<Or>
<Method name="doStart"/>
- <Method name="pause"/>
</Or>
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>