tombentley commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r892122006


##########
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##########
@@ -45,21 +49,40 @@
 public class JaasBasicAuthFilter implements ContainerRequestFilter {
 
     private static final Logger log = 
LoggerFactory.getLogger(JaasBasicAuthFilter.class);
-    private static final Pattern TASK_REQUEST_PATTERN = 
Pattern.compile("/?connectors/([^/]+)/tasks/?");
+    private static final Set<RequestMatcher> INTERNAL_REQUEST_MATCHERS = new 
HashSet<>(Arrays.asList(
+            new RequestMatcher("/?connectors/([^/]+)/tasks/?", 
HttpMethod.POST),
+            new RequestMatcher("/?connectors/[^/]+/fence/?", HttpMethod.PUT)
+    ));
     private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
 
     static final String AUTHORIZATION = "Authorization";
 
     // Package-private for testing
     final Configuration configuration;
 
+    private static class RequestMatcher implements 
Predicate<ContainerRequestContext> {
+        private final Pattern path;
+        private final String method;
+
+        public RequestMatcher(String path, String method) {

Review Comment:
   Trivial point, but swapping the order of these parameters would match the 
order that they're used in `test()`, and, at the constructor call site, the 
`method, path` ordering matches how these things appear in an actual HTTP 
request.



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -139,17 +141,26 @@ public void testNoFileOption() throws IOException {
         jaasBasicAuthFilter.filter(requestContext);
 
         verify(requestContext).abortWith(any(Response.class));
-        verify(requestContext).getMethod();
+        verify(requestContext, atLeastOnce()).getMethod();
         
verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION);
     }
 
     @Test
-    public void testPostWithoutAppropriateCredential() throws IOException {
+    public void testInternalTaskConfigEndpointSkipped() throws IOException {
+        testInternalEndpointSkipped("connectors/connName/tasks", 
HttpMethod.POST);
+    }
+
+    @Test
+    public void testInternalZombieFencingEndpointSkipped() throws IOException {
+        testInternalEndpointSkipped("connectors/connName/fence", 
HttpMethod.PUT);
+    }
+
+    private void testInternalEndpointSkipped(String endpoint, String method) 
throws IOException {

Review Comment:
   I guess we could also swap there parameter order here too?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -901,6 +947,19 @@ public void onCompletion(Throwable error, 
ConsumerRecord<String, byte[]> record)
                 if (request != null && started) {
                     updateListener.onRestartRequest(request);
                 }
+            } else if (record.key().startsWith(TASK_COUNT_RECORD_PREFIX)) {
+                String connectorName = 
record.key().substring(TASK_COUNT_RECORD_PREFIX.length());
+                if (!(value.value() instanceof Map)) {
+                    log.error("Found task count record ({}) in wrong format: 
{}",  record.key(), value.value().getClass());

Review Comment:
   We should include the connector name, I think.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -901,6 +947,19 @@ public void onCompletion(Throwable error, 
ConsumerRecord<String, byte[]> record)
                 if (request != null && started) {
                     updateListener.onRestartRequest(request);
                 }
+            } else if (record.key().startsWith(TASK_COUNT_RECORD_PREFIX)) {

Review Comment:
   We're adding this `else if` clause to a method that's now ~250 lines long. I 
think we can factor the block of each `if` and `else if` into its own method.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -201,6 +214,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) 
{
     public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
             .field("state", Schema.STRING_SCHEMA)
             .build();
+    public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
+            .field("tasks", Schema.INT32_SCHEMA)

Review Comment:
   I wonder whether the key format should _assume_ that a count is involved, or 
whether it should be named for the purpose to which it's being put (zombie 
fencing). e.g. maybe `tasks-fencing-<connector>` is a better key, with 
`task_count` at the field name for this V0 schema which happens to use just the 
count as the implementation?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2133,6 +2335,166 @@ private void resetActiveTopics(Collection<String> 
connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return 
ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    /**
+     * Checks a given {@link InternalRequestSignature request signature} for 
validity and adds an exception
+     * to the given {@link Callback} if any errors are found.
+     *
+     * @param requestSignature the request signature to validate
+     * @param callback callback to report invalid signature errors to
+     * @return true if the signature was not valid
+     */
+    private boolean requestNotSignedProperly(InternalRequestSignature 
requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal 
request missing required signature");
+            } else if 
(!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) 
{
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing 
algorithm used by other workers. "
+                                + "This worker is currently configured to use: 
%s. "
+                                + "Check that all workers' configuration files 
permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and 
restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Represents an active zombie fencing: that is, an in-progress attempt to 
invoke
+     * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, 
write a new task count
+     * record to the config topic.
+     */
+    class ZombieFencing {
+        private final String connName;
+        private final int tasksToFence;
+        private final int tasksToRecord;
+        private final int taskGen;
+        private final FutureCallback<Void> fencingFollowup;
+        private KafkaFuture<Void> fencingFuture;

Review Comment:
   This can be final too, I think



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1140,6 +1138,113 @@ else if (!configState.contains(connName))
         );
     }
 
+    // Another worker has forwarded a request to this worker (which it 
believes is the leader) to perform a round of zombie fencing
+    @Override
+    public void fenceZombieSourceTasks(final String connName, final 
Callback<Void> callback, InternalRequestSignature requestSignature) {
+        log.trace("Submitting zombie fencing request {}", connName);
+        if (requestNotSignedProperly(requestSignature, callback)) {
+            return;
+        }
+
+        fenceZombieSourceTasks(connName, callback);
+    }
+
+    // A task on this worker requires a round of zombie fencing
+    void fenceZombieSourceTasks(final ConnectorTaskId id, Callback<Void> 
callback) {
+        log.trace("Performing preflight zombie check for task {}", id);
+        fenceZombieSourceTasks(id.connector(), (error, ignored) -> {
+            if (error == null) {
+                callback.onCompletion(null, null);
+            } else if (error instanceof NotLeaderException) {
+                String forwardedUrl = ((NotLeaderException) 
error).forwardUrl() + "connectors/" + id.connector() + "/fence";
+                log.trace("Forwarding zombie fencing request for connector {} 
to leader at {}", id.connector(), forwardedUrl);
+                forwardRequestExecutor.execute(() -> {
+                    try {
+                        RestClient.httpRequest(forwardedUrl, "PUT", null, 
null, null, config, sessionKey, requestSignatureAlgorithm);
+                        callback.onCompletion(null, null);
+                    } catch (Throwable t) {
+                        callback.onCompletion(t, null);
+                    }
+                });
+            } else {
+                error = ConnectUtils.maybeWrap(error, "Failed to perform 
zombie fencing");
+                callback.onCompletion(error, null);
+            }
+        });
+    }
+
+    // Visible for testing
+    void fenceZombieSourceTasks(final String connName, final Callback<Void> 
callback) {
+        addRequest(
+                () -> {
+                    log.trace("Performing zombie fencing request {}", 
connName);

Review Comment:
   ```suggestion
                       log.trace("Performing zombie fencing request for 
connector {}", connName);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection<String> 
connectors, Collection<Connect
         }
     }
 
+    private boolean isSourceConnector(String connName) {
+        return 
ConnectorType.SOURCE.equals(connectorTypeForConfig(configState.connectorConfig(connName)));
+    }
+
+    private boolean requestNotSignedProperly(InternalRequestSignature 
requestSignature, Callback<?> callback) {
+        if (internalRequestValidationEnabled()) {
+            ConnectRestException requestValidationError = null;
+            if (requestSignature == null) {
+                requestValidationError = new BadRequestException("Internal 
request missing required signature");
+            } else if 
(!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) 
{
+                requestValidationError = new BadRequestException(String.format(
+                        "This worker does not support the '%s' key signing 
algorithm used by other workers. "
+                                + "This worker is currently configured to use: 
%s. "
+                                + "Check that all workers' configuration files 
permit the same set of signature algorithms, "
+                                + "and correct any misconfigured worker and 
restart it.",
+                        requestSignature.keyAlgorithm(),
+                        keySignatureVerificationAlgorithms
+                ));
+            } else {
+                if (!requestSignature.isValid(sessionKey)) {
+                    requestValidationError = new ConnectRestException(
+                            Response.Status.FORBIDDEN,
+                            "Internal request contained invalid signature."
+                    );
+                }
+            }
+            if (requestValidationError != null) {
+                callback.onCompletion(requestValidationError, null);

Review Comment:
   Thanks for the explanation and the Javadoc. 
   



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -190,6 +196,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     // Similarly collect target state changes (when observed by the config 
storage listener) for handling in the
     // herder's main thread.
     private Set<String> connectorTargetStateChanges = new HashSet<>();
+    private final Map<String, ZombieFencing> activeZombieFencings = new 
HashMap<>();

Review Comment:
   Can we document that access is protected by this object's monitor.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+public class LoaderSwap implements AutoCloseable {

Review Comment:
   ```suggestion
   /**
    * Helper for having {@code Plugins} use a given classloader within a 
try-with-resources statement.
    * See {@link Plugins#withClassLoader(ClassLoader)}.
    */
   public class LoaderSwap implements AutoCloseable {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to