mimaison commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890986946


##########
gradle/spotbugs-exclude.xml:
##########
@@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
     </Match>
 
+    <Match>
+        <!--
+            Temporarily suppress warnings about unused private methods (will 
be used in a subsequent pull request)
+            TODO: Remove this before merging to trunk

Review Comment:
   Do you already have the PR that clears this?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -138,6 +138,18 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, 
Callback<Void> callback, InternalRequestSignature requestSignature);
 
+    /**
+     * Fence out any older task generations for a source connector, and then 
write a record to the config topic
+     * indicating that it is safe to bring up a new generation of tasks. If 
that record is already present, do nothing
+     * and invoke the callback successfully.
+     * @param connName the name of the connector to fence out, which must 
refer to a source connector; if the
+     *                 connector does not exist or is not a source connector, 
the callback will be invoked with an error
+     * @param callback callback to invoke upon completion
+     * @param requestSignature the signature of the request made for this 
connector;
+     *                         may be null if no signature was provided
+     */
+    void fenceZombies(String connName, Callback<Void> callback, 
InternalRequestSignature requestSignature);

Review Comment:
   What about `fenceZombieSourceTasks()`? I find `fenceZombies()` a bit too 
generic



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") 
String connector,
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", 
"POST", headers, taskConfigs, forward);
     }
 
+    @PUT
+    @Path("/{connector}/fence")
+    public Response fenceZombies(final @PathParam("connector") String 
connector,
+                             final @Context HttpHeaders headers,
+                             final @QueryParam("forward") Boolean forward,
+                             final byte[] requestBody) throws Throwable {
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.fenceZombies(connector, cb, 
InternalRequestSignature.fromHeaders(requestBody, headers));
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", 
"PUT", headers, requestBody, forward);
+        return Response.ok().build();

Review Comment:
   Do we need this? Also does this method need to return `Response`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -291,7 +328,16 @@ public void start() {
         log.info("Starting KafkaConfigBackingStore");
         // Before startup, callbacks are *not* invoked. You can grab a 
snapshot after starting -- just take care that
         // updates can continue to occur in the background
-        configLog.start();
+        try {
+            configLog.start();
+        } catch (UnsupportedVersionException e) {
+            throw new ConnectException(
+                    "Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
+                            + "admin clients to read consumer offsets. Disable 
the worker's exactly-once support "
+                            + "for source connectors, or use a new Kafka 
broker version.",

Review Comment:
   `newer`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -201,6 +214,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) 
{
     public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
             .field("state", Schema.STRING_SCHEMA)
             .build();
+    public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
+            .field("tasks", Schema.INT32_SCHEMA)

Review Comment:
   Would `task_count` or even `count` (like `state`) be clearer?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -309,16 +355,70 @@ public void start() {
     @Override
     public void stop() {
         log.info("Closing KafkaConfigBackingStore");
-        try {
-            configLog.stop();
-        } finally {
-            if (ownTopicAdmin != null) {
-                ownTopicAdmin.close();
-            }
+
+        if (fencableProducer != null) {
+            Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), 
"fencable producer for config topic");
         }
+        Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
+        Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");
+
         log.info("Closed KafkaConfigBackingStore");
     }
 
+    @Override
+    public void claimWritePrivileges() {
+        if (usesFencableWriter && fencableProducer == null) {
+            try {
+                fencableProducer = createFencableProducer();
+                fencableProducer.initTransactions();
+            } catch (Exception e) {
+                if (fencableProducer != null) {
+                    Utils.closeQuietly(() -> 
fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
+                    fencableProducer = null;
+                }
+                throw new ConnectException("Failed to create and initialize 
fencable producer for config topic", e);
+            }
+        }
+    }
+
+    private Map<String, Object> baseProducerProps(WorkerConfig workerConfig) {
+        Map<String, Object> producerProps = new 
HashMap<>(workerConfig.originals());
+        String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(workerConfig);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, 
kafkaClusterId);
+        return producerProps;
+    }
+
+    // Visible for testing
+    Map<String, Object> fencableProducerProps(DistributedConfig workerConfig) {
+        Map<String, Object> result = new 
HashMap<>(baseProducerProps(workerConfig));
+
+        // Always require producer acks to all to ensure durable writes
+        result.put(ProducerConfig.ACKS_CONFIG, "all");
+        // Don't allow more than one in-flight request to prevent reordering 
on retry (if enabled)
+        result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

Review Comment:
   Isn't ordering still guaranteed with retries when idempotency is enabled?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -90,6 +89,10 @@ public interface ConfigBackingStore {
      */
     void putTargetState(String connector, TargetState state);
 
+    /**
+     * Store a new {@link SessionKey} that can be used to validate internal 
(i.e., non-user-triggered) inter-worker communication.

Review Comment:
   This type of small cleanups are really appreciated, thanks!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -784,6 +845,14 @@ private static Map<String, Object> 
connectorClientConfigOverrides(ConnectorTaskI
         return clientOverrides;
     }
 
+    private String transactionalId(ConnectorTaskId id) {

Review Comment:
   I was confused for a moment as I remember seeing these methods in another 
PR. I see this PR has conflicts so this must be the reason and they'll 
disappear from here once this is rebased on trunk



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -784,6 +845,14 @@ private static Map<String, Object> 
connectorClientConfigOverrides(ConnectorTaskI
         return clientOverrides;
     }
 
+    private String transactionalId(ConnectorTaskId id) {
+        return transactionalId(config.groupId(), id.connector(), id.task());
+    }
+
+    public static String transactionalId(String groupId, String connector, int 
taskId) {

Review Comment:
   Is this going to be called from other places in the remaining PRs? If not we 
could get rid of it



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -139,14 +140,23 @@ public void testNoFileOption() throws IOException {
         jaasBasicAuthFilter.filter(requestContext);
 
         verify(requestContext).abortWith(any(Response.class));
-        verify(requestContext).getMethod();
+        verify(requestContext, atLeastOnce()).getMethod();
         
verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION);
     }
 
     @Test
-    public void testPostWithoutAppropriateCredential() throws IOException {
+    public void testInternalTaskConfigEndpointSkipped() throws IOException {
+        testInternalEndpointSkipped("connectors/connName/tasks");
+    }
+
+    @Test
+    public void testInternalZombieFencingEndpointSkipped() throws IOException {
+        testInternalEndpointSkipped("connectors/connName/fence");
+    }
+
+    private void testInternalEndpointSkipped(String endpoint) throws 
IOException {
         UriInfo uriInfo = mock(UriInfo.class);
-        when(uriInfo.getPath()).thenReturn("connectors/connName/tasks");
+        when(uriInfo.getPath()).thenReturn(endpoint);
 
         ContainerRequestContext requestContext = 
mock(ContainerRequestContext.class);
         when(requestContext.getMethod()).thenReturn(HttpMethod.POST);

Review Comment:
   How does this test work with the `connectors/connName/fence` endpoint that 
uses `PUT`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -280,4 +326,4 @@ public int hashCode() {
                 inconsistentConnectors,
                 configTransformer);
     }
-}
+}

Review Comment:
   Nit, let's keep the new line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to