This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c8bb61a43d KAFKA-15387: Deprecate Connect's redundant task 
configurations endpoint (#14361)
1c8bb61a43d is described below

commit 1c8bb61a43d3ad1fd7a10eb3947342ceba783c4e
Author: Yash Mayya <[email protected]>
AuthorDate: Sat Oct 14 10:16:50 2023 +0100

    KAFKA-15387: Deprecate Connect's redundant task configurations endpoint 
(#14361)
    
    Reviewers: Mickael Maison <[email protected]>, Sagar Rao 
<[email protected]>
---
 .../connect/runtime/rest/entities/TaskInfo.java    |  4 +-
 .../runtime/rest/resources/ConnectorsResource.java |  4 +-
 .../integration/ConnectWorkerIntegrationTest.java  | 45 ++++++++++++++++++++--
 .../util/clusters/EmbeddedConnectCluster.java      | 11 +++---
 docs/connect.html                                  |  3 +-
 5 files changed, 55 insertions(+), 12 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
index 8e6f3d7baac..635b8be0704 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.entities;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -26,7 +27,8 @@ public class TaskInfo {
     private final ConnectorTaskId id;
     private final Map<String, String> config;
 
-    public TaskInfo(ConnectorTaskId id, Map<String, String> config) {
+    @JsonCreator
+    public TaskInfo(@JsonProperty("id") ConnectorTaskId id, 
@JsonProperty("config") Map<String, String> config) {
         this.id = id;
         this.config = config;
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 9f1cabcfa14..4878b8df9e1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -173,9 +173,11 @@ public class ConnectorsResource implements ConnectResource 
{
 
     @GET
     @Path("/{connector}/tasks-config")
-    @Operation(summary = "Get the configuration of all tasks for the specified 
connector")
+    @Operation(deprecated = true, summary = "Get the configuration of all 
tasks for the specified connector")
     public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(
             final @PathParam("connector") String connector) throws Throwable {
+        log.warn("The 'GET /connectors/{connector}/tasks-config' endpoint is 
deprecated and will be removed in the next major release. "
+            + "Please use the 'GET /connectors/{connector}/tasks' endpoint 
instead.");
         FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new 
FutureCallback<>();
         herder.tasksConfig(connector, cb);
         return requestHandler.completeRequest(cb);
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 4c393d95ad3..42c3831faf4 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
@@ -29,9 +31,11 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -51,6 +55,8 @@ import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -382,7 +388,7 @@ public class ConnectWorkerIntegrationTest {
         );
         // If the connector is truly stopped, we should also see an empty set 
of tasks and task configs
         assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
-        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Transition to RUNNING
         connect.resumeConnector(CONNECTOR_NAME);
@@ -411,7 +417,7 @@ public class ConnectWorkerIntegrationTest {
                 "Connector did not stop in time"
         );
         assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
-        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Transition to PAUSED
         connect.pauseConnector(CONNECTOR_NAME);
@@ -471,7 +477,7 @@ public class ConnectWorkerIntegrationTest {
         );
         // If the connector is truly stopped, we should also see an empty set 
of tasks and task configs
         assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
-        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Can resume a connector after its Connector has failed before 
shutdown after receiving a stop request
         props.remove("connector.start.inject.error");
@@ -493,7 +499,7 @@ public class ConnectWorkerIntegrationTest {
                 "Connector did not stop in time"
         );
         assertEquals(Collections.emptyList(), 
connect.connectorInfo(CONNECTOR_NAME).tasks());
-        assertEquals(Collections.emptyMap(), 
connect.taskConfigs(CONNECTOR_NAME));
+        assertEquals(Collections.emptyList(), 
connect.taskConfigs(CONNECTOR_NAME));
 
         // Can resume a connector after its Connector has failed during 
shutdown after receiving a stop request
         connect.resumeConnector(CONNECTOR_NAME);
@@ -511,6 +517,37 @@ public class ConnectWorkerIntegrationTest {
         );
     }
 
+    /**
+     * The <strong><em>GET /connectors/{connector}/tasks-config</em></strong> 
endpoint was deprecated in
+     * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint">KIP-970</a>
+     * and is slated for removal in the next major release. This test verifies 
that the deprecation warning log is emitted on trying to use the
+     * deprecated endpoint.
+     */
+    @Test
+    public void testTasksConfigDeprecation() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+            "Initial group of workers did not start in time.");
+
+        connect.configureConnector(CONNECTOR_NAME, 
defaultSourceConnectorProps(TOPIC_NAME));
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME,
+            NUM_TASKS,
+            "Connector tasks did not start in time"
+        );
+
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(ConnectorsResource.class)) {
+            connect.requestGet(connect.endpointForResource("connectors/" + 
CONNECTOR_NAME + "/tasks-config"));
+            List<LogCaptureAppender.Event> logEvents = 
logCaptureAppender.getEvents();
+            assertEquals(1, logEvents.size());
+            assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
+            assertThat(logEvents.get(0).getMessage(), 
containsString("deprecated"));
+        }
+    }
+
     private Map<String, String> defaultSourceConnectorProps(String topic) {
         // setup up props for the source connector
         Map<String, String> props = new HashMap<>();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 20dce332fcf..9e3edcd8f24 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.eclipse.jetty.client.HttpClient;
@@ -611,19 +612,19 @@ public class EmbeddedConnectCluster {
 
     /**
      * Get the task configs of a connector running in this cluster.
-
+     *
      * @param connectorName name of the connector
-     * @return a map from task ID (connector name + "-" + task number) to task 
config
+     * @return a list of task configurations for the connector
      */
-    public Map<String, Map<String, String>> taskConfigs(String connectorName) {
+    public List<TaskInfo> taskConfigs(String connectorName) {
         ObjectMapper mapper = new ObjectMapper();
-        String url = 
endpointForResource(String.format("connectors/%s/tasks-config", connectorName));
+        String url = endpointForResource(String.format("connectors/%s/tasks", 
connectorName));
         Response response = requestGet(url);
         try {
             if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
                 // We use String instead of ConnectorTaskId as the key here 
since the latter can't be automatically
                 // deserialized by Jackson when used as a JSON object key 
(i.e., when it's serialized as a JSON string)
-                return mapper.readValue(responseToString(response), new 
TypeReference<Map<String, Map<String, String>>>() { });
+                return mapper.readValue(responseToString(response), new 
TypeReference<List<TaskInfo>>() { });
             }
         } catch (IOException e) {
             log.error("Could not read task configs from response: {}",
diff --git a/docs/connect.html b/docs/connect.html
index 2deb8901888..16f268e78b0 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -298,7 +298,8 @@ listeners=http://localhost:8080,https://localhost:8443</pre>
         <li><code>GET /connectors/{name}/config</code> - get the configuration 
parameters for a specific connector</li>
         <li><code>PUT /connectors/{name}/config</code> - update the 
configuration parameters for a specific connector</li>
         <li><code>GET /connectors/{name}/status</code> - get current status of 
the connector, including if it is running, failed, paused, etc., which worker 
it is assigned to, error information if it has failed, and the state of all its 
tasks</li>
-        <li><code>GET /connectors/{name}/tasks</code> - get a list of tasks 
currently running for a connector</li>
+        <li><code>GET /connectors/{name}/tasks</code> - get a list of tasks 
currently running for a connector along with their configurations</li>
+        <li><code>GET /connectors/{name}/tasks-config</code> - get the 
configuration of all tasks for a specific connector. This endpoint is 
deprecated and will be removed in the next major release. Please use the 
<code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the 
response structures of the two endpoints differ slightly, please refer to the 
<a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> 
for more details</li>
         <li><code>GET /connectors/{name}/tasks/{taskid}/status</code> - get 
current status of the task, including if it is running, failed, paused, etc., 
which worker it is assigned to, and error information if it has failed</li>
         <li><code>PUT /connectors/{name}/pause</code> - pause the connector 
and its tasks, which stops message processing until the connector is resumed. 
Any resources claimed by its tasks are left allocated, which allows the 
connector to begin processing data quickly once it is resumed.</li>
         <li id="connect_stopconnector"><code>PUT 
/connectors/{name}/stop</code> - stop the connector and shut down its tasks, 
deallocating any resources claimed by its tasks. This is more efficient from a 
resource usage standpoint than pausing the connector, but can cause it to take 
longer to begin processing data once resumed. Note that the offsets for a 
connector can be only modified via the offsets management endpoints if it is in 
the stopped state</li>

Reply via email to