C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r892554812


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -901,6 +947,19 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
                 if (request != null && started) {
                     updateListener.onRestartRequest(request);
                 }
+            } else if (record.key().startsWith(TASK_COUNT_RECORD_PREFIX)) {

Review Comment:
   Yeah, we've definitely reached that point 👍
   
   While doing this decomposition I kept the body of each new method identical to the `if`/`else if` block that it was extracted from, with these exceptions:
   - Log messages that included the record key now use the connector name in its place (this doesn't drop any information)
   - Calls to `Object::getClass` in log messages are all converted to calls to the newly-introduced, null-safe `className` method
   - The `@SuppressWarnings("unchecked")` annotation is removed from method signatures and is instead added only to the assignments within the method bodies that require it
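
   The three exceptions above can be illustrated with a minimal, self-contained sketch. This is not the code from the PR: the class name `RecordHandlerSketch`, the prefix value, the `task-count` field name, and the `processTaskCountRecord` and `dispatch` methods are all assumptions made for illustration, and `System.err` stands in for the real logger.

```java
import java.util.Map;

public class RecordHandlerSketch {
    // Hypothetical prefix value; stands in for a constant such as TASK_COUNT_RECORD_PREFIX.
    static final String TASK_COUNT_PREFIX = "task-count-";

    // Null-safe stand-in for Object::getClass in log messages:
    // returns the literal "null" instead of throwing a NullPointerException.
    static String className(Object o) {
        return o != null ? o.getClass().getName() : "null";
    }

    // One extracted handler. It receives the connector name rather than the raw
    // record key, so its log message can refer to the connector directly.
    // Returns the task count, or null if the record is malformed.
    static Integer processTaskCountRecord(String connectorName, Object value) {
        if (!(value instanceof Map)) {
            System.err.println("Ignoring task count record for connector '" + connectorName
                    + "' because it is in the wrong format: " + className(value));
            return null;
        }
        // The annotation is scoped to the one assignment that needs it,
        // not to the whole method.
        @SuppressWarnings("unchecked")
        Map<String, Object> fields = (Map<String, Object>) value;
        Object count = fields.get("task-count"); // hypothetical field name
        return count instanceof Integer ? (Integer) count : null;
    }

    // The dispatcher only routes on the key prefix and delegates to the handler.
    static Integer dispatch(String key, Object value) {
        if (key.startsWith(TASK_COUNT_PREFIX)) {
            return processTaskCountRecord(key.substring(TASK_COUNT_PREFIX.length()), value);
        }
        return null; // other record types are omitted from this sketch
    }
}
```

   With this shape, a `null` value reaches the log statement without throwing, because `className` handles it before any call to `getClass`.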



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -901,6 +947,19 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
                 if (request != null && started) {
                     updateListener.onRestartRequest(request);
                 }
+            } else if (record.key().startsWith(TASK_COUNT_RECORD_PREFIX)) {
+                String connectorName = record.key().substring(TASK_COUNT_RECORD_PREFIX.length());
+                if (!(value.value() instanceof Map)) {
+                    log.error("Found task count record ({}) in wrong format: {}",  record.key(), value.value().getClass());

Review Comment:
   It's implicitly included in the record key, but that info is redundant. Updated to use just the connector name and to make the message clearer.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2133,6 +2335,166 @@ private void resetActiveTopics(Collection<String> connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    /**
+     * Checks a given {@link InternalRequestSignature request signature} for validity and adds an exception
+     * to the given {@link Callback} if any errors are found.
+     *
+     * @param requestSignature the request signature to validate
+     * @param callback callback to report invalid signature errors to
+     * @return true if the signature was not valid
+     */
+    private boolean requestNotSignedProperly(InternalRequestSignature requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal request missing required signature");
+            } else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing algorithm used by other workers. "
+                                + "This worker is currently configured to use: %s. "
+                                + "Check that all workers' configuration files permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToFence;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private KafkaFuture<Void> fencingFuture;

Review Comment:
   It's initialized in `start()`, not in the constructor.
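
   A minimal sketch of why such a field cannot be `final`, assuming a two-phase lifecycle in which construction and starting are separate steps. The class `FencingSketch` and its `started()` method are hypothetical, and `CompletableFuture` from the JDK stands in for `KafkaFuture` so that the example is self-contained:

```java
import java.util.concurrent.CompletableFuture;

public class FencingSketch {
    // Assigned in the constructor, so it can be final.
    private final String connName;
    // Assigned later in start(), so the compiler would reject 'final' here.
    private CompletableFuture<Void> fencingFuture;

    public FencingSketch(String connName) {
        this.connName = connName;
    }

    // Kicks off the asynchronous work; only at this point does the future exist.
    public CompletableFuture<Void> start() {
        if (fencingFuture != null) {
            throw new IllegalStateException("Fencing for " + connName + " already started");
        }
        fencingFuture = CompletableFuture.runAsync(() -> {
            // Placeholder for the real fencing work.
        });
        return fencingFuture;
    }

    // True once start() has been called.
    public boolean started() {
        return fencingFuture != null;
    }
}
```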



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -190,6 +196,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // Similarly collect target state changes (when observed by the config storage listener) for handling in the
     // herder's main thread.
     private Set<String> connectorTargetStateChanges = new HashSet<>();
+    private final Map<String, ZombieFencing> activeZombieFencings = new HashMap<>();

Review Comment:
   👍 done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.