yashmayya commented on code in PR #13434:
URL: https://github.com/apache/kafka/pull/13434#discussion_r1148315133


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }

Review Comment:
   I've made the change because it looks like all other call sites that retrieve client configs for a connector, or that call `ConnectUtils.isSinkConnector`, use connector-specific class loaders. However, I'm not sure I entirely follow why that's required, since there don't seem to be any direct interactions with the connectors there?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {

Review Comment:
   Thanks, that's a great suggestion! I've done the required refactoring.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -866,4 +882,20 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         }
     }
 
+    @Override
+    public void connectorOffsets(String connName, Callback<ConnectorOffsets> cb) {
+        log.debug("Submitting offset fetch request for connector: {}", connName);
+        connectorExecutor.submit(() -> {
+            try {
+                if (!configState.contains(connName)) {

Review Comment:
   Oh wow, I somehow completely missed that 🤦 
   I've refactored this to use the config backing store (taking a snapshot of it to retrieve the configs).
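   Roughly, the refactored flow looks like this (a sketch; the exact error message and wiring are illustrative):
   ```java
   connectorExecutor.submit(() -> {
       try {
           // Take a fresh snapshot of the config backing store so we see the
           // latest set of connectors rather than a possibly stale cached view.
           ClusterConfigState configState = configBackingStore.snapshot();
           if (!configState.contains(connName)) {
               cb.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
               return;
           }
           cb.onCompletion(null, worker.connectorOffsets(connName, configState.connectorConfig(connName)));
       } catch (Throwable t) {
           cb.onCompletion(t, null);
       }
   });
   ```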



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorOffset.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents a single {partition, offset} pair for either a sink connector or a source connector. For source connectors,
+ * the partition and offset structures are defined by the connector implementations themselves. For a sink connector,
+ * where offsets represent the underlying Kafka consumer group offsets, this would look something like:
+ * <pre>
+ *     {
+ *       "partition": {
+ *         "kafka_topic": "topic",
+ *         "kafka_partition": 3
+ *       },
+ *       "offset": {
+ *         "kafka_offset": 1000
+ *       }
+ *     }
+ * </pre>
+ */
+public class ConnectorOffset {
+    public static final String KAFKA_TOPIC_KEY = "kafka_topic";
+    public static final String KAFKA_PARTITION_KEY = "kafka_partition";
+    public static final String KAFKA_OFFSET_KEY = "kafka_offset";

Review Comment:
   Fair point, I've moved these to `SinkUtils` rather than adding a sink-specific static factory method here, since it does seem like a better idea to keep this class source/sink agnostic.
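   For illustration, the constants plus a sink-specific helper in `SinkUtils` might look roughly like this (the helper name and exact shape are illustrative, not final):
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
   import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;

   public final class SinkUtils {
       public static final String KAFKA_TOPIC_KEY = "kafka_topic";
       public static final String KAFKA_PARTITION_KEY = "kafka_partition";
       public static final String KAFKA_OFFSET_KEY = "kafka_offset";

       private SinkUtils() { }

       // Translate consumer group offsets into the source/sink-agnostic
       // ConnectorOffset representation used by the REST layer.
       public static ConnectorOffsets consumerGroupOffsetsToConnectorOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
           List<ConnectorOffset> connectorOffsets = new ArrayList<>(offsets.size());
           for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
               Map<String, Object> partition = new HashMap<>();
               partition.put(KAFKA_TOPIC_KEY, entry.getKey().topic());
               partition.put(KAFKA_PARTITION_KEY, entry.getKey().partition());
               Map<String, Object> offset = Collections.singletonMap(KAFKA_OFFSET_KEY, entry.getValue().offset());
               connectorOffsets.add(new ConnectorOffset(partition, offset));
           }
           return new ConnectorOffsets(connectorOffsets);
       }
   }
   ```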



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java:
##########
@@ -78,6 +85,27 @@ private void load() {
                 ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
                 ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
                 data.put(key, value);
+                if (key != null) {
+                    try {
+                        // The topic parameter is irrelevant for the JsonConverter, which is the internal converter used by
+                        // Connect workers.
+                        List<Object> keyValue = (List<Object>) keyConverter.toConnectData("", key.array()).value();
+                        // The key should always be of the form [connectorName, partition] where connectorName is a
+                        // string value and partition is a Map<String, Object>
+                        String connectorName = (String) keyValue.get(0);
+                        Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);
+                        if (!connectorPartitions.containsKey(connectorName)) {
+                            connectorPartitions.put(connectorName, new HashSet<>());
+                        }
+                        if (value == null) {
+                            connectorPartitions.get(connectorName).remove(partition);

Review Comment:
   I did think about this one while writing it originally, but since the main offset-tracking `data` map here (and in `KafkaOffsetBackingStore` as well) also grows unboundedly in the same way, I figured this is something that can be looked at later (if we decide it's something that needs to be solved). Like you pointed out, it's been this way since the beginning and there haven't really been any practical concerns so far. We'd discussed something very similar previously for the `ConfigBackingStore` [here](https://github.com/apache/kafka/pull/12490).
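   If we ever do decide to tackle it, a sketch of tombstone-driven cleanup for both maps could look roughly like this (hypothetical, not part of this PR):
   ```java
   // Hypothetical cleanup: on a tombstone (null value), drop the entry from the
   // main data map too instead of retaining a null mapping forever.
   if (value == null) {
       data.remove(key);
       Set<Map<String, Object>> partitions = connectorPartitions.get(connectorName);
       if (partitions != null) {
           partitions.remove(partition);
           if (partitions.isEmpty()) {
               connectorPartitions.remove(connectorName);
           }
       }
   } else {
       data.put(key, value);
       connectorPartitions.computeIfAbsent(connectorName, k -> new HashSet<>()).add(partition);
   }
   ```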



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##########
@@ -53,9 +53,30 @@ public void requestTimeoutMs(long requestTimeoutMs) {
     }
 
     /**
-     * Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
-     * request to the leader.
-      */
+     * Wait for a {@link FutureCallback} to complete and return the result if successful.
+     * @param cb the future callback to wait for
+     * @return the future callback's result if successful
+     * @param <T> the future's result type
+     * @throws Throwable if the future callback isn't successful
+     */
+    public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
+        try {
+            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }

Review Comment:
   Hm, I'd argue that calling `HerderRequestHandler::completeOrForwardRequest` for requests that don't currently involve any forwarding is slightly misleading and also requires passing a bunch of irrelevant parameters. To me that outweighs easing the unlikely hypothetical scenario where requests using this method later need to start forwarding.
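   For example, a resource method built on the new helper might look roughly like this (the exact endpoint wiring is illustrative):
   ```java
   @GET
   @Path("/{connector}/offsets")
   public ConnectorOffsets getOffsets(final @PathParam("connector") String connector) throws Throwable {
       FutureCallback<ConnectorOffsets> cb = new FutureCallback<>();
       herder.connectorOffsets(connector, cb);
       // No forwarding needed here: any worker can serve a read-only offsets
       // request, so completeRequest is used instead of completeOrForwardRequest.
       return requestHandler.completeRequest(cb);
   }
   ```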



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -343,12 +336,38 @@ public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback
         return producerCallback;
     }
 
+    @Override
+    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
+        return connectorPartitions.getOrDefault(connectorName, Collections.emptySet());
+    }
+
+    @SuppressWarnings("unchecked")
     protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, record) -> {
         if (error != null) {
             log.error("Failed to read from the offsets topic", error);
             return;
         }

+        if (record.key() != null) {
+            try {
+                // The key should always be a list of the form [connectorName, partition] where connectorName is a
+                // string value and partition is a Map<String, Object>
+                List<Object> keyValue = (List<Object>) keyConverter.toConnectData(topic, record.key()).value();
+                String connectorName = (String) keyValue.get(0);
+                Map<String, Object> partition = (Map<String, Object>) keyValue.get(1);

Review Comment:
   Makes sense, done.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1133,6 +1139,105 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
         }
     }
 
+    /**
+     * Get the current offsets for a connector.
+     * @param connName the name of the connector whose offsets are to be retrieved
+     * @param connectorConfig the connector's configurations
+     * @return the connector's offsets
+     */
+    public ConnectorOffsets connectorOffsets(String connName, Map<String, String> connectorConfig) {
+        String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
+        Connector connector;
+
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
+            connector = plugins.newConnector(connectorClassOrAlias);
+        }
+
+        if (ConnectUtils.isSinkConnector(connector)) {
+            log.debug("Fetching offsets for sink connector: {}", connName);
+            return sinkConnectorOffsets(connName, connector, connectorConfig);
+        } else {
+            log.debug("Fetching offsets for source connector: {}", connName);
+            return sourceConnectorOffsets(connName, connector, connectorConfig);
+        }
+    }
+
+    /**
+     * Get the current consumer group offsets for a sink connector.
+     * @param connName the name of the sink connector whose offsets are to be retrieved
+     * @param connector the sink connector
+     * @param connectorConfig the sink connector's configurations
+     * @return the consumer group offsets for the sink connector
+     */
+    private ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig) {
+        return sinkConnectorOffsets(connName, connector, connectorConfig, Admin::create);
+    }
+
+    // Visible for testing; allows us to mock out the Admin client for testing
+    ConnectorOffsets sinkConnectorOffsets(String connName, Connector connector, Map<String, String> connectorConfig,
+                                          Function<Map<String, Object>, Admin> adminFactory) {
+        Map<String, Object> adminConfig = adminConfigs(
+                connName,
+                "connector-worker-adminclient-" + connName,
+                config,
+                new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(),
+                connectorClientConfigOverridePolicy,
+                kafkaClusterId,
+                ConnectorType.SINK);
+        String groupId = (String) baseConsumerConfigs(
+                connName, "connector-consumer-", config, new SinkConnectorConfig(plugins, connectorConfig),
+                connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK).get(ConsumerConfig.GROUP_ID_CONFIG);
+        Admin admin = adminFactory.apply(adminConfig);
+        try {
+            ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = admin.listConsumerGroupOffsets(groupId);
+            try {
+                // Not using a timeout for the Future::get here because each offset get request is handled in its own thread
+                // in AbstractHerder and the REST API request timeout in HerderRequestHandler will ensure that the user
+                // request doesn't hang indefinitely

Review Comment:
   Ah yes, that makes sense. Thanks, done!
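   For reference, the body of that inner try block would then be roughly (a sketch; it assumes the `SinkUtils` helper discussed above):
   ```java
   // Blocking get with no explicit timeout: the REST layer's request timeout in
   // HerderRequestHandler bounds how long the caller actually waits.
   Map<TopicPartition, OffsetAndMetadata> offsets =
           listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
   return SinkUtils.consumerGroupOffsetsToConnectorOffsets(offsets);
   ```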



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
