gharris1727 commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1268624984


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne
         }
     }
 
-    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
         for (Class<? extends Connector> connector : connectorClasses) {
             connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
         }
     }
 
+    @SafeVarargs
+    protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... connectorClasses) throws InterruptedException {
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            connectCluster.resumeConnector(connectorClass.getSimpleName());
+        }
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            String connectorName = connectorClass.getSimpleName();
+            connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                    connectorName,
+                    1,
+                    "Connector '" + connectorName + "' and/or task did not resume in time"
+            );
+        }
+    }
+
+    protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) {

Review Comment:
   This also stops the connectors, which the function name doesn't describe. Can you split this into two functions, or rename it to stopAndAlter... or similar?
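   For illustration, a minimal sketch of that split. It assumes a hypothetical ConnectAdmin interface standing in for the few EmbeddedConnectCluster operations the helper needs, and simplifies offsets to a map from a topic-partition string to a long; ConnectAdmin, RecordingAdmin, MirrorOffsetHelpers, and these helper signatures are illustrative, not the PR's code or Kafka's API. Because KIP-875 only allows a connector's offsets to be altered while it is stopped, keeping the stop as its own call (or naming the combined helper after both steps) makes that precondition visible at the call site.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;

// Hypothetical stand-in for the EmbeddedConnectCluster operations the helpers use.
// Offsets are simplified to "topic-partition" -> offset.
interface ConnectAdmin {
    void stopConnector(String connectorName);
    Map<String, Long> offsets(String connectorName);
    void alterOffsets(String connectorName, Map<String, Long> newOffsets);
}

final class MirrorOffsetHelpers {
    private MirrorOffsetHelpers() { }

    // Stops the connector and nothing else.
    static void stopMirrorMakerConnector(ConnectAdmin cluster, String connectorName) {
        cluster.stopConnector(connectorName);
    }

    // Only alters offsets; the caller is responsible for stopping the connector first.
    static void alterMirrorMakerSourceConnectorOffsets(ConnectAdmin cluster, String connectorName,
                                                       LongUnaryOperator alterOffset) {
        Map<String, Long> altered = new HashMap<>();
        cluster.offsets(connectorName).forEach((partition, offset) ->
                altered.put(partition, alterOffset.applyAsLong(offset)));
        cluster.alterOffsets(connectorName, altered);
    }

    // Convenience wrapper whose name advertises both steps.
    static void stopAndAlterMirrorMakerSourceConnectorOffsets(ConnectAdmin cluster, String connectorName,
                                                              LongUnaryOperator alterOffset) {
        stopMirrorMakerConnector(cluster, connectorName);
        alterMirrorMakerSourceConnectorOffsets(cluster, connectorName, alterOffset);
    }
}

// In-memory fake that records the order of calls, for demonstration only.
final class RecordingAdmin implements ConnectAdmin {
    final List<String> calls = new ArrayList<>();
    final Map<String, Long> stored = new HashMap<>();

    @Override
    public void stopConnector(String connectorName) {
        calls.add("stop:" + connectorName);
    }

    @Override
    public Map<String, Long> offsets(String connectorName) {
        calls.add("get:" + connectorName);
        return new HashMap<>(stored);
    }

    @Override
    public void alterOffsets(String connectorName, Map<String, Long> newOffsets) {
        calls.add("alter:" + connectorName);
        stored.putAll(newOffsets);
    }
}
```

   A test would then call stopMirrorMakerConnector(...) and alterMirrorMakerSourceConnectorOffsets(...) back to back, so the stop is no longer hidden inside the alter helper.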



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne
+    protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) {
+        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
+
+        String connectorName = MirrorSourceConnector.class.getSimpleName();
+        connectCluster.stopConnector(connectorName);

Review Comment:
   Should this additionally wait for the connectors to stop? It appears that it just waits for the REST API to return a 200, which may happen before the tasks have stopped executing and committed their offsets.
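   A minimal sketch of such a wait, assuming a hypothetical StatusSource that reports the connector's state and task count (in the real test this would be backed by the cluster's status REST endpoint, and the embedded cluster's assertion helpers may already provide an equivalent check). It polls until the connector reports STOPPED with zero tasks, which is a closer proxy for "the tasks have shut down" than the response to the stop request itself. ConnectorStatus, StatusSource, and StopWaiter are illustrative names.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical snapshot of a connector's reported status.
final class ConnectorStatus {
    final String state; // e.g. "RUNNING", "PAUSED", "STOPPED"
    final int taskCount;

    ConnectorStatus(String state, int taskCount) {
        this.state = state;
        this.taskCount = taskCount;
    }
}

// Hypothetical status lookup, e.g. backed by GET /connectors/{name}/status in a real test.
interface StatusSource {
    ConnectorStatus status(String connectorName);
}

final class StopWaiter {
    private static final long POLL_INTERVAL_MS = 100;

    private StopWaiter() { }

    // Re-evaluates the condition until it holds; throws AssertionError once the timeout elapses.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    // Call after issuing the stop request: waits for STOPPED with no remaining tasks
    // instead of relying on the stop request's 200 response alone.
    static void awaitConnectorStopped(StatusSource source, String connectorName, long timeoutMs)
            throws InterruptedException {
        waitForCondition(() -> {
            ConnectorStatus status = source.status(connectorName);
            return "STOPPED".equals(status.state) && status.taskCount == 0;
        }, timeoutMs, "Connector '" + connectorName + "' did not stop in time");
    }
}
```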



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##########
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
         super.startClusters();
     }
 
+    @Override
+    @Test
+    public void testReplication() throws Exception {
+        super.testReplication();
+
+        // Augment the base replication test case with some extra testing of the offset management
+        // API introduced in KIP-875
+        // We do this only when exactly-once support is enabled in order to avoid having to worry about
+        // zombie tasks producing duplicate records and/or writing stale offsets to the offsets topic
+
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+
+        // Explicitly move back to offset 0
+        // Note that the connector treats the offset as the last-consumed offset,
+        // so it will start reading the topic partition from offset 1 when it resumes
+        alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, "test-topic-1");
+        // Reset the offsets for test-topic-2
+        resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+        resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+        int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + ((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+        assertEquals(expectedRecordsTopic1, backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
+                "Records were not re-replicated to backup cluster after altering offsets.");
+        int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+        assertEquals(expectedRecordsTopic2, backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
+                "New topic was not re-replicated to backup cluster after altering offsets.");
+
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends Connector>[] connectorsToReset = CONNECTOR_LIST.toArray(new Class[0]);
+        // Resetting the offsets for the heartbeat and checkpoint connectors doesn't have any effect
+        // on their behavior, but users may want to wipe offsets from them to prevent the offsets topic
+        // from growing infinitely. So, we include them in the list of connectors to reset as a sanity check

Review Comment:
   I think the currently proposed validation is reasonable, and I agree with the points raised above.
   I didn't even know that the Checkpoint and Heartbeat connectors emitted offsets in the first place, since the connectors never read them back and I never had reason to trace their data flow. I think the GET portion of the API will be the first time any user sees these offsets, and the PATCH / alterOffsets methods will be the first time they see how these offsets do (or don't) affect the operation of the connectors.
   
   Since it is harmless for users to change these offsets, the alterOffsets calls should guide users toward well-formed inputs but not restrict them beyond that. For these connectors, the main benefit will be keeping the offsets store clean in case a future extension wishes to use the offsets in a meaningful way.
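   As a rough sketch of this "guide, don't restrict" approach: check that each source partition and offset has the expected shape, always accept null offsets (which KIP-875 uses to signal that a partition's offset is being reset), and impose nothing else. The parameter type mirrors the offsets argument of the alterOffsets hook that KIP-875 adds to SourceConnector, but the class is standalone and OffsetShapeValidator is an illustrative name; the key names sourceClusterAlias, targetClusterAlias, and offset are assumptions, not necessarily what the heartbeat connector actually writes.

```java
import java.util.Map;

// Illustrative validator; the key names below are assumptions, not MirrorMaker 2's actual schema.
final class OffsetShapeValidator {
    static final String SOURCE_ALIAS_KEY = "sourceClusterAlias"; // hypothetical
    static final String TARGET_ALIAS_KEY = "targetClusterAlias"; // hypothetical
    static final String OFFSET_KEY = "offset";                   // hypothetical

    private OffsetShapeValidator() { }

    // Same parameter shape as the offsets argument of SourceConnector#alterOffsets in KIP-875.
    // Rejects malformed entries with IllegalArgumentException; otherwise accepts any values,
    // since the connector never reads these offsets back.
    static boolean validateOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : offsets.entrySet()) {
            Map<String, ?> partition = entry.getKey();
            requireStringField(partition, SOURCE_ALIAS_KEY);
            requireStringField(partition, TARGET_ALIAS_KEY);

            Map<String, ?> offset = entry.getValue();
            if (offset == null) {
                continue; // null means "reset this partition", which is always allowed
            }
            if (!(offset.get(OFFSET_KEY) instanceof Number)) {
                throw new IllegalArgumentException("Offset for partition " + partition
                        + " must contain a numeric '" + OFFSET_KEY + "' field");
            }
        }
        return true;
    }

    private static void requireStringField(Map<String, ?> partition, String key) {
        if (partition == null || !(partition.get(key) instanceof String)) {
            throw new IllegalArgumentException("Source partition " + partition
                    + " must contain a string '" + key + "' field");
        }
    }
}
```

   How the real hook's return value is interpreted by the Connect runtime is defined by KIP-875 and is not modeled here; the sketch only shows the shape checks.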


