C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817292


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -582,6 +588,59 @@ public boolean startTask(
         }
     }
 
+    /**
+     * Using the admin principal for this connector, perform a round of zombie fencing that prevents the transactional producers
+     * for the specified number of source tasks from sending any more records.
+     * @param connName the name of the connector
+     * @param numTasks the number of tasks to fence out
+     * @param connProps the configuration of the connector; may not be null
+     * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+     */
+    public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
+        return fenceZombies(connName, numTasks, connProps, Admin::create);
+    }
+
+    // Allows us to mock out the Admin client for testing
+    KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) {
+        log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+                final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                final Class<? extends Connector> connClass = plugins.connectorClass(
+                        connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        connConfig,
+                        connClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SOURCE);
+                Admin admin = adminFactory.apply(adminConfig);

Review Comment:
   Good catch, done.
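
   For context, the package-private `adminFactory` overload quoted above is the hook that lets a test inject a fake `Admin` client instead of going through `Admin::create`. A rough sketch of how a test could exercise it is below; this is illustrative only and not this PR's actual test code (the class name, connector name, task count, and stubbing are assumptions, and the sketch assumes it lives in `org.apache.kafka.connect.runtime` so the package-private method is visible):
   
   ```java
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.anyCollection;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   
   import java.util.Map;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.FenceProducersResult;
   import org.apache.kafka.common.KafkaFuture;
   
   public class ZombieFencingTestSketch {
   
       // Hypothetical sketch: the overload takes a Function<Map<String, Object>, Admin>,
       // so the test can hand back a mock regardless of the admin config that gets built.
       void fencingUsesInjectedAdmin(Worker worker, Map<String, String> connProps) throws Exception {
           Admin mockAdmin = mock(Admin.class);
           FenceProducersResult fenceResult = mock(FenceProducersResult.class);
           when(fenceResult.all()).thenReturn(KafkaFuture.completedFuture(null));
           // Stub both fenceProducers overloads, since the exact call the worker makes isn't shown in the hunk above
           when(mockAdmin.fenceProducers(anyCollection())).thenReturn(fenceResult);
           when(mockAdmin.fenceProducers(anyCollection(), any())).thenReturn(fenceResult);
   
           // The factory is applied to the connector's admin config; here it just returns the mock
           KafkaFuture<Void> fenced = worker.fenceZombies("my-source-connector", 2, connProps, config -> mockAdmin);
           fenced.get(); // completes once the (mocked) fencing round "succeeds"
       }
   }
   ```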



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -582,6 +588,59 @@ public boolean startTask(
         }
     }
 
+    /**
+     * Using the admin principal for this connector, perform a round of zombie fencing that prevents the transactional producers
+     * for the specified number of source tasks from sending any more records.
+     * @param connName the name of the connector
+     * @param numTasks the number of tasks to fence out
+     * @param connProps the configuration of the connector; may not be null
+     * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+     */
+    public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
+        return fenceZombies(connName, numTasks, connProps, Admin::create);
+    }
+
+    // Allows us to mock out the Admin client for testing
+    KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) {
+        log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

Review Comment:
   Ah yeah, been toying with that idea for a while but never got around to 
trying it out. Works pretty well in this case; the one wrinkle is that the 
signature for `AutoCloseable::close` includes a checked exception. I've added a 
new (internal) `LoaderSwap` class that implements `AutoCloseable` and removes 
that checked exception to address that.
   
   If this looks good, we can retrofit other parts of the code base to leverage 
it in a follow-up.
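   
   For illustration, roughly what such a `LoaderSwap` could look like; the field and constructor shapes below are assumptions for the sketch, not the PR's exact code:
   
   ```java
   import org.apache.kafka.connect.runtime.isolation.Plugins;
   
   // Sketch of an internal AutoCloseable loader-swap helper whose close() declares
   // no checked exception, so callers can use try-with-resources without a catch block.
   public class LoaderSwap implements AutoCloseable {
   
       private final ClassLoader savedLoader;
   
       public LoaderSwap(ClassLoader savedLoader) {
           this.savedLoader = savedLoader;
       }
   
       @Override
       public void close() {
           // Restore the class loader that was active before the swap
           Plugins.compareAndSwapLoaders(savedLoader);
       }
   }
   ```
   
   With something along those lines, the manual swap-and-restore in `fenceZombies` above could collapse into a single try-with-resources block.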



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -138,6 +138,17 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, 
Callback<Void> callback, InternalRequestSignature requestSignature);
 
+    /**
+     * Fence out any older task generations for a source connector, and then write a record to the config topic
+     * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing
+     * and invoke the callback successfully.
+     * @param connName the name of the connector to fence out; must refer to a source connector

Review Comment:
   The callback is invoked with an error (added to Javadocs)
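   
   To illustrate that contract from a caller's perspective, a hypothetical snippet is below; the method name `fenceZombieSourceTasks` is an assumption, since only the Javadoc is quoted in the hunk above:
   
   ```java
   import org.apache.kafka.connect.runtime.Herder;
   import org.apache.kafka.connect.util.Callback;
   
   public class FencingCallbackSketch {
   
       // Hypothetical caller of the Herder method documented above
       static void requestFencing(Herder herder, String connName) {
           herder.fenceZombieSourceTasks(connName, (error, ignored) -> {
               if (error != null) {
                   // Failure path: the herder reports problems by invoking the callback with an error
                   System.err.println("Zombie fencing failed for " + connName + ": " + error);
               } else {
                   // Success path: the "fencing complete" record is (or already was) in the config topic
                   System.out.println("Safe to bring up a new task generation for " + connName);
               }
           });
       }
   }
   ```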


