C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817572


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private final KafkaFuture<Void> fencingFuture;
+
+        public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+            this.connName = connName;
+            this.tasksToRecord = tasksToRecord;
+            this.taskGen = taskGen;
+            this.fencingFollowup = new FutureCallback<>();
+            this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+                // This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+                // the future is already completed. Since that thread is the herder tick thread, we don't need
+                // to perform follow-up logic through an additional herder request (and if we tried, it would lead
+                // to deadlock)
+                addOrRunRequest(
+                        this::onZombieFencingSuccess,
+                        fencingFollowup
+                );
+                awaitFollowup();
+                return null;
+            });
+        }
+
+        // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+        // Note that work here will be performed on the herder's tick thread, so it should not block for very long
+        private Void onZombieFencingSuccess() throws TimeoutException {
+            configBackingStore.refresh(1, TimeUnit.MINUTES);
+            configState = configBackingStore.snapshot();
+            if (taskGen < configState.taskConfigGeneration(connName)) {
+                throw new ConnectRestException(
+                    Response.Status.CONFLICT.getStatusCode(),
+                    "Fencing failed because new task configurations were 
generated for the connector");
+            }
+            if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {
+                throw new ConnectException("Failed to write connector task count record to config topic since worker was fenced out");
+            }

Review Comment:
   I pushed a change to https://github.com/apache/kafka/pull/11778 that 
basically does this; will rebase and update the new config topic writes 
introduced in this PR accordingly. One noteworthy difference now is that the 
exception message is always the same regardless of which operation failed; I 
tried to make it generic and user-friendly enough to work with that, but if 
that doesn't work well enough, we can add a message parameter to this method 
and use it as part of the message for the exception that gets thrown on 
failure. BTW, it might be more helpful to leave comments about this topic on 
that PR, but I'll do my best to handle them either way.
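
   Purely as an illustration of that last suggestion, a message-parameter variant might look like the sketch below. The class name, the `writeToConfigTopicAsLeader` signature, and the exception text here are hypothetical stand-ins for illustration, not the code from PR #11778:

```java
import java.util.function.BooleanSupplier;

public class ConfigTopicWriteSketch {
    // Simplified stand-in for the Connect runtime's ConnectException
    static class ConnectException extends RuntimeException {
        ConnectException(String message) {
            super(message);
        }
    }

    // Hypothetical variant of writeToConfigTopicAsLeader that takes a description
    // of the attempted operation, so each caller gets an operation-specific message
    static void writeToConfigTopicAsLeader(String operationDescription, BooleanSupplier write) {
        if (!write.getAsBoolean()) {
            throw new ConnectException("Failed to " + operationDescription
                    + " because this worker was fenced out as leader");
        }
    }

    public static void main(String[] args) {
        // A successful write completes without an exception
        writeToConfigTopicAsLeader("write connector task count record", () -> true);

        // A failed write produces an error message naming the specific operation
        try {
            writeToConfigTopicAsLeader("remove connector configuration", () -> false);
            throw new AssertionError("expected ConnectException");
        } catch (ConnectException e) {
            if (!e.getMessage().contains("remove connector configuration")) {
                throw new AssertionError("message should name the failed operation");
            }
            System.out.println("ok");
        }
    }
}
```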



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback<Created<
                     callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                 } else {
                     log.trace("Removing connector config {} {}", connName, 
configState.connectors());
-                    configBackingStore.removeConnectorConfig(connName);
+                    if (!writeToConfigTopicAsLeader(() -> configBackingStore.removeConnectorConfig(connName))) {
+                        throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out");
+                    }
                     callback.onCompletion(null, new Created<>(false, null));

Review Comment:
   Yes, although the internal API for this is a little convoluted:
   - `addRequest` accepts an action (a `Callable<Void>`) and a callback (a 
`Callback<Void>`)
   - When requests submitted to `addRequest` are run, the callback is always 
invoked after they complete; if they throw an exception, it's invoked with that 
exception, and if they don't, it's invoked with `null` for both parameters
   - The callback we pass to `addRequest` here is the result of wrapping the 
callback given to the `deleteConnectorConfig` method in the 
`forwardErrorCallback` method, which causes it to be invoked if and only if an 
exception is thrown when the request is run
   - As a result, if we throw any exceptions from the action that we pass to 
`addRequest`, they're guaranteed to be passed to the callback supplied to 
`deleteConnectorConfig`
   
   Although I think it's cleaner to throw exceptions instead of invoking `Callback::onCompletion` with an exception and then doing a `return null`, for consistency's sake it's probably better to do the latter, since that's the existing pattern. I'll address this first in https://github.com/apache/kafka/pull/11778 and then add it here in the subsequent rebase.
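
   For readers unfamiliar with the pattern, the four bullets above can be sketched with simplified stand-ins. The `Callback`, `addRequest`, and `forwardErrorCallback` below are minimal mock-ups of the shapes described, not the actual `DistributedHerder` internals:

```java
import java.util.concurrent.Callable;

public class HerderRequestSketch {
    // Simplified stand-in for Connect's Callback interface
    interface Callback<T> {
        void onCompletion(Throwable error, T result);
    }

    // Simplified addRequest: runs the action, then always invokes the callback,
    // passing any thrown exception (or null/null on success)
    static void addRequest(Callable<Void> action, Callback<Void> callback) {
        try {
            action.call();
            callback.onCompletion(null, null);
        } catch (Throwable t) {
            callback.onCompletion(t, null);
        }
    }

    // Simplified forwardErrorCallback: invokes the wrapped callback
    // if and only if the request completed with an exception
    static <T> Callback<Void> forwardErrorCallback(Callback<T> callback) {
        return (error, ignored) -> {
            if (error != null)
                callback.onCompletion(error, null);
        };
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Callback<Object> userCallback = (error, result) ->
                log.append("error=").append(error == null ? "none" : error.getMessage());

        // Action throws: the exception reaches the wrapped user callback
        addRequest(() -> { throw new RuntimeException("fenced out"); },
                forwardErrorCallback(userCallback));
        if (!log.toString().equals("error=fenced out"))
            throw new AssertionError("expected forwarded error, got: " + log);

        // Action succeeds: forwardErrorCallback suppresses the null completion
        log.setLength(0);
        addRequest(() -> null, forwardErrorCallback(userCallback));
        if (log.length() != 0)
            throw new AssertionError("callback should not fire on success");
        System.out.println("ok");
    }
}
```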


