C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817431


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1745,6 +1958,23 @@ private boolean checkRebalanceNeeded(Callback<?> callback) {
         return false;
     }
 
+    /**
+     * Execute the given action and subsequent callback immediately if the current thread is the herder's tick thread,
+     * or use them to create and store a {@link DistributedHerderRequest} on the request queue and return the resulting request
+     * if not.
+     * @param action the action that should be run on the herder's tick thread
+     * @param callback the callback that should be invoked once the action is complete
+     * @return a new {@link DistributedHerderRequest} if one has been created and added to the request queue, and {@code null} otherwise
+     */
+    DistributedHerderRequest addOrRunRequest(Callable<Void> action, Callback<Void> callback) {

Review Comment:
   That works, yeah 👍



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);

Review Comment:
   This follows the same pattern as 
[AbstractHerder::maybeAddConfigErrors](https://github.com/apache/kafka/blob/09570f2540269cc1196c4c69cc7997d035159d1d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L711),
 which accepts a `Callback` but only invokes it on errors. This is useful if 
you'd like to establish some reusable logic that terminates control flow for a 
method and reports an error to a callback if something goes wrong, but 
otherwise allows control flow to continue and possibly fail later.
   
   I'll take a page out of `AbstractHerder::maybeAddConfigErrors`'s book and 
add Javadocs making note of this fact. 
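
As a rough illustration of the error-only callback pattern described above, here is a minimal, self-contained sketch. The names `ErrorOnlyCallbackSketch`, `requestInvalid`, and `handleRequest`, and the nested `Callback` interface, are hypothetical stand-ins for illustration and are not the actual Connect classes:

```java
import java.util.ArrayList;
import java.util.List;

class ErrorOnlyCallbackSketch {
    // Hypothetical stand-in for a Connect-style callback (an assumption, not the real interface).
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    // Returns true, and reports the error to the callback, only when validation fails.
    // On success the callback is NOT invoked; the caller keeps ownership of it.
    static boolean requestInvalid(String signature, Callback<?> callback) {
        if (signature == null) {
            callback.onCompletion(new IllegalArgumentException("missing signature"), null);
            return true;
        }
        return false;
    }

    // The caller terminates early if the helper already reported an error,
    // and otherwise continues and completes the callback itself.
    static void handleRequest(String signature, Callback<String> callback) {
        if (requestInvalid(signature, callback)) {
            return;
        }
        callback.onCompletion(null, "handled " + signature);
    }

    public static void main(String[] args) {
        List<String> outcomes = new ArrayList<>();
        Callback<String> cb = (error, result) ->
                outcomes.add(error != null ? "error: " + error.getMessage() : result);
        handleRequest(null, cb);   // helper reports the error; handler returns early
        handleRequest("abc", cb);  // helper stays silent; handler completes the callback
        System.out.println(outcomes); // prints [error: missing signature, handled abc]
    }
}
```

The boolean return value is what lets the caller stop early without completing the callback a second time.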



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private final KafkaFuture<Void> fencingFuture;
+
+        public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+            this.connName = connName;
+            this.tasksToRecord = tasksToRecord;
+            this.taskGen = taskGen;
+            this.fencingFollowup = new FutureCallback<>();
+            this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+                // This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+                // the future is already completed. Since that thread is the herder tick thread, we don't need
+                // to perform follow-up logic through an additional herder request (and if we tried, it would lead
+                // to deadlock)
+                addOrRunRequest(
+                        this::onZombieFencingSuccess,
+                        fencingFollowup
+                );
+                awaitFollowup();
+                return null;
+            });
+        }
+
+        // Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+        // Note that work here will be performed on the herder's tick thread, so it should not block for very long
+        private Void onZombieFencingSuccess() throws TimeoutException {

Review Comment:
   It seemed reasonable considering how vital being able to reach the config 
topic is to the health of a Connect worker, and that the penalty for failure 
here is that a task will fail to start. But given the existing 
`workerSyncTimeoutMs` field and its use, it seems better to just follow that 
precedent and use that value to dictate how long we're willing to wait to reach 
the end of the config topic in most cases.
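
A minimal sketch of what following that precedent could look like, assuming a store whose `refresh` method takes a timeout and a unit as in the hunk above. The `SyncTimeoutSketch` class and its nested `ConfigStore` interface are hypothetical stand-ins for illustration, not the real herder or `ConfigBackingStore`:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncTimeoutSketch {
    // Hypothetical stand-in for a config store that can be read to the end within a timeout.
    interface ConfigStore {
        void refresh(long timeout, TimeUnit unit) throws TimeoutException;
    }

    private final ConfigStore configStore;
    // Assumed to come from worker configuration, as with the existing workerSyncTimeoutMs field.
    private final long workerSyncTimeoutMs;

    SyncTimeoutSketch(ConfigStore configStore, long workerSyncTimeoutMs) {
        this.configStore = configStore;
        this.workerSyncTimeoutMs = workerSyncTimeoutMs;
    }

    // Uses the configured timeout instead of a hard-coded one-minute wait.
    void readToEndOfConfigTopic() throws TimeoutException {
        configStore.refresh(workerSyncTimeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

This keeps the wait tied to a single configured value, so a user who tunes the sync timeout affects this read as well.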



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
