C0urante commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1146276523


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -221,28 +223,44 @@ public boolean isRunning() {
     }
 
     @SuppressWarnings("fallthrough")
-    private void pause() {
+    private void stop(boolean paused) {
+        State newState = paused ? State.PAUSED : State.STOPPED;
         try {
-            switch (state) {
-                case STOPPED:
-                    return;
+            if ((state == State.STOPPED || state == State.PAUSED) && state == newState) {
+                // Already in the desired state
+                return;
+            }
 
-                case STARTED:
-                    connector.stop();
-                    // fall through
+            if (state == State.STARTED) {
+                connector.stop();
+            }
 
-                case INIT:
-                    statusListener.onPause(connName);
-                    this.state = State.STOPPED;
-                    break;
+            if (state == State.FAILED && newState != State.STOPPED) {
+                throw new IllegalArgumentException("Cannot transition to non-stopped state when connector has already failed");
+            }
 
-                default:
-                    throw new IllegalArgumentException("Cannot pause connector in state " + state);
+            if (paused) {
+                statusListener.onPause(connName);
+            } else {
+                statusListener.onStop(connName);
             }
+
+            this.state = newState;
         } catch (Throwable t) {
-            log.error("{} Error while shutting down connector", this, t);
-            statusListener.onFailure(connName, t);
-            this.state = State.FAILED;
+            log.error("{} Error while {} connector", this, paused ? "pausing" : "stopping", t);
+            if (paused) {
+                statusListener.onFailure(connName, t);
+                this.state = State.FAILED;
+            } else {
+                // We say the connector is STOPPED even if it fails at this point
+                this.state = State.STOPPED;
+                // One more try to make sure the status is updated correctly
+                try {
+                    statusListener.onStop(connName);
+                } catch (Throwable t2) {
+                    log.error("{} Error during failover attempt to stop connector", this, t2);
+                }

Review Comment:
   Not sure I follow. This line logs any exception thrown on line 259; with a
`finally` block we wouldn't have access to that exception (and there may not be
one to log).
   
   Anyway, after thinking about this part a bit more: I don't like this level
of paranoia, and I don't think it's necessary to wrap line 259 in a try/catch
(or try/finally) block at all. If we can't even update the connector's status,
it probably makes sense to just fail, since something is seriously wrong with
the worker at that point. This also follows the
[precedent in the existing `pause` method](https://github.com/apache/kafka/blob/45ecae6a28fe820eb2698c8a375c83ee15036f5c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L244),
where we wrap things in try/catch but don't wrap the call to
`statusListener::onFailure` inside the catch body with its own try/catch.
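
To make the suggestion concrete, here is a rough, self-contained sketch of a catch body without the nested try/catch. It is not the PR's code: `StopSketch`, its `StatusListener` interface, and the `Runnable` standing in for `connector.stop()` are hypothetical simplifications, and the `FAILED`-state check from the diff is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for WorkerConnector; not the real class.
class StopSketch {
    enum State { INIT, STARTED, PAUSED, STOPPED, FAILED }

    // Hypothetical stand-in for the connector status listener.
    interface StatusListener {
        void onPause(String connName);
        void onStop(String connName);
        void onFailure(String connName, Throwable t);
    }

    private final String connName;
    private final Runnable connectorStop; // stands in for connector.stop()
    private final StatusListener statusListener;
    State state = State.STARTED;

    StopSketch(String connName, Runnable connectorStop, StatusListener statusListener) {
        this.connName = connName;
        this.connectorStop = connectorStop;
        this.statusListener = statusListener;
    }

    void stop(boolean paused) {
        State newState = paused ? State.PAUSED : State.STOPPED;
        try {
            if (state == newState) {
                return; // already in the desired state
            }
            if (state == State.STARTED) {
                connectorStop.run();
            }
            if (paused) {
                statusListener.onPause(connName);
            } else {
                statusListener.onStop(connName);
            }
            state = newState;
        } catch (Throwable t) {
            if (paused) {
                statusListener.onFailure(connName, t);
                state = State.FAILED;
            } else {
                // Report STOPPED even if the connector failed to stop cleanly.
                state = State.STOPPED;
                // No nested try/catch: if the status update itself throws,
                // the exception propagates to the caller.
                statusListener.onStop(connName);
            }
        }
    }
}
```

With this shape, a failure in `connector.stop()` still ends in `STOPPED` for a stop request and `FAILED` for a pause request, while a failure to publish the status is not swallowed.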



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java:
##########
@@ -291,9 +293,21 @@ protected boolean awaitUnpause() throws InterruptedException {
 
     public void transitionTo(TargetState state) {
         synchronized (this) {
-            // ignore the state change if we are stopping
-            if (stopping)
+            // Ignore the state change if we are stopping.

Review Comment:
   Yes, it's to minimize the impact of potential bugs. This does technically
risk masking those bugs, since they'll surface as WARN-level logs instead of
failed tasks in the REST API, but I think the less drastic approach is better
here: users are likely to be alarmed if they see failed tasks.
   
   I was thinking of
[this log message](https://github.com/apache/kafka/blob/45ecae6a28fe820eb2698c8a375c83ee15036f5c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L407)
when considering how to handle this scenario, since it can be emitted
spuriously under certain circumstances. That's not great, but the fact that
the only fallout of being in that unexpected state is a warning log message has
had some advantages: it tends not to alarm users (or to act as a red herring).
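
As an illustration of the "warn instead of fail" approach, here is a minimal sketch. It assumes, purely for the example, that a request to move to `STOPPED` is the unexpected case. `TransitionSketch` is a hypothetical stand-in rather than the real `WorkerTask`, and the `warnings` list stands in for WARN-level log output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a task; not the real WorkerTask.
class TransitionSketch {
    enum TargetState { STARTED, PAUSED, STOPPED }

    private TargetState targetState = TargetState.STARTED;
    private boolean stopping = false;
    // Stand-in for WARN-level log output so the behavior is easy to inspect.
    private final List<String> warnings = new ArrayList<>();

    synchronized void stop() {
        stopping = true;
        notifyAll();
    }

    synchronized void transitionTo(TargetState state) {
        if (stopping) {
            return; // ignore the state change if we are stopping
        }
        if (state == TargetState.STOPPED) {
            // Assumption for this sketch: this request is unexpected here.
            // Record a warning and ignore it instead of failing the task.
            warnings.add("Ignoring unexpected transition request to " + state);
            return;
        }
        targetState = state;
        notifyAll(); // wake any thread waiting on a state change
    }

    synchronized TargetState targetState() {
        return targetState;
    }

    synchronized List<String> warnings() {
        return new ArrayList<>(warnings);
    }
}
```

The trade-off is the one described above: a bug that triggers the unexpected request shows up only in the logs, not as a failed task.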


