yashmayya commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1144593645


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -245,6 +245,14 @@ default void validateConnectorConfig(Map<String, String> 
connectorConfig, Callba
      */
     void restartConnectorAndTasks(RestartRequest request, 
Callback<ConnectorStateInfo> cb);
 
+    /**
+     * Stop the conector. This call will asynchronously suspend processing by 
the connector and all
+     * of its tasks.

Review Comment:
   > This call will asynchronously suspend processing by the connector and all 
of its tasks.
   
   This is the same as the description for `Herder::pauseConnector`. It might 
be worth briefly outlining the difference between the two (tasks stop etc.)?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -325,6 +325,181 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         assertTrue("Connector and all tasks were not stopped in time", 
stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector 
can be updated, with
+     * an emphasis on ensuring that the transitions between each state are 
correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>

Review Comment:
   Nice!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java:
##########
@@ -291,9 +293,21 @@ protected boolean awaitUnpause() throws 
InterruptedException {
 
     public void transitionTo(TargetState state) {
         synchronized (this) {
-            // ignore the state change if we are stopping
-            if (stopping)
+            // Ignore the state change if we are stopping.

Review Comment:
   Is there any particular reason we're trying to handle a case that shouldn't 
arise here in the `WorkerTask`? I'm just curious; is it to minimize the impact 
of potential future bugs in implementation? If so, wouldn't this just help mask 
the hypothetical bug for longer?



##########
gradle/spotbugs-exclude.xml:
##########
@@ -303,7 +303,7 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         <Class name="org.apache.kafka.connect.runtime.WorkerConnector"/>
         <Or>
             <Method name="doStart"/>
-            <Method name="pause"/>

Review Comment:
   The comment above seems to indicate that this is for switch statement 
fallthrough. Your changes refactored it to use if else statements instead, so I 
guess this can be removed?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -221,28 +223,44 @@ public boolean isRunning() {
     }
 
     @SuppressWarnings("fallthrough")

Review Comment:
   ```suggestion
   ```
   I guess this can be removed now?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -221,28 +223,44 @@ public boolean isRunning() {
     }
 
     @SuppressWarnings("fallthrough")
-    private void pause() {
+    private void stop(boolean paused) {
+        State newState = paused ? State.PAUSED : State.STOPPED;
         try {
-            switch (state) {
-                case STOPPED:
-                    return;
+            if ((state == State.STOPPED || state == State.PAUSED) && state == 
newState) {
+                // Already in the desired state
+                return;
+            }
 
-                case STARTED:
-                    connector.stop();
-                    // fall through
+            if (state == State.STARTED) {
+                connector.stop();
+            }
 
-                case INIT:
-                    statusListener.onPause(connName);
-                    this.state = State.STOPPED;
-                    break;
+            if (state == State.FAILED && newState != State.STOPPED) {
+                throw new IllegalArgumentException("Cannot transition to 
non-stopped state when connector has already failed");
+            }
 
-                default:
-                    throw new IllegalArgumentException("Cannot pause connector 
in state " + state);
+            if (paused) {
+                statusListener.onPause(connName);
+            } else {
+                statusListener.onStop(connName);
             }
+
+            this.state = newState;
         } catch (Throwable t) {
-            log.error("{} Error while shutting down connector", this, t);
-            statusListener.onFailure(connName, t);
-            this.state = State.FAILED;
+            log.error("{} Error while {} connector", this, paused ? "pausing" 
: "stopping", t);
+            if (paused) {
+                statusListener.onFailure(connName, t);
+                this.state = State.FAILED;
+            } else {
+                // We say the connector is STOPPED even if it fails at this 
point
+                this.state = State.STOPPED;
+                // One more try to make sure the status is updated correctly
+                try {
+                    statusListener.onStop(connName);
+                } catch (Throwable t2) {
+                    log.error("{} Error during failover attempt to stop 
connector", this, t2);
+                }

Review Comment:
   Can we use a `finally` block instead here?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, 
final Map<String, String>
         );
     }
 
+    @Override
+    public void stopConnector(final String connName, final Callback<Void> 
callback) {
+        log.trace("Submitting request to transition connector {} to STOPPED 
state", connName);
+
+        addRequest(
+                () -> {
+                    refreshConfigSnapshot(workerSyncTimeoutMs);

Review Comment:
   Why do we need to read to the end of the config topic here? Shouldn't the 
regular reads in the tick loop be  good enough?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -274,6 +274,19 @@ public Response restartConnector(final 
@PathParam("connector") String connector,
         return Response.accepted().entity(stateInfo).build();
     }
 
+    @PUT
+    @Path("/{connector}/stop")
+    @Operation(summary = "Stop the specified connector",
+               description = "This operation is idempotent and has no effects 
if the connector is already stopped")
+    public void stopConnector(

Review Comment:
   This will lead to a `204 No Content` response (assuming no exceptions 
thrown). It might be more appropriate to return a `202 Accepted` response 
similar to the existing pause and resume APIs?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java:
##########
@@ -450,6 +480,35 @@ protected boolean checkConnectorAndTasksAreStopped(String 
connectorName) {
                 && info.tasks().stream().noneMatch(s -> 
s.state().equals(AbstractStatus.State.RUNNING.toString()));
     }
 
+
+    /**
+     * Assert that a connector is in the stopped state and has no tasks.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorIsStopped(String connectorName, String 
detailMessage)

Review Comment:
   nit: `detailMessage` is always `"Connector did not stop in time"`. Do we 
really need it as a method parameter?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -424,6 +435,9 @@ private void updateConnectorTasks(String connName) {
         if (!worker.isRunning(connName)) {
             log.info("Skipping update of connector {} since it is not 
running", connName);
             return;
+        } else if (configState.targetState(connName) != TargetState.STARTED) {
+            log.info("Skipping update of connector {} since its target state 
is {}", connName, configState.targetState(connName));

Review Comment:
   ```suggestion
               log.info("Skipping update of connector tasks for connector {} 
since its target state is {}", connName, configState.targetState(connName));
   ```
   nit: `Skipping update of connector` sounds ambiguous



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -553,6 +570,54 @@ public ActiveTopicsInfo connectorTopics(String 
connectorName) {
                 "Could not read connector state. Error response: " + 
responseToString(response));
     }
 
+    /**
+     * Get the info of a connector running in this cluster (retrieved via the 
<code>GET /connectors/{connector}</code> endpoint).
+
+     * @param connectorName name of the connector
+     * @return an instance of {@link ConnectorInfo} populated with state 
information of the connector and its tasks.
+     */
+    public ConnectorInfo connectorInfo(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s", 
connectorName));
+        Response response = requestGet(url);
+        try {
+            if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
+                return mapper.readValue(responseToString(response), 
ConnectorInfo.class);
+            }
+        } catch (IOException e) {
+            log.error("Could not read connector info from response: {}",
+                    responseToString(response), e);
+            throw new ConnectException("Could not not parse connector info", 
e);
+        }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not read connector info. Error response: " + 
responseToString(response));
+    }
+
+    /**
+     * Get the task configs of a connector running in this cluster.
+
+     * @param connectorName name of the connector
+     * @return a map from task ID (connector name + "-" + task number) to task 
config
+     */
+    public Map<String, Map<String, String>> taskConfigs(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = 
endpointForResource(String.format("connectors/%s/tasks-config", connectorName));

Review Comment:
   I'm a little confused as to why we have a `GET 
/connectors/{connector}/tasks` as well as a `GET 
/connectors/{connector}/tasks-config` API? Looks like the only difference 
between them is that the former returns "raw" task configurations (i.e. before 
externalized secrets are replaced using config transformers) - in which case 
the names of the two APIs don't really help distinguish between them much (same 
with the Herder methods - `Herder::taskConfigs` and `Herder.tasksConfig`).



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -325,6 +325,181 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         assertTrue("Connector and all tasks were not stopped in time", 
stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector 
can be updated, with
+     * an emphasis on ensuring that the transitions between each state are 
correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>
+     */
+    @Test
+    public void testPauseStopResume() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // Want to make sure to use multiple tasks
+        final int numTasks = 4;
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+
+        // Start with RUNNING
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not start in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector did not pause in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                0,
+                "Connector did not pause in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Delete the connector
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreNotRunning(

Review Comment:
   `assertConnectorAndTasksAreNotRunning` asserts whether the status endpoint 
returns a 404 response **OR** the connector and tasks are all have a 
non-running status. Should we add a stronger assertion for deletion (perhaps 
just checking if the status endpoint returns a 404 response)?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -325,6 +325,181 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         assertTrue("Connector and all tasks were not stopped in time", 
stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector 
can be updated, with
+     * an emphasis on ensuring that the transitions between each state are 
correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>
+     */
+    @Test
+    public void testPauseStopResume() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // Want to make sure to use multiple tasks
+        final int numTasks = 4;
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));

Review Comment:
   > Want to make sure to use multiple tasks
   
   `tasks.max` is already set to 4 in `defaultSourceConnectorProps` though?
   
   Or did you mean something like even if we wanted to change the default max 
tasks for other tests in this class, this one shouldn't be touched?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -325,6 +325,181 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         assertTrue("Connector and all tasks were not stopped in time", 
stopCounter.await(1, TimeUnit.MINUTES));
     }
 
+    /**
+     * Verify that the target state (started, paused, stopped) of a connector 
can be updated, with
+     * an emphasis on ensuring that the transitions between each state are 
correct.
+     * <p>
+     * The transitions we need to cover are:
+     * <ol>
+     *     <li>RUNNING -> PAUSED</li>
+     *     <li>RUNNING -> STOPPED</li>
+     *     <li>PAUSED -> RUNNING</li>
+     *     <li>PAUSED -> STOPPED</li>
+     *     <li>STOPPED -> RUNNING</li>
+     *     <li>STOPPED -> PAUSED</li>
+     * </ol>
+     * With some reordering, we can perform each transition just once:
+     * <ul>
+     *     <li>Start with RUNNING</li>
+     *     <li>Transition to STOPPED (2)</li>
+     *     <li>Transition to RUNNING (5)</li>
+     *     <li>Transition to PAUSED (1)</li>
+     *     <li>Transition to STOPPED (4)</li>
+     *     <li>Transition to PAUSED (6)</li>
+     *     <li>Transition to RUNNING (3)</li>
+     * </ul>
+     */
+    @Test
+    public void testPauseStopResume() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        // Want to make sure to use multiple tasks
+        final int numTasks = 4;
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+
+        // Start with RUNNING
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not start in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        // Issue a second request to ensure that this operation is idempotent
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector did not pause in time"
+        );
+
+        // Transition to STOPPED
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Transition to PAUSED
+        connect.pauseConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
+                CONNECTOR_NAME,
+                0,
+                "Connector did not pause in time"
+        );
+
+        // Transition to RUNNING
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                numTasks,
+                "Connector tasks did not resume in time"
+        );
+
+        // Delete the connector
+        connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreNotRunning(
+                CONNECTOR_NAME,
+                "Connector tasks were not destroyed in time"
+        );
+    }
+
+    /**
+     * Test out the {@code STOPPED} state introduced in
+     * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Newtargetstate:STOPPED";>KIP-875</a>,
+     * with an emphasis on correctly handling errors thrown from the connector.
+     */
+    @Test
+    public void testStoppedState() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
+        // Fail the connector on startup
+        props.put("connector.start.inject.error", "true");
+
+        // Start the connector (should fail immediately and generate no tasks)
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                0,
+                "Connector should have failed and not generated any tasks"
+        );
+
+        // Stopping a failed connector updates its state to STOPPED in the 
REST API
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Can resume a connector after its Connector has failed before 
shutdown after receiving a stop request
+        props.remove("connector.start.inject.error");
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or tasks did not start running healthily in time"
+        );
+
+        // Fail the connector on shutdown
+        props.put("connector.stop.inject.error", "true");
+        // Stopping a connector that fails during shutdown after receiving a 
stop request updates its state to STOPPED in the REST API
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(
+                CONNECTOR_NAME,
+                "Connector did not stop in time"
+        );
+
+        // Can resume a connector after is Connector has failed during 
shutdown after receiving a stop request

Review Comment:
   ```suggestion
           // Can resume a connector after its Connector has failed during 
shutdown after receiving a stop request
   ```
   Did you mean to differentiate between a logical "connector" (Connector + 
Tasks)  and a "Connector" instance?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to