This is an automated email from the ASF dual-hosted git repository.

zchovan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 4179b6de569a996e5292622ab1cbcb5b787f5930
Author: Marton Greber <[email protected]>
AuthorDate: Sun Oct 26 13:08:20 2025 +0100

    KUDU-3662: Add stress test for checkpoint recovery
    
    Add TestReplicationCheckpointStress to verify data durability under
    extreme failure conditions. The test aggressively triggers the
    checkpoint race condition by:
    
    - Creating table with multiple partitions (high split count)
    - Inserting data continuously across partitions
    - Repeatedly crashing and restoring from checkpoints mid-replication
    - Verifying no data loss after multiple crash/recovery cycles
    
    Test validates that the checkpoint race condition fix in
    MetricWrappedKuduEnumerator correctly handles:
    - Splits finishing between snapshot and checkpoint completion
    - Multiple concurrent splits in various states (active, buffered)
    - At-least-once semantics with idempotent UPSERT sink operations
    
    Test stability: Timeouts carefully tuned based on checkpoint and
    discovery intervals to avoid flakiness. Verified by running 1024
    iterations on dist-test with debug build - no flakiness detected.
    
    Change-Id: Ic463c143a5330bf8db7c5b026a8b4dbe83b7a769
    Reviewed-on: http://gerrit.cloudera.org:8080/23608
    Tested-by: Marton Greber <[email protected]>
    Reviewed-by: Zoltan Chovan <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 .../TestReplicationCheckpointStress.java           | 270 +++++++++++++++++++++
 1 file changed, 270 insertions(+)

diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpointStress.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpointStress.java
new file mode 100644
index 000000000..f84fd0837
--- /dev/null
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpointStress.java
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.apache.kudu.test.ClientTestUtil.countRowsInTable;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduScanToken;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.SessionConfiguration;
+
+/**
+ * Stress test for checkpoint race condition (FLINK-38575) fixes in wrapped 
source connector.
+ *
+ * This test is designed to aggressively trigger the checkpoint race condition 
by:
+ * - Creating 32 tablets (8 × 4 hash partitions) for maximum split count
+ * - Inserting 100 rows into EACH partition per iteration (3200 
rows/iteration, 32 splits)
+ * - Using very aggressive checkpointing (1 second interval vs typical 30-60s)
+ * - Configuring small split sizes (1KB) to generate more split events
+ * - Performing 10 crash/restore cycles with hard cancels
+ * - Comparing source vs sink row counts after each iteration
+ *
+ * Uses production UPSERT semantics (via CustomReplicationOperationMapper).
+ * This allows the test to focus on detecting MISSING data (the actual symptom
+ * of the race condition) by comparing source and sink row counts.
+ *
+ * Expected Behavior:
+ * - Without fix: Test fails within 5-10 iterations with sink rows < source 
rows
+ * - With fix: Source and sink remain identical across all 10 iterations
+ */
+public class TestReplicationCheckpointStress extends ReplicationTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationCheckpointStress.class);
+
+  private static final int NUM_ITERATIONS = 10;
+  private static final int NUM_PARTITIONS = 32;
+  private static final int ROWS_PER_PARTITION = 100;
+  private static final int ROWS_PER_ITERATION = NUM_PARTITIONS * 
ROWS_PER_PARTITION;
+  private static final int CHECKPOINT_INTERVAL_MS = 1000;
+
+  @ClassRule
+  public static final MiniClusterWithClientResource flinkCluster =
+          new MiniClusterWithClientResource(
+                  new MiniClusterResourceConfiguration.Builder()
+                          .setNumberSlotsPerTaskManager(8)
+                          .setNumberTaskManagers(1)
+                          .setConfiguration(new Configuration())
+                          .build());
+
+  private ClusterClient<?> clusterClient;
+
+  @After
+  public void cleanup() {
+    flinkCluster.cancelAllJobs();
+  }
+
+  @Before
+  public void setUp() {
+    clusterClient = flinkCluster.getClusterClient();
+  }
+
+  @Override
+  protected ReplicationJobConfig.Builder createDefaultJobConfigBuilder() {
+    return super.createDefaultJobConfigBuilder()
+            .setCheckpointingIntervalMillis(CHECKPOINT_INTERVAL_MS);
+  }
+
+  @Override
+  protected KuduReaderConfig createDefaultReaderConfig() {
+    return KuduReaderConfig.Builder
+            .setMasters(sourceHarness.getMasterAddressesAsString())
+            .setSplitSizeBytes(1024)         // 1KB - tiny splits to force one 
per tablet
+            .setBatchSizeBytes(65536)        // 64KB - higher throughput per 
split
+            .build();
+  }
+
+  /**
+   * Creates a table with aggressive partitioning to generate many splits.
+   * Uses hash partitioning on two columns: 8 buckets × 4 buckets = 32 tablets.
+   * This means every scan (full or incremental) generates 32 split events.
+   */
+  private void createStressTestTable() throws Exception {
+    Schema schema = new Schema(Arrays.asList(
+        new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64)
+            .key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("partition_key", Type.INT32)
+            .key(true).build(),
+        new ColumnSchema.ColumnSchemaBuilder("iteration", Type.INT32)
+            .build(),
+        new ColumnSchema.ColumnSchemaBuilder("data", Type.STRING)
+            .build(),
+        new ColumnSchema.ColumnSchemaBuilder("insert_time", 
Type.UNIXTIME_MICROS)
+            .build()
+    ));
+
+    CreateTableOptions options = new CreateTableOptions()
+        .addHashPartitions(Arrays.asList("id"), 8)
+        .addHashPartitions(Arrays.asList("partition_key"), 4)
+        .setNumReplicas(1);
+
+    sourceClient.createTable(TABLE_NAME, schema, options);
+    sinkClient.createTable(TABLE_NAME, schema, options);
+  }
+
+  private void insertStressTestData(int iteration) throws Exception {
+    KuduTable table = sourceClient.openTable(TABLE_NAME);
+    KuduSession session = sourceClient.newSession();
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+
+    long baseId = (long) iteration * ROWS_PER_ITERATION;
+    int rowIndex = 0;
+
+    for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
+      for (int i = 0; i < ROWS_PER_PARTITION; i++) {
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        row.addLong("id", baseId + rowIndex);
+        row.addInt("partition_key", partition);
+        row.addInt("iteration", iteration);
+        row.addString("data", String.format("stress_iter%d_part%d_row%d",
+                                            iteration, partition, i));
+        row.addLong("insert_time", System.currentTimeMillis() * 1000);
+        session.apply(insert);
+        rowIndex++;
+      }
+    }
+
+    session.flush();
+    session.close();
+  }
+
+
+  @Test(timeout = 600000)
+  public void testCheckpointRestartStressWithManySmallSplits() throws 
Exception {
+    createStressTestTable();
+
+    ReplicationJobConfig config = createDefaultJobConfig();
+    KuduReaderConfig readerConfig = createDefaultReaderConfig();
+    KuduWriterConfig writerConfig = createDefaultWriterConfig();
+
+    KuduTable sourceTable = sourceClient.openTable(TABLE_NAME);
+    KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+    List<KuduScanToken> tokens = 
sourceClient.newScanTokenBuilder(sourceTable).build();
+    assertEquals("Should have 32 tablets (8x4 hash partitions)", 
NUM_PARTITIONS, tokens.size());
+
+    JobID currentJobId = null;
+    int expectedTotalRows = 0;
+
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      LOG.info("Stress iteration {}/{}", iteration + 1, NUM_ITERATIONS);
+
+      // Step 1: Insert new batch of data
+      insertStressTestData(iteration);
+      expectedTotalRows += ROWS_PER_ITERATION;
+
+      // Step 2: Start or restore job
+      if (iteration == 0) {
+        ReplicationEnvProvider envProvider = new ReplicationEnvProvider(
+                config, readerConfig, writerConfig);
+        currentJobId = envProvider.getEnv().executeAsync().getJobID();
+      } else {
+        Path checkpoint = findLatestCheckpoint(checkpointDir, currentJobId);
+
+        ReplicationEnvProvider envProvider = new ReplicationEnvProvider(
+                config, readerConfig, writerConfig);
+        JobGraph jobGraph = 
envProvider.getEnv().getStreamGraph().getJobGraph();
+        jobGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(checkpoint.toString(), 
false));
+
+        currentJobId = clusterClient.submitJob(jobGraph).get();
+      }
+
+      // Step 3: Wait for job to start processing, then wait for checkpoint
+      // For stress testing, we don't need all data replicated - just need the 
job
+      // to be actively processing so checkpoints trigger. Wait for a small 
amount
+      // of data to appear (proof job is running), then wait for checkpoint.
+      final int minRowsForCheckpoint = Math.min(ROWS_PER_PARTITION, 
expectedTotalRows);
+      assertEventuallyTrue(
+          String.format("[%d] Wait for job to start processing (>= %d rows)",
+              iteration, minRowsForCheckpoint),
+          () -> countRowsInTable(sinkTable) >= minRowsForCheckpoint,
+          30000);
+
+      // Now wait for at least ONE checkpoint to complete for this specific 
job.
+      // This ensures we have a restore point while still crashing 
mid-replication.
+      waitForCheckpointCompletion(checkpointDir, currentJobId, 15000);
+
+      // Step 4: Hard cancel immediately after checkpoint (mid-replication 
crash)
+      clusterClient.cancel(currentJobId).get();
+      waitForJobTermination(currentJobId, clusterClient, 15000);
+    }
+
+    LOG.info("All {} crash cycles complete - starting final recovery", 
NUM_ITERATIONS);
+
+    // Final restore: Let the job recover and complete all pending replication
+    Path finalCheckpoint = findLatestCheckpoint(checkpointDir, currentJobId);
+    ReplicationEnvProvider finalEnvProvider = new ReplicationEnvProvider(
+            config, readerConfig, writerConfig);
+    JobGraph finalJobGraph = 
finalEnvProvider.getEnv().getStreamGraph().getJobGraph();
+    finalJobGraph.setSavepointRestoreSettings(
+            SavepointRestoreSettings.forPath(finalCheckpoint.toString(), 
false));
+
+    final JobID finalJobId = clusterClient.submitJob(finalJobGraph).get();
+
+    // Wait for all data to eventually replicate (with UPSERT semantics 
handling duplicates)
+    final int finalExpectedRows = expectedTotalRows;
+    assertEventuallyTrue(
+        String.format("All %d rows should eventually replicate after recovery",
+            finalExpectedRows),
+        () -> countRowsInTable(sinkTable) >= finalExpectedRows,
+        180000);
+
+    // Final verification: source vs sink must match perfectly
+    long finalSourceCount = countRowsInTable(sourceTable);
+    long finalSinkCount = countRowsInTable(sinkTable);
+
+    assertEquals("Source rows must match expected after " + NUM_ITERATIONS + " 
iterations",
+        expectedTotalRows, finalSourceCount);
+    assertEquals(
+        "Sink must match source (no data loss despite " + NUM_ITERATIONS + " 
hard crashes)",
+        finalSourceCount, finalSinkCount);
+
+    clusterClient.cancel(finalJobId).get();
+  }
+}
+

Reply via email to