C0urante commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1153462321


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {

Review Comment:
   Ah yeah, forgot that we don't forward to the leader for this.
   
   In that case, I think we should go back to using a snapshot-based approach 
in the `AbstractHerder::connectorOffsets` method, which will guarantee a 
consistent view of the config topic for the remainder of the method:
   ```java
       @Override
       public void connectorOffsets(String connName, Callback<ConnectorOffsets> 
cb) {
           log.trace("Submitting offset fetch request for connector: {}", 
connName);
           ClusterConfigState snapshot = configBackingStore.snapshot();
           try {
               if (!snapshot.contains(connName)) {
                   cb.onCompletion(new NotFoundException("Connector " + 
connName + " not found"), null);
                   return;
               }
               // The worker asynchronously processes the request and completes 
the passed callback when done
               worker.connectorOffsets(connName, 
snapshot.connectorConfig(connName), cb);
           } catch (Throwable t) {
               cb.onCompletion(t, null);
           }
       }
   ```
   
   I'd also still recommend doing it on the tick thread for the distributed 
herder just to avoid any potential concurrency headaches; the operations we 
perform between before things get delegated to another thread by the `Worker` 
class are lightweight enough that there shouldn't be concerns about 
unnecessarily blocking the herder's tick thread. We might also want to do a 
brief refresh of the config topic before invoking `super::connectorOffsets`, to 
give us a best-effort chance to get up-to-speed on the state of the topic 
before taking the snapshot.
   
   RE the "submitting request" language: you're not wrong, but so far we've 
used that language exclusively when submitting requests to the herder queue, 
and it'll be nice if we can tweak the language in other places to preserve that 
distinction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to