C0urante commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1146276523
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java: ########## @@ -221,28 +223,44 @@ public boolean isRunning() { } @SuppressWarnings("fallthrough") - private void pause() { + private void stop(boolean paused) { + State newState = paused ? State.PAUSED : State.STOPPED; try { - switch (state) { - case STOPPED: - return; + if ((state == State.STOPPED || state == State.PAUSED) && state == newState) { + // Already in the desired state + return; + } - case STARTED: - connector.stop(); - // fall through + if (state == State.STARTED) { + connector.stop(); + } - case INIT: - statusListener.onPause(connName); - this.state = State.STOPPED; - break; + if (state == State.FAILED && newState != State.STOPPED) { + throw new IllegalArgumentException("Cannot transition to non-stopped state when connector has already failed"); + } - default: - throw new IllegalArgumentException("Cannot pause connector in state " + state); + if (paused) { + statusListener.onPause(connName); + } else { + statusListener.onStop(connName); } + + this.state = newState; } catch (Throwable t) { - log.error("{} Error while shutting down connector", this, t); - statusListener.onFailure(connName, t); - this.state = State.FAILED; + log.error("{} Error while {} connector", this, paused ? "pausing" : "stopping", t); + if (paused) { + statusListener.onFailure(connName, t); + this.state = State.FAILED; + } else { + // We say the connector is STOPPED even if it fails at this point + this.state = State.STOPPED; + // One more try to make sure the status is updated correctly + try { + statusListener.onStop(connName); + } catch (Throwable t2) { + log.error("{} Error during failover attempt to stop connector", this, t2); + } Review Comment: Not sure I follow? This line logs any exception that arises during line 259; we won't have access to that exception (since it may not exist) if we use a `finally` block. Anyways, after thinking about this part a bit more--I don't like this level of paranoia and don't think it's necessary to wrap line 259 in a try/catch (or try/finally) block at all. If we can't even update the connector's status, it probably makes sense to just fail since there's something seriously wrong with the worker at that point. This also follows the [precedent with the existing `pause` method](https://github.com/apache/kafka/blob/45ecae6a28fe820eb2698c8a375c83ee15036f5c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L244), where we wrap things in try/catch but don't wrap the call to `statusListener::onFailure` that takes place inside the catch body with its own try/catch. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java: ########## @@ -291,9 +293,21 @@ protected boolean awaitUnpause() throws InterruptedException { public void transitionTo(TargetState state) { synchronized (this) { - // ignore the state change if we are stopping - if (stopping) + // Ignore the state change if we are stopping. Review Comment: Yes, it's to minimize the impact of potential bugs. It will technically run the risk of masking them since they'll surface as WARN-level logs instead of failed tasks in the REST API; I think it's better to take the less drastic approach here since users are probably going to get scared if they see failed tasks. I was thinking of [this log message](https://github.com/apache/kafka/blob/45ecae6a28fe820eb2698c8a375c83ee15036f5c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L407) when considering how to handle this scenario, since it can be emitted spuriously under certain circumstances. Although that's not great, the fact that the only fallout of being in that unexpected state is a warning log message has had some advantages, since it tends not to alarm users (and possibly act as a red herring). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org