yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1144415671


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);

Review Comment:
   Currently, admin clients are only instantiated for sink connectors to create the DLQ topic if required. So it seems technically possible for a sink connector's consumer client overrides to target a different Kafka cluster from its producer and admin client overrides. Such a setup won't work with this implementation of the get offsets API, since it uses an admin client to fetch a sink connector's consumer group offsets. However, I'm not sure we want to use a consumer client to retrieve the offsets either, as we shouldn't disrupt the existing sink tasks' consumer group just to fetch offsets. Leveraging a sink task's consumer also isn't an option, because fetching offsets for a stopped sink connector (where all the tasks will be stopped) should be allowed. I'm wondering if we should document that a connector's various client config overrides shouldn't target different Kafka clusters (side note - it looks like we don't [currently document](https://kafka.apache.org/documentation/#connect) client config overrides for Connect beyond the worker property `connector.client.config.override.policy`).
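   
   To make the concern concrete, here's a hedged sketch (all names and addresses below are hypothetical, purely for illustration) of a sink connector config whose consumer override targets a different Kafka cluster than its admin override - with such a config, the admin client built here would ask the wrong cluster for the consumer group's offsets:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class DivergentClientOverridesSketch {
       public static void main(String[] args) {
           // Hypothetical sink connector configuration (names/addresses made up).
           Map<String, String> connectorConfig = new HashMap<>();
           connectorConfig.put("name", "example-sink");
           connectorConfig.put("connector.class", "org.example.ExampleSinkConnector");
           connectorConfig.put("topics", "example-topic");
   
           // The sink tasks' consumers - and hence the consumer group whose offsets
           // the get offsets API needs - connect to cluster A...
           connectorConfig.put("consumer.override.bootstrap.servers", "cluster-a:9092");
   
           // ...while the admin client override points at cluster B, so an admin
           // client built from these overrides would call listConsumerGroupOffsets()
           // against a cluster that has no record of the group.
           connectorConfig.put("admin.override.bootstrap.servers", "cluster-b:9092");
   
           System.out.println("consumer -> " + connectorConfig.get("consumer.override.bootstrap.servers"));
           System.out.println("admin    -> " + connectorConfig.get("admin.override.bootstrap.servers"));
       }
   }
   ```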



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);
+            try {
+                // Not using a timeout for the Future::get here because each offset get request is handled in its own thread in AbstractHerder
+                // and the REST API request timeout in HerderRequestHandler will ensure that the user request doesn't hang indefinitely
+                Map<TopicPartition, OffsetAndMetadata> offsets = listConsumerGroupOffsetsResult.all().get().get(groupId);
+                return SinkUtils.consumerGroupOffsetsToConnectorOffsets(offsets);
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Failed to retrieve consumer group offsets for sink connector {}", connName);
+                throw new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, e);
+            }
+        } finally {
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+        }
+    }
+
+    /**
+     * Get the current offsets for a source connector.
+     * @param connName the name of the source connector whose offsets are to be retrieved
+     * @param connector the source connector
+     * @param connectorConfig the source connector's configurations
+     * @return the source connector's offsets
+     */
+    private ConnectorOffsets sourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {

Review Comment:
   An alternative to this could be to route offset read requests to the worker that is running the source connector and use the `WorkerConnector`'s offset store. Compared to reading to the end of the offsets topic, the tradeoffs are potentially multiple request reroutes (once from the original worker where the REST request lands to the leader, and again from the leader to the worker that is running the connector) and a divergence between how requests are handled for source and sink connectors. On the other hand, the `WorkerConnector`'s offset reader presumably would not have to read the offsets topic from beginning to end, since it would already have been started.
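   
   For illustration, here's a hedged sketch of the reroute-based alternative (the method below, and the way the reader and partition set are obtained, are hypothetical stand-ins rather than actual `WorkerConnector` API): the worker that is already running the connector serves the request from its caught-up offset reader instead of re-reading the offsets topic:
   
   ```java
   import java.util.Collection;
   import java.util.Map;
   
   import org.apache.kafka.connect.storage.OffsetStorageReader;
   
   public class ConnectorOffsetReadSketch {
   
       // Hedged sketch: the worker already running the connector serves the offsets
       // request from its WorkerConnector's (already caught-up) offset reader, avoiding
       // a fresh beginning-to-end read of the offsets topic. Passing the reader and the
       // known partitions as plain parameters stands in for whatever plumbing the
       // herder would actually provide.
       public static Map<Map<String, Object>, Map<String, Object>> readOffsets(
               OffsetStorageReader reader,
               Collection<Map<String, Object>> knownPartitions) {
           // OffsetStorageReader::offsets performs a bulk lookup of the given source partitions.
           return reader.offsets(knownPartitions);
       }
   }
   ```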



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java:
##########
@@ -65,6 +66,13 @@ public interface OffsetBackingStore {
      */
     Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
 
+    /**
+     * Get all the partitions for the specified connector.
+     * @param connectorName the name of the connector whose partitions are to be retrieved
+     * @return set of connector partitions
+     */
+    Set<Map<String, Object>> connectorPartitions(String connectorName);

Review Comment:
   There are a couple of things I'd like to note here:
   
   1. As discussed in the [KIP-875 mailing list discussion](https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02), the `OffsetBackingStore` currently only deals with serialized byte buffers because Connect originally had support for pluggable converters for internal topics (that support was removed, and the `JsonConverter` has been used as the default converter since `2.0.0`).
   2. The `OffsetBackingStore` interface was originally designed as a generic key-value store. However, the `OffsetStorageWriter` is currently the only writer to the offset store, and it ensures that the keys are of a [certain format](https://github.com/apache/kafka/blob/1560c5bd7e556ca6c0f49934b5ad3542ed6208fb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java#L177-L178). Knowledge of this format is required to track partitions by connector in the `OffsetBackingStore`.
   
   In either this PR or a follow-up, we should probably refactor the `OffsetBackingStore` interface methods and move all serialization / deserialization from the `OffsetStorageWriter` and `OffsetStorageReader` interfaces into the `OffsetBackingStore` itself. This would make the new `OffsetBackingStore::connectorPartitions` method more consistent with the other interface methods and also keep the logic for the key format (a list of the form `[connectorName, partition]`, where `connectorName` is a `string` value and `partition` is a `Map<String, Object>`) in a single, central place.
   
   While one could argue that this would make the `OffsetBackingStore` interface more specific / less generic than it currently is, there aren't any other use cases for a generic offset store either currently or in the foreseeable future.
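   
   As a hedged sketch of what keeping that key-format knowledge in one central place could look like (the class and method here are hypothetical, not part of this PR), decoding an offsets-topic record key of the form `[connectorName, partition]` might look roughly like:
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   import org.apache.kafka.connect.data.SchemaAndValue;
   import org.apache.kafka.connect.json.JsonConverter;
   
   public class OffsetKeyFormatSketch {
   
       // Hedged sketch: decode an offsets-topic record key of the documented form
       // [connectorName, partition] in one central place.
       public static String connectorNameForKey(byte[] serializedKey) {
           JsonConverter keyConverter = new JsonConverter();
           // Connect's internal topics are written without embedded schemas.
           keyConverter.configure(Collections.singletonMap("schemas.enable", "false"), true);
   
           // JsonConverter ignores the topic name for schemaless data, so an empty string is fine here.
           SchemaAndValue deserialized = keyConverter.toConnectData("", serializedKey);
   
           // For the key format written by OffsetStorageWriter, the value is a two-element
           // list: element 0 is the connector name (a String) and element 1 is the
           // partition (a Map<String, Object>).
           List<?> key = (List<?>) deserialized.value();
           return (String) key.get(0);
       }
   }
   ```
   
   With serialization handled inside the store, `connectorPartitions` could then be backed by a simple `Map<String, Set<Map<String, Object>>>` that is updated as records are consumed from the offsets topic.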



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
