C0urante commented on code in PR #13434: URL: https://github.com/apache/kafka/pull/13434#discussion_r1153462551
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1137,111 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector. This method is asynchronous and the passed callback is completed when the
+     * request finishes processing.
+     *
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
+        executor.submit(() -> {
+            String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+            Connector connector;
+
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+                connector = plugins.newConnector(connectorClassOrAlias);
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    log.debug("Fetching offsets for sink connector: {}", connName);
+                    sinkConnectorOffsets(connName, connector, connectorConfig, cb);
+                } else {
+                    log.debug("Fetching offsets for source connector: {}", connName);
+                    sourceConnectorOffsets(connName, connector, connectorConfig, cb);
+                }
+            } catch (Exception e) {
+                cb.onCompletion(e, null);
+            }
+        });
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @param cb callback to invoke upon completion of the request
+     */
+    private void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                      Callback<ConnectorOffsets> cb) {
+        sinkConnectorOffsets(connName, connector, connectorConfig, cb, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    void sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                              Callback<ConnectorOffsets> cb, Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);

Review Comment:
Yep, besides the `Exception` vs. `Throwable` thing this looks good 👍

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +867,18 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {

Review Comment:
Ah yeah, forgot that we don't forward to the leader for this.
In that case, I think we should go back to using a snapshot-based approach in the `AbstractHerder::connectorOffsets` method, which will guarantee a consistent view of the config topic for the remainder of the method:

```java
@Override
public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
    log.trace("Submitting offset fetch request for connector: {}", connName);
    ClusterConfigState snapshot = configBackingStore.snapshot();
    try {
        if (!snapshot.contains(connName)) {
            cb.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
            return;
        }
        // The worker asynchronously processes the request and completes the passed callback when done
        worker.connectorOffsets(connName, snapshot.connectorConfig(connName), cb);
    } catch (Throwable t) {
        cb.onCompletion(t, null);
    }
}
```

I'd also still recommend doing it on the tick thread for the distributed herder just to avoid any potential concurrency headaches; the operations we perform before things get delegated to another thread by the `Worker` class are lightweight enough that there shouldn't be concerns about unnecessarily blocking the herder's tick thread.

RE the "submitting request" language: you're not wrong, but so far we've used that language exclusively when submitting requests to the herder queue, and it'll be nice if we can tweak the language in other places to preserve that distinction.
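
To illustrate why the snapshot matters, here is a minimal standalone sketch (not Connect code). `ConfigStore` and `Snapshot` are hypothetical stand-ins for the config backing store and `ClusterConfigState`; the only assumption carried over from the suggestion above is that a snapshot is an immutable copy taken at a single point in time. The existence check and the config lookup both read from that copy, so a connector deleted in between cannot cause the lookup to return `null` after the check has already passed:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SnapshotSketch {

    // Hypothetical stand-in for ClusterConfigState: an immutable point-in-time copy of connector configs.
    static final class Snapshot {
        private final Map<String, Map<String, String>> configs;

        Snapshot(Map<String, Map<String, String>> source) {
            Map<String, Map<String, String>> copy = new HashMap<>();
            source.forEach((name, cfg) -> copy.put(name, Collections.unmodifiableMap(new HashMap<>(cfg))));
            this.configs = Collections.unmodifiableMap(copy);
        }

        boolean contains(String connName) {
            return configs.containsKey(connName);
        }

        Map<String, String> connectorConfig(String connName) {
            return configs.get(connName);
        }
    }

    // Hypothetical stand-in for the config backing store: mutable state that other threads may change.
    static final class ConfigStore {
        private final Map<String, Map<String, String>> live = new HashMap<>();

        synchronized void putConnector(String name, Map<String, String> cfg) {
            live.put(name, new HashMap<>(cfg));
        }

        synchronized void removeConnector(String name) {
            live.remove(name);
        }

        synchronized Snapshot snapshot() {
            return new Snapshot(live);
        }
    }

    public static void main(String[] args) {
        ConfigStore store = new ConfigStore();
        store.putConnector("my-sink", Collections.singletonMap("connector.class", "ExampleSink"));

        // Take the snapshot once, up front, as in the suggested connectorOffsets method.
        Snapshot snapshot = store.snapshot();

        // Simulate another thread deleting the connector between the check and the lookup.
        store.removeConnector("my-sink");

        // Both reads see the same state, so the lookup cannot fail after the check succeeded.
        System.out.println(snapshot.contains("my-sink"));          // prints "true"
        System.out.println(snapshot.connectorConfig("my-sink"));   // prints "{connector.class=ExampleSink}"

        // A fresh snapshot reflects the deletion.
        System.out.println(store.snapshot().contains("my-sink"));  // prints "false"
    }
}
```

The same reasoning is why reading the live store twice (once for the check, once for the config) would be fragile when the request is not forwarded to the leader and is not serialized on the tick thread.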