C0urante commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1146277661


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -274,6 +274,19 @@ public Response restartConnector(final @PathParam("connector") String connector,
         return Response.accepted().entity(stateInfo).build();
     }
 
+    @PUT
+    @Path("/{connector}/stop")
+    @Operation(summary = "Stop the specified connector",
+               description = "This operation is idempotent and has no effects if the connector is already stopped")
+    public void stopConnector(

Review Comment:
   Good eye. The difference between pause/resume and stop is that the latter 
may require forwarding to the leader, which (right now) breaks for requests 
that 1) don't return a body and 2) return a 2xx HTTP status that isn't 204.
   
   This is also why requests to, e.g., delete a connector return 204 instead of 
202.
   
   It wouldn't be terribly difficult to fix the `RestClient` class to be able 
to handle empty-body 202 responses correctly, but since I doubt the 
inconsistency between the pause/resume and stop endpoints is going to cause 
serious headaches, I figured we could go with the simpler approach.
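
To illustrate the failure mode described above, here is a minimal sketch. It is not the actual `RestClient` code: the class `EmptyBodyForwardingSketch` and both method names are hypothetical, and the body is treated as a plain string rather than parsed JSON. It assumes a client that treats only 204 as "no content" and expects a body for every other 2xx status, next to a lenient variant that tolerates an empty body.

```java
import java.util.Optional;

// Hypothetical sketch; names and structure are assumptions, not Kafka Connect code.
class EmptyBodyForwardingSketch {

    // Strict handling: only 204 is allowed to have no body.
    // Any other 2xx status with an empty body is treated as an error.
    static Optional<String> parseStrict(int status, String body) {
        if (status == 204) {
            return Optional.empty();
        }
        if (status >= 200 && status < 300) {
            if (body == null || body.isEmpty()) {
                throw new IllegalStateException("Expected a response body for status " + status);
            }
            return Optional.of(body);
        }
        throw new IllegalStateException("Unexpected status " + status);
    }

    // Lenient handling: any 2xx status may have an empty body.
    static Optional<String> parseLenient(int status, String body) {
        if (status >= 200 && status < 300) {
            if (body == null || body.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(body);
        }
        throw new IllegalStateException("Unexpected status " + status);
    }

    public static void main(String[] args) {
        // An empty-body 204 is fine under both variants.
        System.out.println(parseStrict(204, "").isPresent());
        // An empty-body 202 only succeeds under the lenient variant.
        System.out.println(parseLenient(202, "").isPresent());
        try {
            parseStrict(202, "");
        } catch (IllegalStateException e) {
            System.out.println("strict variant rejected empty-body 202");
        }
    }
}
```

Under these assumptions, returning 204 from the stop endpoint sidesteps the strict path entirely, which is the simpler approach mentioned above.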



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -325,6 +325,181 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         assertTrue("Connector and all tasks were not stopped in time", stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector can be updated, with
+     * an emphasis on ensuring that the transitions between each state are correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>
+     */
+    @Test
+    public void testPauseStopResume() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // Want to make sure to use multiple tasks
+        final int numTasks = 4;
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+
+        // Start with RUNNING
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not start in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector did not pause in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                0,
+                "Connector did not pause in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Delete the connector
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreNotRunning(

Review Comment:
   I noticed that too while working on this part of the tests. It does seem 
like a stronger assertion would be useful, but I'd like to pursue that as a 
follow-up since it would probably benefit more than just this test, and we may 
be able to completely get rid of the `assertConnectorAndTasksAreNotRunning` 
method if we can determine that the stronger assertion is always suitable in 
its place.
   
   If that makes sense to you, I can write up a Jira ticket to look into this. 
And (not mutually exclusive), if you think it's better to add the stronger 
assertion variant now, even just to use it in the tests added in this PR and 
nowhere else, I can also do that.
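
As a side note on the Javadoc for `testPauseStopResume`: the claim that the chosen ordering performs each of the six transitions exactly once can be checked mechanically. The sketch below is standalone and hypothetical (the class `TransitionCoverageSketch` and its `State` enum are illustrative only and are not part of the Connect runtime or test utilities).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not part of the Kafka Connect code base.
class TransitionCoverageSketch {

    enum State { RUNNING, PAUSED, STOPPED }

    // Every ordered pair of distinct states, e.g. "RUNNING->PAUSED".
    static Set<String> allTransitions() {
        Set<String> result = new LinkedHashSet<>();
        for (State from : State.values()) {
            for (State to : State.values()) {
                if (from != to) {
                    result.add(from + "->" + to);
                }
            }
        }
        return result;
    }

    // The transitions performed when visiting the given states in order.
    static List<String> transitionsIn(List<State> path) {
        List<String> result = new ArrayList<>();
        for (int i = 1; i < path.size(); i++) {
            result.add(path.get(i - 1) + "->" + path.get(i));
        }
        return result;
    }

    // True if the path performs every possible transition, each exactly once.
    static boolean coversEachExactlyOnce(List<State> path) {
        List<String> visited = transitionsIn(path);
        Set<String> expected = allTransitions();
        return visited.size() == expected.size()
                && new HashSet<>(visited).equals(expected);
    }

    public static void main(String[] args) {
        // The ordering listed in the test's Javadoc.
        List<State> testOrder = Arrays.asList(
                State.RUNNING, State.STOPPED, State.RUNNING, State.PAUSED,
                State.STOPPED, State.PAUSED, State.RUNNING);
        System.out.println(coversEachExactlyOnce(testOrder));
    }
}
```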



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
