C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817727


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> 
connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return 
ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature 
requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal 
request missing required signature");
+            } else if 
(!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) 
{
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing 
algorithm used by other workers. "
+                                + "This worker is currently configured to use: 
%s. "
+                                + "Check that all workers' configuration files 
permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and 
restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to 
invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, 
write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private final KafkaFuture<Void> fencingFuture;
+
+        public ZombieFencing(String connName, int tasksToFence, int 
tasksToRecord, int taskGen) {
+            this.connName = connName;
+            this.tasksToRecord = tasksToRecord;
+            this.taskGen = taskGen;
+            this.fencingFollowup = new FutureCallback<>();
+            this.fencingFuture = worker.fenceZombies(connName, tasksToFence, 
configState.connectorConfig(connName)).thenApply(ignored -> {
+                // This callback will be called on the same thread that 
invokes KafkaFuture::thenApply if
+                // the future is already completed. Since that thread is the 
herder tick thread, we don't need
+                // to perform follow-up logic through an additional herder 
request (and if we tried, it would lead
+                // to deadlock)
+                addOrRunRequest(
+                        this::onZombieFencingSuccess,
+                        fencingFollowup
+                );
+                awaitFollowup();
+                return null;
+            });
+        }
+
+        // Invoked after the worker has successfully fenced out the producers 
of old task generations using an admin client
+        // Note that work here will be performed on the herder's tick thread, 
so it should not block for very long
+        private Void onZombieFencingSuccess() throws TimeoutException {
+            configBackingStore.refresh(1, TimeUnit.MINUTES);
+            configState = configBackingStore.snapshot();
+            if (taskGen < configState.taskConfigGeneration(connName)) {
+                throw new ConnectRestException(
+                    Response.Status.CONFLICT.getStatusCode(),
+                    "Fencing failed because new task configurations were 
generated for the connector");
+            }
+            if (!writeToConfigTopicAsLeader(() -> 
configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {

Review Comment:
   Good point, replaced `configLog.readToEnd().get()` with 
`configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS)`, 
which is used everywhere else in the `KafkaConfigBackingStore` where we read to 
the end of the log to ensure that writes that we just performed have landed. It 
comes with the downside that it makes zombie fencing rounds more frail, but 
that's better than squatting indefinitely on the herder thread.
   
   I also fixed another potential blocking issue around this area by shifting 
the call to `onZombieFencingSuccess` (or rather, the registration of it as a 
follow-up to the future returned by `Worker::fenceZombies`) into a separate 
method that can then be invoked after the `ZombieFencing` object has been 
constructed and the lock on the `DistributedHerder` instance has been 
relinquished.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -547,6 +703,27 @@ KafkaBasedLog<String, byte[]> 
setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
     }
 
+    private void maybeSendFencably(String key, byte[] value) {
+        if (!usesFencableWriter) {
+            configLog.send(key, value);

Review Comment:
   `sendPossiblyFencibly` (fencably?) does work but it's a bit verbose. Do you 
think `sendPrivileged` works? It refers to the concept inherited by the 
`ConfigBackingStore` interface and its `claimWritePrivileges` method, and the 
write itself is technically privileged in that it should only ever be performed 
by the leader, even if those privileges are only enforced when the backing 
store is configured to use a fencable producer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to