yashmayya commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1144593645
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java: ########## @@ -245,6 +245,14 @@ default void validateConnectorConfig(Map<String, String> connectorConfig, Callba */ void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb); + /** + * Stop the conector. This call will asynchronously suspend processing by the connector and all + * of its tasks. Review Comment: > This call will asynchronously suspend processing by the connector and all of its tasks. This is the same as the description for `Herder::pauseConnector`. It might be worth briefly outlining the difference between the two (for example, that stopping also shuts down the connector's tasks instead of merely suspending them). ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES)); } + /** + * Verify that the target state (started, paused, stopped) of a connector can be updated, with + * an emphasis on ensuring that the transitions between each state are correct. + * <p> + * The transitions we need to cover are: + * <ol> + * <li>RUNNING -> PAUSED</li> + * <li>RUNNING -> STOPPED</li> + * <li>PAUSED -> RUNNING</li> + * <li>PAUSED -> STOPPED</li> + * <li>STOPPED -> RUNNING</li> + * <li>STOPPED -> PAUSED</li> + * </ol> + * With some reordering, we can perform each transition just once: + * <ul> + * <li>Start with RUNNING</li> + * <li>Transition to STOPPED (2)</li> + * <li>Transition to RUNNING (5)</li> + * <li>Transition to PAUSED (1)</li> + * <li>Transition to STOPPED (4)</li> + * <li>Transition to PAUSED (6)</li> + * <li>Transition to RUNNING (3)</li> + * </ul> Review Comment: Nice!
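The reordering in that Javadoc works because the six transitions between three states can be walked as a single loop that starts and ends at RUNNING. A quick standalone sanity check of the listed order might look like the following; `TransitionCoverage` and its `State` enum are hypothetical helpers (not part of the PR), and the states are named as in the Javadoc rather than after Kafka's own `TargetState` values.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: checks that a state sequence exercises every transition exactly once. */
class TransitionCoverage {
    enum State { RUNNING, PAUSED, STOPPED }

    /** Returns each consecutive pair in the sequence as "FROM->TO". */
    static List<String> transitions(List<State> sequence) {
        List<String> result = new ArrayList<>();
        for (int i = 1; i < sequence.size(); i++) {
            result.add(sequence.get(i - 1) + "->" + sequence.get(i));
        }
        return result;
    }

    /** True if the sequence covers every ordered pair of distinct states, with no repeats or self-loops. */
    static boolean coversEachTransitionOnce(List<State> sequence) {
        List<String> seen = transitions(sequence);
        Set<String> expected = new HashSet<>();
        for (State from : State.values()) {
            for (State to : State.values()) {
                if (from != to) {
                    expected.add(from + "->" + to);
                }
            }
        }
        // Same size plus same contents means no transition was repeated or skipped
        return seen.size() == expected.size() && new HashSet<>(seen).equals(expected);
    }

    public static void main(String[] args) {
        // Order taken from the Javadoc: RUNNING, then (2), (5), (1), (4), (6), (3)
        List<State> order = List.of(
            State.RUNNING, State.STOPPED, State.RUNNING, State.PAUSED,
            State.STOPPED, State.PAUSED, State.RUNNING);
        System.out.println(coversEachTransitionOnce(order)); // prints true
    }
}
```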
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java: ########## @@ -291,9 +293,21 @@ protected boolean awaitUnpause() throws InterruptedException { public void transitionTo(TargetState state) { synchronized (this) { - // ignore the state change if we are stopping - if (stopping) + // Ignore the state change if we are stopping. Review Comment: Is there any particular reason we're trying to handle a case that shouldn't arise here in the `WorkerTask`? I'm just curious; is it to minimize the impact of potential future bugs in implementation? If so, wouldn't this just help mask the hypothetical bug for longer? ########## gradle/spotbugs-exclude.xml: ########## @@ -303,7 +303,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/> <Or> <Method name="doStart"/> - <Method name="pause"/> Review Comment: The comment above seems to indicate that this is for switch statement fallthrough. Your changes refactored it to use if else statements instead, so I guess this can be removed? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ########## @@ -221,28 +223,44 @@ public boolean isRunning() { } @SuppressWarnings("fallthrough") Review Comment: ```suggestion ``` I guess this can be removed now? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ########## @@ -221,28 +223,44 @@ public boolean isRunning() { } @SuppressWarnings("fallthrough") - private void pause() { + private void stop(boolean paused) { + State newState = paused ? 
State.PAUSED : State.STOPPED; try { - switch (state) { - case STOPPED: - return; + if ((state == State.STOPPED || state == State.PAUSED) && state == newState) { + // Already in the desired state + return; + } - case STARTED: - connector.stop(); - // fall through + if (state == State.STARTED) { + connector.stop(); + } - case INIT: - statusListener.onPause(connName); - this.state = State.STOPPED; - break; + if (state == State.FAILED && newState != State.STOPPED) { + throw new IllegalArgumentException("Cannot transition to non-stopped state when connector has already failed"); + } - default: - throw new IllegalArgumentException("Cannot pause connector in state " + state); + if (paused) { + statusListener.onPause(connName); + } else { + statusListener.onStop(connName); } + + this.state = newState; } catch (Throwable t) { - log.error("{} Error while shutting down connector", this, t); - statusListener.onFailure(connName, t); - this.state = State.FAILED; + log.error("{} Error while {} connector", this, paused ? "pausing" : "stopping", t); + if (paused) { + statusListener.onFailure(connName, t); + this.state = State.FAILED; + } else { + // We say the connector is STOPPED even if it fails at this point + this.state = State.STOPPED; + // One more try to make sure the status is updated correctly + try { + statusListener.onStop(connName); + } catch (Throwable t2) { + log.error("{} Error during failover attempt to stop connector", this, t2); + } Review Comment: Can we use a `finally` block instead here? 
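A rough sketch of what a `finally`-based version of `stop(boolean paused)` could look like is below. It uses a simplified, hypothetical stand-in for `WorkerConnector`: the `StopSketch` class, its `Listener` interface, the `connectorStop` runnable (in place of `connector.stop()`) and the constructor are assumptions for illustration only. Since `newState` is always PAUSED or STOPPED, the early-return check also collapses to `state == newState`.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for WorkerConnector, showing only the stop/pause control flow. */
class StopSketch {
    enum State { INIT, STARTED, PAUSED, STOPPED, FAILED }

    /** Minimal subset of the status listener callbacks used by stop(boolean). */
    interface Listener {
        void onPause(String connName);
        void onStop(String connName);
        void onFailure(String connName, Throwable cause);
    }

    private final String connName;
    private final Runnable connectorStop; // stands in for connector.stop()
    private final Listener statusListener;
    State state;

    StopSketch(String connName, State initialState, Runnable connectorStop, Listener statusListener) {
        this.connName = Objects.requireNonNull(connName);
        this.state = Objects.requireNonNull(initialState);
        this.connectorStop = Objects.requireNonNull(connectorStop);
        this.statusListener = Objects.requireNonNull(statusListener);
    }

    void stop(boolean paused) {
        State newState = paused ? State.PAUSED : State.STOPPED;
        // newState is always PAUSED or STOPPED, so this matches the two-part check in the diff
        if (state == newState) {
            return;
        }
        try {
            if (state == State.STARTED) {
                connectorStop.run();
            }
            if (paused) {
                if (state == State.FAILED) {
                    throw new IllegalArgumentException("Cannot transition to non-stopped state when connector has already failed");
                }
                statusListener.onPause(connName);
                state = State.PAUSED;
            }
        } catch (Throwable t) {
            // The real code would log t here in both cases; only a failed pause leaves the connector FAILED
            if (paused) {
                statusListener.onFailure(connName, t);
                state = State.FAILED;
            }
        } finally {
            // A stop request always ends in STOPPED, whether or not connector.stop() threw
            if (!paused) {
                state = State.STOPPED;
                try {
                    statusListener.onStop(connName);
                } catch (Throwable t2) {
                    // The real code would log t2 here; the status update is best-effort
                }
            }
        }
    }
}
```

One behavioral difference worth noting: the diff retries `onStop` once if the first call throws, whereas this sketch calls it exactly once in the `finally` block and swallows a failure. If the retry matters, it could be kept inside that `finally` block.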
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ########## @@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, final Map<String, String> ); } + @Override + public void stopConnector(final String connName, final Callback<Void> callback) { + log.trace("Submitting request to transition connector {} to STOPPED state", connName); + + addRequest( + () -> { + refreshConfigSnapshot(workerSyncTimeoutMs); Review Comment: Why do we need to read to the end of the config topic here? Shouldn't the regular reads in the tick loop be good enough? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ########## @@ -274,6 +274,19 @@ public Response restartConnector(final @PathParam("connector") String connector, return Response.accepted().entity(stateInfo).build(); } + @PUT + @Path("/{connector}/stop") + @Operation(summary = "Stop the specified connector", + description = "This operation is idempotent and has no effects if the connector is already stopped") + public void stopConnector( Review Comment: This will lead to a `204 No Content` response (assuming no exceptions thrown). It might be more appropriate to return a `202 Accepted` response similar to the existing pause and resume APIs? ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java: ########## @@ -450,6 +480,35 @@ protected boolean checkConnectorAndTasksAreStopped(String connectorName) { && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString())); } + + /** + * Assert that a connector is in the stopped state and has no tasks. 
+ * + * @param connectorName the connector name + * @param detailMessage the assertion message + * @throws InterruptedException + */ + public void assertConnectorIsStopped(String connectorName, String detailMessage) Review Comment: nit: `detailMessage` is always `"Connector did not stop in time"`. Do we really need it as a method parameter? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java: ########## @@ -424,6 +435,9 @@ private void updateConnectorTasks(String connName) { if (!worker.isRunning(connName)) { log.info("Skipping update of connector {} since it is not running", connName); return; + } else if (configState.targetState(connName) != TargetState.STARTED) { + log.info("Skipping update of connector {} since its target state is {}", connName, configState.targetState(connName)); Review Comment: ```suggestion log.info("Skipping update of connector tasks for connector {} since its target state is {}", connName, configState.targetState(connName)); ``` nit: `Skipping update of connector` sounds ambiguous ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ########## @@ -553,6 +570,54 @@ public ActiveTopicsInfo connectorTopics(String connectorName) { "Could not read connector state. Error response: " + responseToString(response)); } + /** + * Get the info of a connector running in this cluster (retrieved via the <code>GET /connectors/{connector}</code> endpoint). + + * @param connectorName name of the connector + * @return an instance of {@link ConnectorInfo} populated with state information of the connector and its tasks. 
+ */ + public ConnectorInfo connectorInfo(String connectorName) { + ObjectMapper mapper = new ObjectMapper(); + String url = endpointForResource(String.format("connectors/%s", connectorName)); + Response response = requestGet(url); + try { + if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { + return mapper.readValue(responseToString(response), ConnectorInfo.class); + } + } catch (IOException e) { + log.error("Could not read connector info from response: {}", + responseToString(response), e); + throw new ConnectException("Could not not parse connector info", e); + } + throw new ConnectRestException(response.getStatus(), + "Could not read connector info. Error response: " + responseToString(response)); + } + + /** + * Get the task configs of a connector running in this cluster. + + * @param connectorName name of the connector + * @return a map from task ID (connector name + "-" + task number) to task config + */ + public Map<String, Map<String, String>> taskConfigs(String connectorName) { + ObjectMapper mapper = new ObjectMapper(); + String url = endpointForResource(String.format("connectors/%s/tasks-config", connectorName)); Review Comment: I'm a little confused as to why we have a `GET /connectors/{connector}/tasks` as well as a `GET /connectors/{connector}/tasks-config` API. It looks like the only difference between them is that the former returns "raw" task configurations (i.e. before externalized secrets are replaced using config transformers), in which case the names of the two APIs don't really help distinguish between them much (the same goes for the Herder methods: `Herder::taskConfigs` and `Herder::tasksConfig`).
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES)); } + /** + * Verify that the target state (started, paused, stopped) of a connector can be updated, with + * an emphasis on ensuring that the transitions between each state are correct. + * <p> + * The transitions we need to cover are: + * <ol> + * <li>RUNNING -> PAUSED</li> + * <li>RUNNING -> STOPPED</li> + * <li>PAUSED -> RUNNING</li> + * <li>PAUSED -> STOPPED</li> + * <li>STOPPED -> RUNNING</li> + * <li>STOPPED -> PAUSED</li> + * </ol> + * With some reordering, we can perform each transition just once: + * <ul> + * <li>Start with RUNNING</li> + * <li>Transition to STOPPED (2)</li> + * <li>Transition to RUNNING (5)</li> + * <li>Transition to PAUSED (1)</li> + * <li>Transition to STOPPED (4)</li> + * <li>Transition to PAUSED (6)</li> + * <li>Transition to RUNNING (3)</li> + * </ul> + */ + @Test + public void testPauseStopResume() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + // Want to make sure to use multiple tasks + final int numTasks = 4; + Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + + // Start with RUNNING + connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not start in time" + ); + + // Transition to STOPPED + connect.stopConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + 
connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Transition to RUNNING + connect.resumeConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not resume in time" + ); + + // Transition to PAUSED + connect.pauseConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.pauseConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksArePaused( + CONNECTOR_NAME, + numTasks, + "Connector did not pause in time" + ); + + // Transition to STOPPED + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Transition to PAUSED + connect.pauseConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksArePaused( + CONNECTOR_NAME, + 0, + "Connector did not pause in time" + ); + + // Transition to RUNNING + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not resume in time" + ); + + // Delete the connector + connect.deleteConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndTasksAreNotRunning( Review Comment: `assertConnectorAndTasksAreNotRunning` asserts that the status endpoint returns a 404 response **OR** that the connector and all of its tasks have a non-running status. Should we add a stronger assertion for deletion (perhaps just checking that the status endpoint returns a 404 response)?
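If we go with the stricter check, it could look roughly like the sketch below. `DeletionAssertion.assertConnectorDoesNotExist` and its parameters are hypothetical names, not existing test utilities. The `IntSupplier` stands in for a GET request on the connector's status endpoint, which in the embedded cluster could presumably be built from the `endpointForResource` and `requestGet` helpers that appear in the `EmbeddedConnectCluster` diff above, and the plain polling loop stands in for whatever wait helper the test utilities already use.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper: passes only once the status source reports 404 Not Found. */
class DeletionAssertion {
    static final int NOT_FOUND = 404;

    static void assertConnectorDoesNotExist(IntSupplier statusCode, long timeoutMs, long pollIntervalMs,
                                            String detailMessage) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int lastStatus;
        do {
            // In the real test this would be the HTTP status of GET connectors/{connector}/status
            lastStatus = statusCode.getAsInt();
            if (lastStatus == NOT_FOUND) {
                return;
            }
            Thread.sleep(pollIntervalMs);
        } while (System.currentTimeMillis() < deadline);
        throw new AssertionError(detailMessage + " (last status code: " + lastStatus + ")");
    }
}
```

Unlike `assertConnectorAndTasksAreNotRunning`, this only passes on a 404, so a connector that is still registered but whose tasks happen to be non-running would fail the assertion.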
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES)); } + /** + * Verify that the target state (started, paused, stopped) of a connector can be updated, with + * an emphasis on ensuring that the transitions between each state are correct. + * <p> + * The transitions we need to cover are: + * <ol> + * <li>RUNNING -> PAUSED</li> + * <li>RUNNING -> STOPPED</li> + * <li>PAUSED -> RUNNING</li> + * <li>PAUSED -> STOPPED</li> + * <li>STOPPED -> RUNNING</li> + * <li>STOPPED -> PAUSED</li> + * </ol> + * With some reordering, we can perform each transition just once: + * <ul> + * <li>Start with RUNNING</li> + * <li>Transition to STOPPED (2)</li> + * <li>Transition to RUNNING (5)</li> + * <li>Transition to PAUSED (1)</li> + * <li>Transition to STOPPED (4)</li> + * <li>Transition to PAUSED (6)</li> + * <li>Transition to RUNNING (3)</li> + * </ul> + */ + @Test + public void testPauseStopResume() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + // Want to make sure to use multiple tasks + final int numTasks = 4; + Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); Review Comment: > Want to make sure to use multiple tasks `tasks.max` is already set to 4 in `defaultSourceConnectorProps`, though. Or did you mean that this test should keep using multiple tasks even if we later change the default `tasks.max` for the other tests in this class?
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES)); } + /** + * Verify that the target state (started, paused, stopped) of a connector can be updated, with + * an emphasis on ensuring that the transitions between each state are correct. + * <p> + * The transitions we need to cover are: + * <ol> + * <li>RUNNING -> PAUSED</li> + * <li>RUNNING -> STOPPED</li> + * <li>PAUSED -> RUNNING</li> + * <li>PAUSED -> STOPPED</li> + * <li>STOPPED -> RUNNING</li> + * <li>STOPPED -> PAUSED</li> + * </ol> + * With some reordering, we can perform each transition just once: + * <ul> + * <li>Start with RUNNING</li> + * <li>Transition to STOPPED (2)</li> + * <li>Transition to RUNNING (5)</li> + * <li>Transition to PAUSED (1)</li> + * <li>Transition to STOPPED (4)</li> + * <li>Transition to PAUSED (6)</li> + * <li>Transition to RUNNING (3)</li> + * </ul> + */ + @Test + public void testPauseStopResume() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + // Want to make sure to use multiple tasks + final int numTasks = 4; + Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME); + props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks)); + + // Start with RUNNING + connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not start in time" + ); + + // Transition to STOPPED + connect.stopConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + 
connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Transition to RUNNING + connect.resumeConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not resume in time" + ); + + // Transition to PAUSED + connect.pauseConnector(CONNECTOR_NAME); + // Issue a second request to ensure that this operation is idempotent + connect.pauseConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksArePaused( + CONNECTOR_NAME, + numTasks, + "Connector did not pause in time" + ); + + // Transition to STOPPED + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Transition to PAUSED + connect.pauseConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksArePaused( + CONNECTOR_NAME, + 0, + "Connector did not pause in time" + ); + + // Transition to RUNNING + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + numTasks, + "Connector tasks did not resume in time" + ); + + // Delete the connector + connect.deleteConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndTasksAreNotRunning( + CONNECTOR_NAME, + "Connector tasks were not destroyed in time" + ); + } + + /** + * Test out the {@code STOPPED} state introduced in + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED">KIP-875</a>, + * with an emphasis on correctly handling errors thrown from the connector. 
+ */ + @Test + public void testStoppedState() throws Exception { + connect = connectBuilder.build(); + // start the clusters + connect.start(); + + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + + Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME); + // Fail the connector on startup + props.put("connector.start.inject.error", "true"); + + // Start the connector (should fail immediately and generate no tasks) + connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorIsFailedAndTasksHaveFailed( + CONNECTOR_NAME, + 0, + "Connector should have failed and not generated any tasks" + ); + + // Stopping a failed connector updates its state to STOPPED in the REST API + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Can resume a connector after its Connector has failed before shutdown after receiving a stop request + props.remove("connector.start.inject.error"); + connect.configureConnector(CONNECTOR_NAME, props); + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, + NUM_TASKS, + "Connector or tasks did not start running healthily in time" + ); + + // Fail the connector on shutdown + props.put("connector.stop.inject.error", "true"); + // Stopping a connector that fails during shutdown after receiving a stop request updates its state to STOPPED in the REST API + connect.configureConnector(CONNECTOR_NAME, props); + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped( + CONNECTOR_NAME, + "Connector did not stop in time" + ); + + // Can resume a connector after is Connector has failed during shutdown after receiving a stop request Review Comment: ```suggestion // Can resume a connector after its Connector has failed during shutdown after 
receiving a stop request ``` Did you mean to differentiate between a logical "connector" (Connector + Tasks) and a "Connector" instance?