C0urante commented on code in PR #13284:
URL: https://github.com/apache/kafka/pull/13284#discussion_r1123546376


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -322,6 +324,29 @@ public void run() {
         }
     }
 
+    /**
+     * Validates that the underlying connectors and tasks are running.
+     * Visible for testing purpose.
+     */
+    boolean isConfiguredAndRunning() {
+        return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+            final String connName = connectorClazz.getSimpleName();
+            return herderPairs.stream().allMatch(sourceAndTarget -> {
+                final ConnectorStateInfo connectorStatus = this.connectorStatus(sourceAndTarget, connName);
+                return connectorStatus != null
+                    // verify that connector state is set to running
+                    && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    // verify that all tasks are set to running
+                    && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment:
   This doesn't actually verify that tasks are running: `allMatch` returns 
`true` for an empty stream, so a connector with an empty set of tasks would 
still satisfy this condition.
   
   We probably want to either 1) require that at least one task exists or 2) 
require the user to specify the expected number of tasks for each connector, in 
addition to this check (which ensures that every task that does exist is in the 
`RUNNING` state).
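
   To illustrate the concern, here is a minimal, self-contained sketch. It uses plain strings in place of the real task state objects, and the class and method names (`TaskStateCheck`, `atLeastOneTaskAndAllRunning`, `expectedTasksAllRunning`) are hypothetical, not part of the PR. It shows that `allMatch` on an empty list passes, and how either proposed option would close that gap.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TaskStateCheck {
    static final String RUNNING = "RUNNING";

    // Mirrors the shape of the check under review: vacuously true when there are no tasks.
    static boolean allTasksRunning(List<String> taskStates) {
        return taskStates.stream().allMatch(RUNNING::equals);
    }

    // Option 1 (hypothetical helper): additionally require that at least one task exists.
    static boolean atLeastOneTaskAndAllRunning(List<String> taskStates) {
        return !taskStates.isEmpty() && allTasksRunning(taskStates);
    }

    // Option 2 (hypothetical helper): require a caller-supplied expected task count.
    static boolean expectedTasksAllRunning(List<String> taskStates, int expectedTasks) {
        return taskStates.size() == expectedTasks && allTasksRunning(taskStates);
    }

    public static void main(String[] args) {
        List<String> noTasks = Collections.emptyList();
        List<String> twoRunning = Arrays.asList(RUNNING, RUNNING);

        System.out.println(allTasksRunning(noTasks));             // true: the gap described above
        System.out.println(atLeastOneTaskAndAllRunning(noTasks)); // false
        System.out.println(expectedTasksAllRunning(twoRunning, 2)); // true
        System.out.println(expectedTasksAllRunning(twoRunning, 3)); // false
    }
}
```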



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -322,6 +324,29 @@ public void run() {
         }
     }
 
+    /**
+     * Validates that the underlying connectors and tasks are running.
+     * Visible for testing purpose.
+     */
+    boolean isConfiguredAndRunning() {
+        return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+            final String connName = connectorClazz.getSimpleName();
+            return herderPairs.stream().allMatch(sourceAndTarget -> {
+                final ConnectorStateInfo connectorStatus = this.connectorStatus(sourceAndTarget, connName);
+                return connectorStatus != null
+                    // verify that connector state is set to running
+                    && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    // verify that all tasks are set to running
+                    && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            });
+        });
+    }

Review Comment:
   We should expose only the smallest testing-only API necessary in 
non-testing code, and implement the rest of the logic in test code.
   
   So in this case, the API we'd expose here would be the `connectorStatus` 
method, and the logic in this method would go in, e.g., the 
`DedicatedMirrorIntegrationTest` suite.
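
   A rough sketch of what the test-side helper could look like follows. To stay self-contained it does not use the real Kafka Connect classes: `Status` is a hypothetical stand-in for `ConnectorStateInfo`, herder pairs and connector names are plain strings, and `statusLookup` stands in for the single accessor exposed by production code. It also requires at least one task per connector, which is an assumption taken from the earlier comment on this method rather than something the PR currently does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

public class MirrorStatusAssertions {

    // Hypothetical stand-in for ConnectorStateInfo: connector state plus one state per task.
    public static final class Status {
        final String connectorState;
        final List<String> taskStates;

        public Status(String connectorState, List<String> taskStates) {
            this.connectorState = connectorState;
            this.taskStates = taskStates;
        }
    }

    // Test-only logic: every connector on every herder pair must report RUNNING,
    // with at least one task and all tasks RUNNING. statusLookup may return null
    // when no status is available yet.
    public static boolean isConfiguredAndRunning(
            List<String> herderPairs,
            List<String> connectorNames,
            BiFunction<String, String, Status> statusLookup) {
        return connectorNames.stream().allMatch(connector ->
            herderPairs.stream().allMatch(pair -> {
                Status status = statusLookup.apply(pair, connector);
                return status != null
                    && "RUNNING".equals(status.connectorState)
                    && !status.taskStates.isEmpty()
                    && status.taskStates.stream().allMatch("RUNNING"::equals);
            }));
    }

    public static void main(String[] args) {
        List<String> pairs = Arrays.asList("A->B", "B->A");
        List<String> connectors = Arrays.asList("MirrorSourceConnector");

        Status healthy = new Status("RUNNING", Arrays.asList("RUNNING"));
        Status noTasks = new Status("RUNNING", Collections.emptyList());

        System.out.println(isConfiguredAndRunning(pairs, connectors, (p, c) -> healthy));
        System.out.println(isConfiguredAndRunning(pairs, connectors, (p, c) -> noTasks));
        System.out.println(isConfiguredAndRunning(pairs, connectors, (p, c) -> null));
    }
}
```

   With this split, production code only carries the status accessor, and the polling and assertion policy can change freely in the test suite.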



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -62,8 +60,10 @@ public void setup() {
     @AfterEach
     public void teardown() throws Throwable {
         AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
-        mirrorMakers.forEach((name, mirrorMaker) ->
-            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        mirrorMakers.forEach((name, mirrorMaker) -> {
+                Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
+                mirrorMaker.awaitStop();
+            }

Review Comment:
   We can optimize a little here by requesting shutdown on every node before 
awaiting any of them, which will hopefully reduce the impact this change has 
on test time when running multiple MM2 nodes:
   ```suggestion
           mirrorMakers.forEach((name, mirrorMaker) -> {
                   Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure);
               }
           );
           mirrorMakers.forEach((name, mirrorMaker) -> {
                       mirrorMaker.awaitStop();
               }
   ```
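
   The idea behind the suggestion can be sketched in isolation. The `Worker` class below is hypothetical and only models a node whose `stop()` returns immediately while cleanup finishes in the background; whether `MirrorMaker.stop()` behaves that way is an assumption of this sketch. Under that assumption, stopping every worker first lets the shutdowns overlap, so the total wait is closer to the slowest worker than to the sum of all of them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class TwoPhaseShutdown {

    // Hypothetical worker: stop() only requests shutdown, awaitStop() blocks until it completes.
    public static final class Worker {
        private final long shutdownMillis;
        private final CountDownLatch stopped = new CountDownLatch(1);

        public Worker(long shutdownMillis) {
            this.shutdownMillis = shutdownMillis;
        }

        public void stop() {
            Thread cleanup = new Thread(() -> {
                try {
                    Thread.sleep(shutdownMillis); // simulated cleanup work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                stopped.countDown();
            });
            cleanup.setDaemon(true);
            cleanup.start();
        }

        public void awaitStop() {
            try {
                stopped.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public boolean isStopped() {
            return stopped.getCount() == 0;
        }
    }

    // Phase 1: request shutdown everywhere. Phase 2: wait for each to finish.
    // Returns the elapsed wall-clock time in milliseconds.
    public static long stopAllThenAwaitAll(List<Worker> workers) {
        long start = System.nanoTime();
        workers.forEach(Worker::stop);
        workers.forEach(Worker::awaitStop);
        return (System.nanoTime() - start) / 1_000_000;
    }

    // The shape currently in the PR: each worker is stopped and awaited before the next one.
    public static long stopAndAwaitEach(List<Worker> workers) {
        long start = System.nanoTime();
        workers.forEach(worker -> {
            worker.stop();
            worker.awaitStop();
        });
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long overlapped = stopAllThenAwaitAll(
            Arrays.asList(new Worker(200), new Worker(200), new Worker(200)));
        long sequential = stopAndAwaitEach(
            Arrays.asList(new Worker(200), new Worker(200), new Worker(200)));
        System.out.println("stop all, then await all: " + overlapped + " ms");
        System.out.println("stop and await each:      " + sequential + " ms");
    }
}
```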



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java:
##########
@@ -322,6 +324,29 @@ public void run() {
         }
     }
 
+    /**
+     * Validates that the underlying connectors and tasks are running.
+     * Visible for testing purpose.
+     */
+    boolean isConfiguredAndRunning() {
+        return CONNECTOR_CLASSES.stream().allMatch(connectorClazz -> {
+            final String connName = connectorClazz.getSimpleName();
+            return herderPairs.stream().allMatch(sourceAndTarget -> {
+                final ConnectorStateInfo connectorStatus = this.connectorStatus(sourceAndTarget, connName);
+                return connectorStatus != null
+                    // verify that connector state is set to running
+                    && connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    // verify that all tasks are set to running
+                    && connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            });
+        });
+    }
+
+    private ConnectorStateInfo connectorStatus(SourceAndTarget sourceAndTarget, String connector) {
+        checkHerder(sourceAndTarget);
+        return herders.get(sourceAndTarget).connectorStatus(connector);
+    }

Review Comment:
   This can be made `public` and we can move all other assertion logic around 
the status of MM2 into testing code.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/DedicatedMirrorIntegrationTest.java:
##########
@@ -14,14 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.connect.mirror.integration;

Review Comment:
   This move isn't necessary; we can and should keep this test suite in the 
`integration` package.
   
   It's fine to add `public` methods to the `MirrorMaker` class since that 
class is not part of the public API.



