yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1144415671
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);

Review Comment:
   Currently, admin clients are only instantiated for sink connectors to create the DLQ topic if required, so it seems technically possible for a sink connector's consumer client overrides to target a different Kafka cluster than its producer and admin client overrides. Such a setup won't work with this implementation of the get offsets API, since it uses an admin client to fetch the sink connector's consumer group offsets. However, I'm not sure we want to use a consumer client to retrieve the offsets either, as we shouldn't disrupt the existing sink tasks' consumer group just to fetch offsets. Leveraging a sink task's consumer also isn't an option, because fetching offsets for a stopped sink connector (where all the tasks will be stopped) should be allowed.
   
   I'm wondering if we should document that a connector's various client config overrides shouldn't target different Kafka clusters (side note - it looks like we don't [currently document](https://kafka.apache.org/documentation/#connect) client config overrides for Connect beyond just the worker property `connector.client.config.override.policy`).
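   To make the concern concrete, here's a minimal, self-contained sketch (not from the PR) of the admin-based lookup described above; the group id `connect-my-sink` and the bootstrap servers are made-up placeholders. The key point is that `Admin::listConsumerGroupOffsets` only reads committed offsets and never joins the group, but the admin client has to point at whichever cluster the connector's consumer actually commits to - which is exactly what mismatched client overrides would break:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SinkOffsetsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Must target the cluster that the sink connector's consumer commits offsets to;
        // a consumer.override.bootstrap.servers pointing elsewhere would make this lookup miss
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Reads the group's committed offsets without joining the group,
            // so running sink tasks aren't rebalanced
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("connect-my-sink")
                            .partitionsToOffsetAndMetadata()
                            .get();
            offsets.forEach((tp, oam) -> System.out.println(tp + " -> " + oam.offset()));
        }
    }
}
```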
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,106 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SOURCE);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);
+            try {
+                // Not using a timeout for the Future::get here because each offset get request is handled in its own thread in AbstractHerder
+                // and the REST API request timeout in HerderRequestHandler will ensure that the user request doesn't hang indefinitely
+                Map<TopicPartition, OffsetAndMetadata> offsets = listConsumerGroupOffsetsResult.all().get().get(groupId);
+                return SinkUtils.consumerGroupOffsetsToConnectorOffsets(offsets);
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Failed to retrieve consumer group offsets for sink connector {}", connName);
+                throw new ConnectException("Failed to retrieve consumer group offsets for sink connector " + connName, e);
+            }
+        } finally {
+            Utils.closeQuietly(admin, "Offset fetch admin for sink connector " + connName);
+        }
+    }
+
+    /**
+     * Get the current offsets for a source connector.
+     * @param connName the name of the source connector whose offsets are to be retrieved
+     * @param connector the source connector
+     * @param connectorConfig the source connector's configurations
+     * @return the source connector's offsets
+     */
+    private ConnectorOffsets sourceConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {

Review Comment:
   An alternative to this could be to route offset read requests to the worker that is running the source connector and use the `WorkerConnector`'s offset store, rather than reading to the end of the offsets topic. The tradeoff is potentially multiple request reroutes (one from the original worker where the REST request lands to the leader, and another from the leader to the worker that is running the connector) and a divergence between how requests are handled for source and sink connectors. On the other hand, the `WorkerConnector`'s offset reader presumably would not have to read the offsets topic from beginning to end, since it would already have been started.
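   The diff above cuts off at the method signature, but together with the `OffsetBackingStore::connectorPartitions` addition discussed below, the read-to-the-end approach can be pictured with a rough sketch along these lines (hypothetical, not the PR's actual implementation; the `offsetBackingStore` and `offsetReader` parameters are stand-ins for however the worker actually wires these up):

```java
import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;

public class SourceOffsetsSketch {
    // Hypothetical sketch: enumerate the connector's known source partitions from the
    // backing store, then look up the current offset for each one via an
    // OffsetStorageReader that has read the offsets topic to the end
    public static Map<Map<String, Object>, Map<String, Object>> sourceConnectorOffsets(
            String connName, OffsetBackingStore offsetBackingStore, OffsetStorageReader offsetReader) {
        Set<Map<String, Object>> partitions = offsetBackingStore.connectorPartitions(connName);
        return offsetReader.offsets(partitions);
    }
}
```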
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java:
##########
@@ -65,6 +66,13 @@ public interface OffsetBackingStore {
      */
     Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
 
+    /**
+     * Get all the partitions for the specified connector.
+     * @param connectorName the name of the connector whose partitions are to be retrieved
+     * @return set of connector partitions
+     */
+    Set<Map<String, Object>> connectorPartitions(String connectorName);

Review Comment:
   There are a couple of things I'd like to note here:
   1. As discussed in the [KIP-875 mailing list discussion](https://lists.apache.org/thread/m5bklnh5w4mwr9nbzrmfk0pftpxfjd02), the `OffsetBackingStore` currently only deals with serialized byte buffers because Connect originally had support for pluggable converters for internal topics (this support was removed, and the `JsonConverter` has been used as the default converter since `2.0.0`).
   2. The `OffsetBackingStore` interface was originally designed as a generic key value store. However, the `OffsetStorageWriter` interface is currently the only writer to the offset store, and it ensures that the keys are of a [certain format](https://github.com/apache/kafka/blob/1560c5bd7e556ca6c0f49934b5ad3542ed6208fb/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java#L177-L178). Knowledge of this format is required in order to be able to track partitions by connector in the `OffsetBackingStore`.
   
   In either this PR or a follow-up, we should probably refactor the `OffsetBackingStore` interface methods and move all serialization / deserialization from the `OffsetStorageWriter` and `OffsetStorageReader` interfaces into the `OffsetBackingStore` itself. This would make the new `OffsetBackingStore::connectorPartitions` method more consistent with the other interface methods and would also keep the logic for the key format (a list of the form `[connectorName, partition]`, where `connectorName` is a `string` value and `partition` is a `Map<String, Object>`) in a single central place. While one could argue that this makes the `OffsetBackingStore` interface more specific / less generic than it currently is, there aren't any other use cases for a generic offset store currently or in the foreseeable future.
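   To make the key format concrete, here's a small illustrative sketch (the topic name and sample key bytes are made up) of how a backing store could recover the connector name and partition from a raw offset record key, using the same schemaless `JsonConverter` setup the framework uses for its internal topics:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class OffsetKeyFormatSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Internal converters run with schemas disabled
        JsonConverter keyConverter = new JsonConverter();
        keyConverter.configure(Collections.singletonMap("schemas.enable", "false"), true);

        // Illustrative raw key as it would appear in the offsets topic:
        // a two-element JSON array of the form ["connectorName", {<partition map>}]
        byte[] rawKey = "[\"file-source\",{\"filename\":\"/tmp/input.txt\"}]"
                .getBytes(StandardCharsets.UTF_8);

        SchemaAndValue deserialized = keyConverter.toConnectData("connect-offsets", rawKey);
        List<Object> key = (List<Object>) deserialized.value();
        String connectorName = (String) key.get(0);
        Map<String, Object> partition = (Map<String, Object>) key.get(1);
        System.out.println(connectorName + " -> " + partition);
    }
}
```

   Centralizing this deserialization in the `OffsetBackingStore` would mean it's the only place that needs to know about the two-element `[connectorName, partition]` layout.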