This is an automated email from the ASF dual-hosted git repository.

zchovan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fefef37ed6dbc64e210a57eac4b1979a91651c75
Author: Marton Greber <[email protected]>
AuthorDate: Sun Oct 26 13:08:41 2025 +0100

    KUDU-3662: Fix race condition in Kudu replication
    
    Implement a fix for a data-loss race condition in which splits that
    finish between the checkpoint snapshot and checkpoint completion are
    lost, causing records to be skipped on job restart.
    
    Race condition scenario (without fix):
    1. Split finishes, reader sends SplitFinishedEvent to enumerator
    2. Enumerator removes split from pending immediately
    3. Checkpoint snapshot is taken (split already removed from state)
    4. Job crashes before notifyCheckpointComplete
    5. On restore, split is missing from checkpoint -> data loss
    
    Solution:
    Wrap KuduSourceEnumerator to defer split removal until checkpoint
    completes, following Flink's CheckpointListener buffering contract [1].
    When SplitFinishedEvent arrives:
    - Delegate processes the event normally
    - Re-add finished split back to pending (if removed by delegate)
    - Track split ID in splitsPendingRemoval buffer
    - Only remove from pending in notifyCheckpointComplete()
    
    On restore from checkpoint:
    - Re-enqueue all restored pending splits to unassigned for replay
    - Clear pending to unblock timestamp advancement
    - Guarantees at-least-once processing (splits may be replayed)
    
    Stateless reader wrapper:
    To prevent duplicate split assignment issues on restore, wrap
    KuduSourceReader with StatelessKuduReaderWrapper that returns empty
    state on checkpoint. This ensures all split lifecycle management flows
    through the enumerator, eliminating potential crashes from duplicate
    split IDs in reader's internal tracking. Since the Kudu connector does
    not implement split progress tracking (splits are atomic scan tokens),
    there is no efficiency loss from re-reading splits on restore.
    
    Restored pending splits contain two types:
    1. Active splits being read when checkpoint was taken - must be replayed
    2. Buffered splits finished by reader but held in pending waiting for
       checkpoint to complete - records may be in-flight, must be replayed
    
    The enumerator cannot distinguish between the two types, so it
    re-enqueues all of them. With the stateless reader, this creates a
    clean lifecycle without duplicates. Clearing pending after the
    re-enqueue unblocks timestamp advancement immediately (within seconds,
    as splits are reassigned, rather than minutes spent waiting for
    processing to complete).
    
    Additional changes:
    - Add StatelessKuduReaderWrapper to prevent duplicate split assignment
    - Add TestMetricWrappedKuduEnumerator verifying the fix and including
      canary test to detect upstream resolution of FLINK-38575
    - Add checkpoint configuration to ReplicationJobConfig with validation
      ensuring checkpointing interval < discovery interval
    - Add TestReplicationCheckpoint for end-to-end checkpoint recovery
    - Extend ReplicationTestBase with checkpoint test utilities
    
    [1]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
    
    Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
    Reviewed-on: http://gerrit.cloudera.org:8080/23607
    Tested-by: Kudu Jenkins
    Reviewed-by: Zoltan Chovan <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 java/kudu-replication/build.gradle                 |   1 +
 .../kudu/replication/ReplicationConfigParser.java  |  14 +-
 .../kudu/replication/ReplicationEnvProvider.java   |  36 ++-
 .../kudu/replication/ReplicationJobConfig.java     |  56 ++++-
 .../wrappedsource/MetricWrappedKuduEnumerator.java | 151 +++++++++++-
 .../wrappedsource/MetricWrappedKuduSource.java     |  14 +-
 .../wrappedsource/StatelessKuduReaderWrapper.java  | 101 ++++++++
 .../kudu/replication/ReplicationTestBase.java      | 143 +++++++++++-
 .../TestMetricWrappedKuduEnumerator.java           | 260 +++++++++++++++++++++
 .../replication/TestReplicationCheckpoint.java     | 199 ++++++++++++++++
 .../replication/TestReplicationConfigParser.java   |  95 +++++++-
 .../kudu/replication/TestReplicationMetrics.java   |  10 +-
 .../TestReplicationTableInitializer.java           |   6 +-
 13 files changed, 1050 insertions(+), 36 deletions(-)

diff --git a/java/kudu-replication/build.gradle 
b/java/kudu-replication/build.gradle
index 962b49bfa..268fb7f21 100644
--- a/java/kudu-replication/build.gradle
+++ b/java/kudu-replication/build.gradle
@@ -42,6 +42,7 @@ dependencies {
 
     testImplementation libs.junit
     testImplementation libs.assertj
+    testImplementation libs.mockitoCore
     testImplementation project(path: ":kudu-test-utils", configuration: 
"shadow")
     testImplementation libs.log4jApi
     testImplementation libs.log4jCore
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
index 829b898db..7ba8f4f45 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
@@ -50,10 +50,15 @@ public class ReplicationConfigParser {
    *   <li>{@code job.tableSuffix} (optional) – suffix to append to the sink 
table name
    *   (default: empty string)</li>
    *   <li>{@code job.discoveryIntervalSeconds} (optional) – interval in 
seconds at which the source
-   *   tries to perform a diff scan to get new or changed data (default: 
300)</li>
+   *   tries to perform a diff scan to get new or changed data (default: 
600)</li>
    *   <li>{@code job.createTable} (optional) – whether to create the sink 
table if it not exists.
    *   This setting recreates the partition schema that is present on the 
source side.
    *   (default: false)</li>
+   *   <li>{@code job.checkpointingIntervalMillis} (optional) – interval in 
milliseconds at which
+   *   Flink takes checkpoints for fault tolerance (default: 60000).
+   *   Must be strictly less than discoveryIntervalSeconds to ensure timestamp 
progression.</li>
+   *   <li>{@code job.checkpointsDirectory} (required) – filesystem directory 
path where Flink
+   *   stores checkpoint data for recovery. Required for production fault 
tolerance.</li>
    * </ul>
    *
    * @param params the Flink {@code ParameterTool} containing command-line 
parameters
@@ -64,7 +69,8 @@ public class ReplicationConfigParser {
     ReplicationJobConfig.Builder builder = ReplicationJobConfig.builder()
             
.setSourceMasterAddresses(params.getRequired("job.sourceMasterAddresses"))
             
.setSinkMasterAddresses(params.getRequired("job.sinkMasterAddresses"))
-            .setTableName(params.getRequired("job.tableName"));
+            .setTableName(params.getRequired("job.tableName"))
+            
.setCheckpointsDirectory(params.getRequired("job.checkpointsDirectory"));
 
     if (params.has("job.restoreOwner")) {
       builder.setRestoreOwner(params.getBoolean("job.restoreOwner"));
@@ -82,6 +88,10 @@ public class ReplicationConfigParser {
       builder.setCreateTable(params.getBoolean("job.createTable"));
     }
 
+    if (params.has("job.checkpointingIntervalMillis")) {
+      
builder.setCheckpointingIntervalMillis(params.getLong("job.checkpointingIntervalMillis"));
+    }
+
     return builder.build();
   }
 
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
index 923cdc9cb..3a5894531 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
@@ -21,6 +21,9 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.kudu.connector.KuduTableInfo;
 import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
@@ -29,6 +32,8 @@ import org.apache.flink.connector.kudu.sink.KuduSinkBuilder;
 import org.apache.flink.connector.kudu.source.KuduSource;
 import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
 import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 
@@ -42,13 +47,22 @@ public class ReplicationEnvProvider {
   private final ReplicationJobConfig jobConfig;
   private final KuduReaderConfig readerConfig;
   private final KuduWriterConfig writerConfig;
+  private final boolean enableFailFast;
 
   ReplicationEnvProvider(ReplicationJobConfig jobConfig,
                          KuduReaderConfig readerConfig,
-                         KuduWriterConfig writerConfig) {
+                         KuduWriterConfig writerConfig,
+                         boolean enableFailFast) {
     this.jobConfig = jobConfig;
     this.readerConfig = readerConfig;
     this.writerConfig = writerConfig;
+    this.enableFailFast = enableFailFast;
+  }
+
+  ReplicationEnvProvider(ReplicationJobConfig jobConfig,
+                         KuduReaderConfig readerConfig,
+                         KuduWriterConfig writerConfig) {
+    this(jobConfig, readerConfig, writerConfig, false);
   }
 
   /**
@@ -67,6 +81,26 @@ public class ReplicationEnvProvider {
 
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
+    env.enableCheckpointing(jobConfig.getCheckpointingIntervalMillis());
+    // Use AT_LEAST_ONCE mode since the replication job uses UPSERT semantics 
which are
+    // idempotent. This avoids the overhead of checkpoint barrier alignment 
while still
+    // providing fault tolerance.
+    
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
+            
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Configure filesystem-based checkpoint storage for production use.
+    // Checkpoints are required to be persisted to a filesystem path for fault 
tolerance.
+    Configuration config = new Configuration();
+    config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+    config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
jobConfig.getCheckpointsDirectory());
+    if (enableFailFast) {
+      config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+    }
+    env.configure(config);
+
+
     KuduSource<Row> kuduSource = KuduSource.<Row>builder()
             .setReaderConfig(readerConfig)
             .setTableInfo(KuduTableInfo.forTable(jobConfig.getTableName()))
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
index 7b785554e..150cb4742 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
@@ -38,6 +38,8 @@ public class ReplicationJobConfig implements Serializable {
   private final String tableSuffix;
   private final long discoveryIntervalSeconds;
   private final boolean createTable;
+  private final long checkpointingIntervalMillis;
+  private final String checkpointsDirectory;
 
   private ReplicationJobConfig(
           String sourceMasterAddresses,
@@ -46,16 +48,21 @@ public class ReplicationJobConfig implements Serializable {
           boolean restoreOwner,
           String tableSuffix,
           long discoveryIntervalSeconds,
-          boolean createTable) {
+          boolean createTable,
+          long checkpointingIntervalMillis,
+          String checkpointsDirectory) {
     this.sourceMasterAddresses = checkNotNull(sourceMasterAddresses,
       "sourceMasterAddresses cannot be null");
     this.sinkMasterAddresses = checkNotNull(sinkMasterAddresses,
       "sinkMasterAddresses cannot be null");
     this.tableName = checkNotNull(tableName, "tableName cannot be null");
+    this.checkpointsDirectory = checkNotNull(checkpointsDirectory,
+      "checkpointsDirectory cannot be null - filesystem checkpoint storage is 
required");
     this.restoreOwner = restoreOwner;
     this.tableSuffix = tableSuffix != null ? tableSuffix : "";
     this.discoveryIntervalSeconds = discoveryIntervalSeconds;
     this.createTable = createTable;
+    this.checkpointingIntervalMillis = checkpointingIntervalMillis;
   }
 
   public String getSourceMasterAddresses() {
@@ -90,6 +97,14 @@ public class ReplicationJobConfig implements Serializable {
     return tableName + tableSuffix;
   }
 
+  public long getCheckpointingIntervalMillis() {
+    return checkpointingIntervalMillis;
+  }
+
+  public String getCheckpointsDirectory() {
+    return checkpointsDirectory;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -103,9 +118,14 @@ public class ReplicationJobConfig implements Serializable {
     private boolean restoreOwner = false;
     // By default, there is no tableSuffix for the replicated table.
     private String tableSuffix = "";
-    // The default discover interval is 5 minutes.
-    private long discoveryIntervalSeconds = 5 * 60;
+    // The default discovery interval is 10 minutes.
+    private long discoveryIntervalSeconds = 10 * 60;
     private boolean createTable = false;
+    // The default checkpointing interval is 1 minute.
+    // This ensures checkpoints complete frequently enough to advance 
lastEndTimestamp and
+    // keep diff scan windows small (typically ~1 minute of data per discovery 
cycle).
+    private long checkpointingIntervalMillis = 60 * 1000;
+    private String checkpointsDirectory;
 
     public Builder setSourceMasterAddresses(String sourceMasterAddresses) {
       this.sourceMasterAddresses = sourceMasterAddresses;
@@ -142,7 +162,33 @@ public class ReplicationJobConfig implements Serializable {
       return this;
     }
 
+    public Builder setCheckpointingIntervalMillis(long 
checkpointingIntervalMillis) {
+      this.checkpointingIntervalMillis = checkpointingIntervalMillis;
+      return this;
+    }
+
+    public Builder setCheckpointsDirectory(String checkpointsDirectory) {
+      this.checkpointsDirectory = checkpointsDirectory;
+      return this;
+    }
+
     public ReplicationJobConfig build() {
+      // Validate: checkpointing interval must be < discovery interval
+      // The enumerator advances lastEndTimestamp (which defines the diff scan 
window)
+      // only after a checkpoint completes. Discovery cycles are triggered 
periodically
+      // at the discovery interval. If checkpoint interval >= discovery 
interval, there's
+      // a risk that discovery cycles will attempt to enumerate splits before 
the timestamp
+      // advances, causing redundant scans of the same time range.
+      long discoveryIntervalMillis = discoveryIntervalSeconds * 1000;
+      if (checkpointingIntervalMillis >= discoveryIntervalMillis) {
+        throw new IllegalArgumentException(String.format(
+            "Checkpointing interval (%d ms) must be < discovery interval (%d 
ms). " +
+            "Timestamp advancement happens after checkpoint completion, so 
checkpoint " +
+            "interval must be shorter to ensure the timestamp advances before 
the next " +
+            "discovery cycle.",
+            checkpointingIntervalMillis, discoveryIntervalMillis));
+      }
+
       return new ReplicationJobConfig(
               sourceMasterAddresses,
               sinkMasterAddresses,
@@ -150,7 +196,9 @@ public class ReplicationJobConfig implements Serializable {
               restoreOwner,
               tableSuffix,
               discoveryIntervalSeconds,
-              createTable);
+              createTable,
+              checkpointingIntervalMillis,
+              checkpointsDirectory);
     }
   }
 
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
index b92c8af4c..db17104a1 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
@@ -17,7 +17,9 @@ package org.apache.kudu.replication.wrappedsource;
 
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 
 import org.apache.flink.api.connector.source.Boundedness;
@@ -30,6 +32,8 @@ import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumerator;
 import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
 import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
 import org.apache.flink.metrics.Gauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.util.HybridTimeUtil;
 
@@ -39,17 +43,33 @@ import org.apache.kudu.util.HybridTimeUtil;
  * Uses the delegate pattern - all SplitEnumerator methods are delegated to 
the wrapped
  * KuduSourceEnumerator, while additionally exposing internal enumerator 
fields as metrics
  * via reflection to provide visibility into split assignment and timestamp 
tracking.
- * TODO(mgreber): remove this file once FLINK-38187 is resolved.
+ *
+ * Checkpoint Race Condition (FLINK-38575) Fix:
+ * This wrapper also implements critical fixes to prevent data loss during 
checkpointing.
+ * Without these fixes, records can be lost if the job crashes between split 
completion and
+ * checkpoint completion. The fixes ensure at-least-once delivery semantics by:
+ * 1. Deferring split removal from pending until checkpoint completes
+ * 2. Re-enqueueing restored pending splits on restore to guarantee 
reprocessing
+ *
+ * TODO(mgreber): remove this file once FLINK-38187 and FLINK-38575 are 
resolved.
+ *
  */
 public class MetricWrappedKuduEnumerator implements 
SplitEnumerator<KuduSourceSplit,
                                                                     
KuduSourceEnumeratorState> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricWrappedKuduEnumerator.class);
+
   private final KuduSourceEnumerator delegate;
   private final SplitEnumeratorContext<KuduSourceSplit> context;
 
   private final Field lastEndTimestampField;
   private final Field pendingField;
   private final Field unassignedField;
+  // Lazily initialized (not final)
+  private Field finishedSplitsField;
+
+  // Buffer finished splits until checkpoint completes
+  private final Set<String> splitsPendingRemoval = new HashSet<>();
 
   public MetricWrappedKuduEnumerator(
           KuduTableInfo tableInfo,
@@ -75,17 +95,53 @@ public class MetricWrappedKuduEnumerator implements 
SplitEnumerator<KuduSourceSp
     this.pendingField = ReflectionSecurityUtils.getAccessibleField(delegate, 
"pending");
     this.unassignedField = 
ReflectionSecurityUtils.getAccessibleField(delegate, "unassigned");
 
+    if (restoredState != null) {
+      // Re-enqueue restored pending splits to guarantee reprocessing after 
failures.
+      //
+      // Since we use StatelessKuduReaderWrapper, readers don't checkpoint 
their splits.
+      // All split lifecycle management flows through the enumerator, which 
simplifies
+      // restoration and prevents duplicate split assignment issues.
+      //
+      // Restored pending contains two types of splits:
+      // 1. True pending: splits actively being read when checkpoint was taken.
+      //    Without reader state, these must be re-assigned from enumerator.
+      //
+      // 2. Buffered pending: splits finished by reader but kept in pending by 
our
+      //    race fix (handleSourceEvent intercepts SplitFinishedEvent and 
defers
+      //    removal until checkpoint completes). When checkpoint snapshot is 
taken,
+      //    these buffered splits are captured in the snapshot, but their 
records
+      //    may still be in-flight in the pipeline and not yet committed to 
Kudu.
+      //    Re-enqueueing ensures these splits are replayed on restore, 
preventing
+      //    data loss if the job crashes before checkpoint completes.
+      //
+      // Re-enqueueing all pending to unassigned ensures both types are 
replayed,
+      // maintaining at-least-once semantics with idempotent sink operations.
+      //
+      // Note: With StatelessKuduReaderWrapper, readers start empty on 
restore, so all
+      // splits flow through enumerator. We clear pending after re-enqueueing 
to unblock
+      // timestamp advancement immediately. When splits finish, the buffering 
logic in
+      // handleSourceEvent will re-add them to pending until checkpoint 
completes.
+      List<KuduSourceSplit> pending = getPendingSplits();
+      List<KuduSourceSplit> unassigned = getUnassignedSplits();
+      if (!pending.isEmpty()) {
+        LOG.info("Re-enqueueing {} restored pending splits to unassigned for 
reprocessing",
+                 pending.size());
+        unassigned.addAll(pending);
+        pending.clear();
+      }
+    }
   }
 
 
 
   @Override
   // Safe casts: lambdas return correct types for Gauge<Long> and 
Gauge<Integer>
-  @SuppressWarnings("unchecked")
   public void start() {
     context.metricGroup().gauge("lastEndTimestamp", (Gauge<Long>) 
this::getLastEndTimestamp);
     context.metricGroup().gauge("pendingCount", (Gauge<Integer>) 
this::getPendingCount);
     context.metricGroup().gauge("unassignedCount", (Gauge<Integer>) 
this::getUnassignedCount);
+    context.metricGroup().gauge("pendingRemovalCount",
+            (Gauge<Integer>) this::getPendingRemovalCount);
     delegate.start();
   }
 
@@ -121,6 +177,49 @@ public class MetricWrappedKuduEnumerator implements 
SplitEnumerator<KuduSourceSp
 
   @Override
   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+    // Intercept SplitFinishedEvent and defer split removal until checkpoint 
completes
+    // Check event type by class name to avoid depending on connector's 
internal event classes
+    if (sourceEvent.getClass().getSimpleName().equals("SplitFinishedEvent")) {
+      // Perform all reflection operations first, before any state mutation.
+      // This ensures atomicity: if reflection fails (throws 
RuntimeException), no state has
+      // been modified.
+      List<KuduSourceSplit> finishedSplits;
+      List<KuduSourceSplit> pending;
+
+      try {
+        finishedSplits = getFinishedSplits(sourceEvent);
+        pending = getPendingSplits();
+      } catch (RuntimeException e) {
+        throw new RuntimeException(
+            "Failed to access internal Flink connector fields via reflection 
in " +
+            "wrapped enumerator.", e);
+      }
+
+      // Delegate the event to the underlying enumerator so it can schedule 
additional work
+      // immediately. The delegate will remove finished splits from 'pending', 
which is the
+      // normal behavior we need to intercept and fix.
+      delegate.handleSourceEvent(subtaskId, sourceEvent);
+
+      // After delegate processed the event, re-add finished splits back to 
'pending'.
+      // This preserves at-least-once semantics without blocking further split 
assignment.
+      // The splits will remain in 'pending' until notifyCheckpointComplete() 
is called,
+      // ensuring they're recoverable if the job crashes before checkpoint 
completion.
+      for (KuduSourceSplit split : finishedSplits) {
+        String splitId = split.splitId();
+        splitsPendingRemoval.add(splitId);
+
+        boolean wasInPending = pending.stream().anyMatch(s -> 
s.splitId().equals(splitId));
+        if (!wasInPending) {
+          pending.add(split);
+        }
+
+        LOG.debug("Split {} finished by subtask {}; deferring removal 
(restored in pending)",
+                  splitId, subtaskId);
+      }
+      return;
+    }
+
+    // For all other events, delegate normally
     delegate.handleSourceEvent(subtaskId, sourceEvent);
   }
 
@@ -141,5 +240,53 @@ public class MetricWrappedKuduEnumerator implements 
SplitEnumerator<KuduSourceSp
         ReflectionSecurityUtils.getFieldValue(unassignedField, delegate);
     return unassigned.size();
   }
+
+  private int getPendingRemovalCount() {
+    return splitsPendingRemoval.size();
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    // Only after checkpoint completes do we remove finished splits from 
pending
+    // This ensures in-flight/buffered records are recoverable on restore
+    if (!splitsPendingRemoval.isEmpty()) {
+      List<KuduSourceSplit> pending = getPendingSplits();
+
+      int removedCount = 0;
+      for (String splitId : splitsPendingRemoval) {
+        boolean removedFromPending = pending.removeIf(split -> 
split.splitId().equals(splitId));
+
+        if (removedFromPending) {
+          removedCount++;
+          LOG.debug("Checkpoint {} complete: removed split {} from pending",
+                    checkpointId, splitId);
+        }
+      }
+
+      LOG.info("Checkpoint {} complete: removed {} finished splits from 
enumerator state",
+               checkpointId, removedCount);
+      splitsPendingRemoval.clear();
+    }
+
+    delegate.notifyCheckpointComplete(checkpointId);
+  }
+
+  private List<KuduSourceSplit> getPendingSplits() {
+    return ReflectionSecurityUtils.getFieldValue(pendingField, delegate);
+  }
+
+  private List<KuduSourceSplit> getUnassignedSplits() {
+    return ReflectionSecurityUtils.getFieldValue(unassignedField, delegate);
+  }
+
+  private List<KuduSourceSplit> getFinishedSplits(SourceEvent sourceEvent) {
+    // Lazy initialization: finishedSplits is a field of sourceEvent, not our 
delegate,
+    // so we can only cache the Field accessor once we receive the first event 
instance.
+    if (finishedSplitsField == null) {
+      finishedSplitsField = ReflectionSecurityUtils.getAccessibleField(
+          sourceEvent, "finishedSplits");
+    }
+    return ReflectionSecurityUtils.getFieldValue(finishedSplitsField, 
sourceEvent);
+  }
 }
 
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
index a7ecc7ed5..e2e6bdca6 100644
--- 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
@@ -31,12 +31,13 @@ import 
org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 /**
- * A wrapper around KuduSource that adds metrics support until metrics are 
implemented
- * in the upstream Flink Kudu connector (FLINK-38187).
+ * A wrapper around KuduSource that adds metrics support and checkpoint race 
condition fixes.
  * Uses the delegate pattern - all Source methods are delegated to the wrapped 
KuduSource,
- * except for enumerator creation where we inject our own 
MetricWrappedKuduEnumerator
- * to collect and expose metrics.
- * TODO(mgreber): remove once FLINK-38187 is resolved.
+ * except for:
+ * - Enumerator creation: injects MetricWrappedKuduEnumerator for metrics and 
race fix
+ * - Reader creation: wraps with StatelessKuduReaderWrapper to prevent 
duplicate split assignment
+ *
+ * TODO(mgreber): remove once FLINK-38187 and FLINK-38575 are resolved.
  */
 public class MetricWrappedKuduSource<OUT> implements Source<OUT,
                                                             KuduSourceSplit,
@@ -72,7 +73,8 @@ public class MetricWrappedKuduSource<OUT> implements 
Source<OUT,
   @Override
   public SourceReader<OUT, KuduSourceSplit> createReader(
           SourceReaderContext readerContext) throws Exception {
-    return delegate.createReader(readerContext);
+    SourceReader<OUT, KuduSourceSplit> reader = 
delegate.createReader(readerContext);
+    return new StatelessKuduReaderWrapper<>(reader);
   }
 
   @Override
diff --git 
a/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/StatelessKuduReaderWrapper.java
 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/StatelessKuduReaderWrapper.java
new file mode 100644
index 000000000..9ba8ec9ce
--- /dev/null
+++ 
b/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/StatelessKuduReaderWrapper.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication.wrappedsource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+/**
+ * A wrapper around KuduSourceReader that disables reader state checkpointing.
+ * This ensures all split lifecycle management happens through the enumerator,
+ * preventing duplicate split assignments on restore.
+ *
+ * Background: On restore, Flink's SourceOperator automatically restores reader
+ * state (splits) while our enumerator re-enqueues pending splits to handle the
+ * checkpoint race condition. This causes duplicate split IDs in the reader, leading
+ * to a potential crash when the first instance finishes and removes the split from internal
+ * tracking, leaving the second instance orphaned.
+ *
+ * Solution: By making reader stateless (snapshotState returns empty list), all
+ * splits flow through the enumerator's unassigned queue on restore, 
eliminating
+ * duplicates.
+ *
+ * Efficiency: The Kudu connector does not implement partial progress tracking 
for
+ * splits (KuduSourceSplit is used as both split and split state with no 
progress
+ * tracking). Splits are always read from the beginning on assignment, so 
there's no
+ * efficiency loss from not checkpointing reader state.
+ */
+public class StatelessKuduReaderWrapper<OUT>
+    implements SourceReader<OUT, KuduSourceSplit> {
+
+  private final SourceReader<OUT, KuduSourceSplit> delegate;
+
+  public StatelessKuduReaderWrapper(SourceReader<OUT, KuduSourceSplit> 
delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
+    return delegate.pollNext(output);
+  }
+
+  @Override
+  public List<KuduSourceSplit> snapshotState(long checkpointId) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public CompletableFuture<Void> isAvailable() {
+    return delegate.isAvailable();
+  }
+
+  @Override
+  public void addSplits(List<KuduSourceSplit> splits) {
+    delegate.addSplits(splits);
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    delegate.notifyNoMoreSplits();
+  }
+
+  @Override
+  public void close() throws Exception {
+    delegate.close();
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    delegate.notifyCheckpointComplete(checkpointId);
+  }
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) throws Exception {
+    delegate.notifyCheckpointAborted(checkpointId);
+  }
+}
+
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
index 24bcd4114..1d3185c0b 100644
--- 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
@@ -19,13 +19,25 @@ import static 
org.apache.kudu.test.ClientTestUtil.getAllTypesCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getPartialRowWithAllTypes;
 import static org.apache.kudu.test.ClientTestUtil.getSchemaWithAllTypes;
 
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.connector.kudu.connector.writer.KuduWriterConfig;
 import org.junit.Before;
 import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
@@ -40,21 +52,30 @@ import org.apache.kudu.client.RowResultIterator;
 import org.apache.kudu.test.KuduTestHarness;
 
 public class ReplicationTestBase {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(ReplicationTestBase.class);
   protected static final String TABLE_NAME = "replication_test_table";
+  protected static final int DEFAULT_DISCOVERY_INTERVAL_SECONDS = 2;
+  protected static final int DEFAULT_CHECKPOINTING_INTERVAL_MILLIS = 500;
+  protected static final String CHECKPOINT_DIR_PREFIX = "chk-";
+  protected static final String CHECKPOINT_METADATA_FILE = "_metadata";
 
   @Rule
   public final KuduTestHarness sourceHarness = new KuduTestHarness();
   @Rule
   public final KuduTestHarness sinkHarness = new KuduTestHarness();
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
 
   protected KuduClient sourceClient;
   protected KuduClient sinkClient;
   protected ReplicationEnvProvider envProvider;
+  protected Path checkpointDir;
 
   @Before
-  public void setupClientsAndEnvProvider() {
+  public void setupClientsAndEnvProvider() throws Exception {
     this.sourceClient = sourceHarness.getClient();
     this.sinkClient = sinkHarness.getClient();
+    this.checkpointDir = tempFolder.newFolder("checkpoints").toPath();
     this.envProvider = new ReplicationEnvProvider(
             createDefaultJobConfig(),
             createDefaultReaderConfig(),
@@ -62,28 +83,30 @@ public class ReplicationTestBase {
   }
 
 
-  protected ReplicationJobConfig createDefaultJobConfig() {
-    ReplicationJobConfig jobConfig = ReplicationJobConfig.builder()
+  protected ReplicationJobConfig.Builder createDefaultJobConfigBuilder() {
+    return ReplicationJobConfig.builder()
             
.setSourceMasterAddresses(sourceHarness.getMasterAddressesAsString())
             .setSinkMasterAddresses(sinkHarness.getMasterAddressesAsString())
             .setTableName(TABLE_NAME)
-            .setDiscoveryIntervalSeconds(2)
-            .build();
-    return jobConfig;
+            .setDiscoveryIntervalSeconds(DEFAULT_DISCOVERY_INTERVAL_SECONDS)
+            
.setCheckpointingIntervalMillis(DEFAULT_CHECKPOINTING_INTERVAL_MILLIS)
+            .setCheckpointsDirectory(checkpointDir.toUri().toString());
+  }
+
+  protected ReplicationJobConfig createDefaultJobConfig() {
+    return createDefaultJobConfigBuilder().build();
   }
 
   protected KuduReaderConfig createDefaultReaderConfig() {
-    KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+    return KuduReaderConfig.Builder
             .setMasters(sourceHarness.getMasterAddressesAsString())
             .build();
-    return readerConfig;
   }
 
   protected KuduWriterConfig createDefaultWriterConfig() {
-    KuduWriterConfig writerConfig = KuduWriterConfig.Builder
+    return KuduWriterConfig.Builder
             .setMasters(sinkHarness.getMasterAddressesAsString())
             .build();
-    return writerConfig;
   }
 
   protected void createAllTypesTable(KuduClient client) throws Exception {
@@ -158,4 +181,104 @@ public class ReplicationTestBase {
       }
     }
   }
+
+  /**
+   * Locates the most recent completed checkpoint written by the given job.
+   *
+   * <p>Only {@code chk-<n>} directories that contain a {@code _metadata} file are considered;
+   * the presence of that file is what this helper uses as the marker of a finished checkpoint.
+   * Candidates are ranked by the numeric value of {@code <n>} rather than by directory name,
+   * because a lexicographic ordering would rank {@code chk-9} above {@code chk-10}.
+   *
+   * @param checkpointDir root directory holding one sub-directory per job ID
+   * @param jobId the job whose checkpoints should be searched
+   * @return path of the complete checkpoint directory with the highest checkpoint number
+   * @throws AssertionError if the job directory is missing or holds no complete checkpoint
+   */
+  protected Path findLatestCheckpoint(Path checkpointDir, JobID jobId) {
+    File jobCheckpointDir = checkpointDir.resolve(jobId.toString()).toFile();
+    if (!jobCheckpointDir.exists() || !jobCheckpointDir.isDirectory()) {
+      throw new AssertionError("Job checkpoint directory not found: " + jobCheckpointDir);
+    }
+
+    File[] checkpoints = jobCheckpointDir.listFiles(
+        f -> f.isDirectory() && f.getName().startsWith(CHECKPOINT_DIR_PREFIX));
+
+    if (checkpoints == null || checkpoints.length == 0) {
+      throw new AssertionError("No checkpoints found in " + jobCheckpointDir);
+    }
+
+    // Newest first, ordered by checkpoint number (not by name) so chk-10 beats chk-9.
+    List<File> complete = Arrays.stream(checkpoints)
+        .filter(dir -> new File(dir, CHECKPOINT_METADATA_FILE).isFile())
+        .sorted(Comparator.comparingLong(ReplicationTestBase::checkpointNumber).reversed())
+        .collect(Collectors.toList());
+
+    if (complete.isEmpty()) {
+      throw new AssertionError("No complete checkpoints (with " + CHECKPOINT_METADATA_FILE +
+          ") found in " + jobCheckpointDir);
+    }
+
+    return complete.get(0).toPath();
+  }
+
+  /**
+   * Extracts the numeric suffix of a {@code chk-<n>} directory name.
+   *
+   * @param checkpoint a directory whose name starts with the checkpoint prefix
+   * @return the checkpoint number, or -1 when the suffix is not a valid number so that such
+   *     directories rank below every properly numbered checkpoint
+   */
+  private static long checkpointNumber(File checkpoint) {
+    try {
+      return Long.parseLong(checkpoint.getName().substring(CHECKPOINT_DIR_PREFIX.length()));
+    } catch (NumberFormatException ignored) {
+      // Not a numbered checkpoint directory; sort it last instead of failing the lookup.
+      return -1L;
+    }
+  }
+
+  protected void waitForCheckpointCompletion(Path checkpointDir, JobID jobId, 
long timeoutMillis)
+      throws Exception {
+    if (timeoutMillis < DEFAULT_CHECKPOINTING_INTERVAL_MILLIS) {
+      throw new IllegalArgumentException(
+          "Timeout (" + timeoutMillis + "ms) must be at least " +
+          DEFAULT_CHECKPOINTING_INTERVAL_MILLIS + "ms");
+    }
+    long startTime = System.currentTimeMillis();
+    long endTime = startTime + timeoutMillis;
+
+    Path jobCheckpointDir = checkpointDir.resolve(jobId.toString());
+
+    while (System.currentTimeMillis() < endTime) {
+      if (hasCompleteCheckpoint(jobCheckpointDir.toFile())) {
+        LOG.debug("Checkpoint detected for job {} in {}", jobId, 
jobCheckpointDir);
+        return;
+      }
+      Thread.sleep(DEFAULT_CHECKPOINTING_INTERVAL_MILLIS);
+    }
+
+    throw new AssertionError(
+        "No complete checkpoint found for job " + jobId + " in directory " + 
jobCheckpointDir +
+            " after " + timeoutMillis + "ms");
+  }
+
+  protected boolean hasCompleteCheckpoint(File dir) {
+    if (dir == null || !dir.exists() || !dir.isDirectory()) {
+      return false;
+    }
+
+    File[] files = dir.listFiles();
+    if (files == null) {
+      return false;
+    }
+
+    for (File file : files) {
+      if (file.getName().startsWith(CHECKPOINT_DIR_PREFIX) && 
file.isDirectory()) {
+        File metadata = new File(file, CHECKPOINT_METADATA_FILE);
+        if (metadata.exists() && metadata.isFile()) {
+          return true;
+        }
+      }
+      if (file.isDirectory() && !file.getName().equals("shared") &&
+              !file.getName().equals("taskowned")) {
+        if (hasCompleteCheckpoint(file)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  protected void waitForJobTermination(JobID jobId, ClusterClient<?> client, 
long timeoutMillis)
+      throws Exception {
+    if (timeoutMillis < 100) {
+      throw new IllegalArgumentException(
+          "Timeout (" + timeoutMillis + "ms) must be at least 100ms");
+    }
+    long startTime = System.currentTimeMillis();
+    long endTime = startTime + timeoutMillis;
+
+    while (System.currentTimeMillis() < endTime) {
+      JobStatus status = client.getJobStatus(jobId).get();
+      if (status.isGloballyTerminalState()) {
+        LOG.debug("Job {} terminated with status: {}", jobId, status);
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    throw new AssertionError("Job " + jobId + " did not terminate within " +
+        timeoutMillis + "ms");
+  }
 }
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestMetricWrappedKuduEnumerator.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestMetricWrappedKuduEnumerator.java
new file mode 100644
index 000000000..89fd361bc
--- /dev/null
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestMetricWrappedKuduEnumerator.java
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.kudu.connector.KuduTableInfo;
+import org.apache.flink.connector.kudu.connector.reader.KuduReaderConfig;
+import org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumerator;
+import 
org.apache.flink.connector.kudu.source.enumerator.KuduSourceEnumeratorState;
+import org.apache.flink.connector.kudu.source.split.KuduSourceSplit;
+import org.apache.flink.connector.kudu.source.split.SplitFinishedEvent;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.replication.wrappedsource.MetricWrappedKuduEnumerator;
+import org.apache.kudu.replication.wrappedsource.ReflectionSecurityUtils;
+
+/**
+ * Unit tests for {@link MetricWrappedKuduEnumerator} checkpoint race 
condition fix (FLINK-38575).
+ *
+ * Tests that splits which finish between checkpoint snapshot and checkpoint 
completion
+ * are properly retained in checkpoint state to prevent data loss.
+ */
+public class TestMetricWrappedKuduEnumerator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestMetricWrappedKuduEnumerator.class);
+
+  /**
+   * Tests the core race condition fix:
+   * 1. Split finishes (SplitFinishedEvent arrives)
+   * 2. Checkpoint snapshot is taken (should include finished split in pending)
+   * 3. Job crashes before notifyCheckpointComplete
+   * 4. On restore, finished split must be replayed (at-least-once semantics)
+   *
+   * Without the fix, the split would be removed from pending immediately on 
finish,
+   * causing it to be missing from the checkpoint snapshot, leading to data 
loss.
+   */
+  @Test
+  public void testSplitNotLostWhenFinishedBeforeCheckpointComplete() throws 
Exception {
+    String splitId;
+    KuduSourceEnumeratorState checkpointAfterComplete;
+    try (MetricWrappedKuduEnumerator enumerator = createTestEnumerator()) {
+
+      KuduSourceSplit testSplit = createTestSplit("tablet-1");
+
+      enumerator.addReader(0);
+
+      enumerator.handleSplitRequest(0, "localhost");
+      List<KuduSourceSplit> assignedSplits = 
Collections.singletonList(testSplit);
+      enumerator.addSplitsBack(assignedSplits, 0);
+
+      SourceEvent splitFinishedEvent = 
createSplitFinishedEvent(assignedSplits);
+      enumerator.handleSourceEvent(0, splitFinishedEvent);
+
+      KuduSourceEnumeratorState checkpointBeforeComplete = 
enumerator.snapshotState(1L);
+
+      List<KuduSourceSplit> pendingSplitsInCheckpoint =
+              getPendingSplitsFromState(checkpointBeforeComplete);
+
+      splitId = testSplit.splitId();
+      assertTrue(
+              "Finished split must be present in checkpoint before 
notifyCheckpointComplete",
+              pendingSplitsInCheckpoint.stream()
+                      .anyMatch(split -> split.splitId().equals(splitId)));
+
+      enumerator.notifyCheckpointComplete(1L);
+
+      checkpointAfterComplete = enumerator.snapshotState(2L);
+    }
+    List<KuduSourceSplit> pendingSplitsAfterCleanup =
+        getPendingSplitsFromState(checkpointAfterComplete);
+
+    assertTrue(
+        "Finished split should be removed from pending after 
notifyCheckpointComplete",
+        pendingSplitsAfterCleanup.stream()
+            .noneMatch(split -> split.splitId().equals(splitId)));
+  }
+
+  /**
+   * Tests that multiple splits finishing between checkpoints are all retained 
correctly.
+   */
+  @Test
+  public void testMultipleSplitsRetainedBetweenCheckpoints() throws Exception {
+    String split1Id;
+    String split2Id;
+    KuduSourceEnumeratorState checkpoint3;
+    try (MetricWrappedKuduEnumerator enumerator = createTestEnumerator()) {
+
+      KuduSourceSplit split1 = createTestSplit("tablet-1");
+
+      enumerator.addReader(0);
+
+      enumerator.addSplitsBack(Collections.singletonList(split1), 0);
+      enumerator.handleSplitRequest(0, "localhost");
+      enumerator.handleSourceEvent(0, 
createSplitFinishedEvent(Collections.singletonList(split1)));
+
+      KuduSourceEnumeratorState checkpoint1 = enumerator.snapshotState(1L);
+      List<KuduSourceSplit> pendingAfterCheckpoint1 = 
getPendingSplitsFromState(checkpoint1);
+
+      split1Id = split1.splitId();
+      assertTrue(
+              "Split1 should be retained in pending after finishing but before 
checkpoint complete",
+              pendingAfterCheckpoint1.stream().anyMatch(s -> 
s.splitId().equals(split1Id)));
+
+      KuduSourceSplit split2 = createTestSplit("tablet-2");
+      enumerator.addSplitsBack(Collections.singletonList(split2), 0);
+      enumerator.handleSplitRequest(0, "localhost");
+      enumerator.handleSourceEvent(0, 
createSplitFinishedEvent(Collections.singletonList(split2)));
+
+      KuduSourceEnumeratorState checkpoint2 = enumerator.snapshotState(2L);
+      List<KuduSourceSplit> pendingAfterCheckpoint2 = 
getPendingSplitsFromState(checkpoint2);
+
+      split2Id = split2.splitId();
+      assertTrue("Both finished splits should be in pending before any 
checkpoint completes",
+              pendingAfterCheckpoint2.stream().anyMatch(s -> 
s.splitId().equals(split1Id)));
+      assertTrue("Both finished splits should be in pending before any 
checkpoint completes",
+              pendingAfterCheckpoint2.stream().anyMatch(s -> 
s.splitId().equals(split2Id)));
+
+      enumerator.notifyCheckpointComplete(1L);
+
+      checkpoint3 = enumerator.snapshotState(3L);
+    }
+    List<KuduSourceSplit> pendingAfterCleanup = 
getPendingSplitsFromState(checkpoint3);
+
+    assertTrue("Both splits should be removed after checkpoint completes",
+        pendingAfterCleanup.stream().noneMatch(s -> 
s.splitId().equals(split1Id)));
+    assertTrue("Both splits should be removed after checkpoint completes",
+        pendingAfterCleanup.stream().noneMatch(s -> 
s.splitId().equals(split2Id)));
+  }
+
+  /**
+   * Tests that the PLAIN KuduSourceEnumerator has the race condition.
+   * This test documents the bug in the upstream Flink Kudu connector and acts 
as a canary:
+   * if this test starts FAILING, it means Flink has fixed FLINK-38575 
upstream, and we can
+   * potentially remove our wrapper.
+   */
+  @Test
+  public void testPlainEnumeratorLosesSplitDueToRaceCondition() throws 
Exception {
+    String splitId;
+    KuduSourceEnumeratorState checkpointState;
+    try (KuduSourceEnumerator plainEnumerator = createPlainEnumerator()) {
+
+      KuduSourceSplit testSplit = createTestSplit("tablet-1");
+      splitId = testSplit.splitId();
+
+      plainEnumerator.addReader(0);
+      plainEnumerator.handleSplitRequest(0, "localhost");
+      List<KuduSourceSplit> assignedSplits = 
Collections.singletonList(testSplit);
+      plainEnumerator.addSplitsBack(assignedSplits, 0);
+
+      SourceEvent splitFinishedEvent = 
createSplitFinishedEvent(assignedSplits);
+      plainEnumerator.handleSourceEvent(0, splitFinishedEvent);
+
+      checkpointState = plainEnumerator.snapshotState(1L);
+    }
+    List<KuduSourceSplit> pendingSplitsInCheckpoint = 
getPendingSplitsFromState(checkpointState);
+
+    boolean splitPresentInCheckpoint = pendingSplitsInCheckpoint.stream()
+        .anyMatch(split -> split.splitId().equals(splitId));
+
+    if (!splitPresentInCheckpoint) {
+      LOG.info("EXPECTED: Plain enumerator exhibits race condition - " +
+          "split lost between finish and checkpoint (FLINK-38575 still present 
in connector)");
+    } else {
+      throw new AssertionError(
+          "Plain KuduSourceEnumerator unexpectedly retained the split in 
checkpoint! " +
+          "This suggests FLINK-38575 has been fixed upstream. " +
+          "Consider removing MetricWrappedKuduEnumerator if no longer 
needed.");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private MetricWrappedKuduEnumerator createTestEnumerator() {
+    KuduTableInfo mockTableInfo = mock(KuduTableInfo.class);
+    when(mockTableInfo.getName()).thenReturn("test_table");
+
+    SplitEnumeratorMetricGroup mockMetricGroup = 
mock(SplitEnumeratorMetricGroup.class);
+    when(mockMetricGroup.gauge(org.mockito.ArgumentMatchers.anyString(),
+        org.mockito.ArgumentMatchers.any())).thenReturn(null);
+
+    SplitEnumeratorContext<KuduSourceSplit> mockContext = 
mock(SplitEnumeratorContext.class);
+    when(mockContext.currentParallelism()).thenReturn(1);
+    when(mockContext.registeredReaders()).thenReturn(Collections.emptyMap());
+    when(mockContext.metricGroup()).thenReturn(mockMetricGroup);
+
+    KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+        .setMasters("localhost:7051")
+        .build();
+
+    return new MetricWrappedKuduEnumerator(
+        mockTableInfo,
+        readerConfig,
+        org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED,
+        Duration.ofSeconds(10),
+        mockContext,
+        null
+    );
+  }
+
+  @SuppressWarnings("unchecked")
+  private KuduSourceEnumerator createPlainEnumerator() {
+    KuduTableInfo mockTableInfo = mock(KuduTableInfo.class);
+    when(mockTableInfo.getName()).thenReturn("test_table");
+
+    KuduReaderConfig readerConfig = KuduReaderConfig.Builder
+        .setMasters("localhost:7051")
+        .build();
+
+    SplitEnumeratorContext<KuduSourceSplit> mockContext = 
mock(SplitEnumeratorContext.class);
+    when(mockContext.currentParallelism()).thenReturn(1);
+    when(mockContext.registeredReaders()).thenReturn(Collections.emptyMap());
+
+    return new KuduSourceEnumerator(
+        mockTableInfo,
+        readerConfig,
+        org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED,
+        Duration.ofSeconds(10),
+        mockContext
+    );
+  }
+
+  private KuduSourceSplit createTestSplit(String splitId) {
+    byte[] serializedToken = splitId.getBytes(StandardCharsets.UTF_8);
+    return new KuduSourceSplit(serializedToken);
+  }
+
+  private SourceEvent createSplitFinishedEvent(List<KuduSourceSplit> 
finishedSplits) {
+    return new SplitFinishedEvent(finishedSplits);
+  }
+
+  private List<KuduSourceSplit> 
getPendingSplitsFromState(KuduSourceEnumeratorState state) {
+    return ReflectionSecurityUtils.getPrivateFieldValue(state, "pending");
+  }
+}
+
+
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpoint.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpoint.java
new file mode 100644
index 000000000..29d721f5c
--- /dev/null
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpoint.java
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.kudu.replication;
+
+import static org.apache.kudu.test.ClientTestUtil.countRowsInTable;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.file.Path;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.KuduTable;
+
+/**
+ * The replication job uses at-least-once semantics with UPSERT operations for 
idempotency.
+ * This means row count assertions ALONE are insufficient to verify checkpoint 
restoration,
+ * because UPSERT makes a full re-scan indistinguishable from proper 
checkpoint restoration:
+ * - Checkpoint working: Restores state -> incremental replication -> correct 
row count
+ * - Checkpoint broken: Starts fresh -> full re-scan -> UPSERT deduplicates -> 
correct row count
+ *
+ * Both scenarios produce the same final row count! Therefore, we use a 
two-step verification:
+ * - Positive case (testStopAndRestartFromCheckpoint): Restarts from valid 
checkpoint.
+ *   We verify that (1) the job reaches RUNNING state (proves checkpoint was 
loaded and
+ *   restoration succeeded), and (2) all data is eventually replicated 
correctly (proves
+ *   functional correctness). The combination proves checkpoint restoration is 
actually
+ *   working, not just that we got the right final data by accident.
+ *
+ * - Negative case (testRestartWithInvalidCheckpointFails): Deliberately 
provides an
+ *   invalid checkpoint path. We expect the job to FAIL during startup (before 
reaching
+ *   RUNNING state), proving that our test setup actually depends on 
checkpoint restoration
+ *   working and isn't just passing due to idempotent re-replication.
+ */
+public class TestReplicationCheckpoint extends ReplicationTestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationCheckpoint.class);
+
+  @ClassRule
+  public static final MiniClusterWithClientResource flinkCluster =
+          new MiniClusterWithClientResource(
+                  new MiniClusterResourceConfiguration.Builder()
+                          .setNumberSlotsPerTaskManager(2)
+                          .setNumberTaskManagers(1)
+                          .setConfiguration(new Configuration())
+                          .build());
+
+  private ClusterClient<?> clusterClient;
+
+  @After
+  public void cleanup() {
+    flinkCluster.cancelAllJobs();
+  }
+
+  @Before
+  public void setUp() {
+    clusterClient = flinkCluster.getClusterClient();
+  }
+
+  @Test(timeout = 180000)
+  public void testStopAndRestartFromCheckpoint() throws Exception {
+    createAllTypesTable(sourceClient);
+    createAllTypesTable(sinkClient);
+
+    ReplicationEnvProvider envProvider = new ReplicationEnvProvider(
+        createDefaultJobConfig(), createDefaultReaderConfig(), 
createDefaultWriterConfig(), true);
+
+    insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+
+    JobClient jobClient = envProvider.getEnv().executeAsync();
+
+    KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+    // 30s = 15 discovery cycles @ 2s; 10 rows should ideally replicate within one cycle.
+    assertEventuallyTrue("Initial 10 rows should be replicated",
+        () -> countRowsInTable(sinkTable) == 10, 30000);
+
+    // 10s = 20 checkpoint cycles @ 500ms - finds checkpoint quickly
+    waitForCheckpointCompletion(checkpointDir, jobClient.getJobID(), 10000);
+
+    clusterClient.cancel(jobClient.getJobID()).get();
+    waitForJobTermination(jobClient.getJobID(), clusterClient, 15000);
+
+    insertRowsIntoAllTypesTable(sourceClient, 10, 10);
+
+    Path latestCheckpoint = findLatestCheckpoint(checkpointDir, 
jobClient.getJobID());
+    LOG.info("Found latest checkpoint: {}", latestCheckpoint);
+
+    ReplicationEnvProvider restartedEnvProvider = new ReplicationEnvProvider(
+        createDefaultJobConfig(), createDefaultReaderConfig(), 
createDefaultWriterConfig(), true);
+
+    JobGraph jobGraph = 
restartedEnvProvider.getEnv().getStreamGraph().getJobGraph();
+    jobGraph.setSavepointRestoreSettings(
+            SavepointRestoreSettings.forPath(latestCheckpoint.toString(), 
false));
+
+    JobID restartedJobId = clusterClient.submitJob(jobGraph).get();
+    LOG.info("Restarted job with ID: {} from checkpoint: {}", restartedJobId, 
latestCheckpoint);
+
+    // Verify job reaches RUNNING state (proves checkpoint restoration 
succeeded)
+    // Job transitions: INITIALIZING -> CREATED -> RUNNING
+    // 20s timeout - job startup should be quick (couple seconds)
+    JobStatus restartedJobStatus = 
clusterClient.getJobStatus(restartedJobId).get();
+    long statusTimeoutMs = 20000;
+    long statusStartTime = System.currentTimeMillis();
+    while (restartedJobStatus != JobStatus.RUNNING &&
+            !restartedJobStatus.isGloballyTerminalState() &&
+            System.currentTimeMillis() - statusStartTime < statusTimeoutMs) {
+      Thread.sleep(500);
+      restartedJobStatus = clusterClient.getJobStatus(restartedJobId).get();
+      LOG.debug("Job status: {}", restartedJobStatus);
+    }
+
+    assertTrue(
+        "Job should reach RUNNING state after checkpoint restoration, got: " + 
restartedJobStatus,
+        restartedJobStatus == JobStatus.RUNNING);
+
+    assertEventuallyTrue("All 20 rows should be replicated after restart",
+        () -> countRowsInTable(sinkTable) == 20, 30000);
+
+    clusterClient.cancel(restartedJobId).get();
+  }
+
+  @Test(timeout = 180000)
+  public void testRestartWithInvalidCheckpointFails() throws Exception {
+    createAllTypesTable(sourceClient);
+    createAllTypesTable(sinkClient);
+
+    ReplicationEnvProvider envProvider = new ReplicationEnvProvider(
+        createDefaultJobConfig(), createDefaultReaderConfig(), 
createDefaultWriterConfig(), true);
+
+    insertRowsIntoAllTypesTable(sourceClient, 0, 10);
+
+    JobClient jobClient = envProvider.getEnv().executeAsync();
+    JobID jobId = jobClient.getJobID();
+
+    KuduTable sinkTable = sinkClient.openTable(TABLE_NAME);
+
+    // 30s = 15 discovery cycles @ 2s; 10 rows should ideally replicate within one cycle.
+    assertEventuallyTrue("Initial 10 rows should be replicated",
+        () -> countRowsInTable(sinkTable) == 10, 30000);
+
+    // 10s = 20 checkpoint cycles @ 500ms
+    waitForCheckpointCompletion(checkpointDir, jobId, 10000);
+
+    clusterClient.cancel(jobId).get();
+    waitForJobTermination(jobId, clusterClient, 15000);  // Cancellation is 
fast
+
+    ReplicationEnvProvider restartedEnvProvider = new ReplicationEnvProvider(
+        createDefaultJobConfig(), createDefaultReaderConfig(), 
createDefaultWriterConfig(), true);
+
+    JobGraph jobGraph = 
restartedEnvProvider.getEnv().getStreamGraph().getJobGraph();
+    String invalidCheckpointPath = 
checkpointDir.resolve("invalid-checkpoint-path").toString();
+    jobGraph.setSavepointRestoreSettings(
+            SavepointRestoreSettings.forPath(invalidCheckpointPath, false));
+
+    JobID restartedJobId = clusterClient.submitJob(jobGraph).get();
+
+    // 30s timeout - job should fail immediately on invalid checkpoint
+    JobStatus finalStatus = clusterClient.getJobStatus(restartedJobId).get();
+    long timeoutMs = 30000;
+    long startTime = System.currentTimeMillis();
+    while (!finalStatus.isGloballyTerminalState() &&
+            System.currentTimeMillis() - startTime < timeoutMs) {
+      Thread.sleep(500);
+      finalStatus = clusterClient.getJobStatus(restartedJobId).get();
+    }
+
+    LOG.info("Job final status with invalid checkpoint: {}", finalStatus);
+    assertEquals("Job should FAIL when given invalid checkpoint path",
+        JobStatus.FAILED, finalStatus);
+  }
+}
+
diff --git 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationConfigParser.java
 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationConfigParser.java
index 92a2d294e..4d026c545 100644
--- 
a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationConfigParser.java
+++ 
b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationConfigParser.java
@@ -70,12 +70,27 @@ public class TestReplicationConfigParser {
   }
 
   @Test
-  public void testJobConfigAllRequiredParamsPresent() {
+  public void testJobConfigMissingCheckpointsDirectoryThrows() {
     String[] args = {
         "--job.sourceMasterAddresses", "source1:7051",
         "--job.sinkMasterAddresses", "sink1:7051",
         "--job.tableName", "my_table"
     };
+    ParameterTool params = ParameterTool.fromArgs(args);
+
+    assertThatThrownBy(() -> ReplicationConfigParser.parseJobConfig(params))
+            .isInstanceOf(RuntimeException.class)
+            .hasMessageContaining("No data for required key 'job.checkpointsDirectory'");
+  }
+
+  @Test
+  public void testJobConfigAllRequiredParamsPresent() {
+    String[] args = {
+        "--job.sourceMasterAddresses", "source1:7051",
+        "--job.sinkMasterAddresses", "sink1:7051",
+        "--job.tableName", "my_table",
+        "--job.checkpointsDirectory", "file:///tmp/checkpoints"
+    };
 
     ParameterTool params = ParameterTool.fromArgs(args);
     ReplicationJobConfig config = 
ReplicationConfigParser.parseJobConfig(params);
@@ -86,8 +101,82 @@ public class TestReplicationConfigParser {
     // The default value for restoreOwner is False.
     assertFalse(config.getRestoreOwner());
     assertEquals("", config.getTableSuffix());
-    // The default value for the discovery interval is 300.
-    assertEquals(300, config.getDiscoveryIntervalSeconds());
+    // The default value for the discovery interval is 600 seconds (10 minutes).
+    assertEquals(600, config.getDiscoveryIntervalSeconds());
+    // The default checkpoint interval is 60000 ms (1 minute).
+    assertEquals(60000, config.getCheckpointingIntervalMillis());
+  }
+
+  @Test
+  public void testJobConfigCheckpointingParams() {
+    String[] args = {
+        "--job.sourceMasterAddresses", "source1:7051",
+        "--job.sinkMasterAddresses", "sink1:7051",
+        "--job.tableName", "my_table",
+        "--job.checkpointingIntervalMillis", "120000",
+        "--job.discoveryIntervalSeconds", "150",
+        "--job.checkpointsDirectory", "file:///tmp/checkpoints"
+    };
+
+    ParameterTool params = ParameterTool.fromArgs(args);
+    ReplicationJobConfig config = 
ReplicationConfigParser.parseJobConfig(params);
+
+    assertEquals(120000, config.getCheckpointingIntervalMillis());
+    assertEquals("file:///tmp/checkpoints", config.getCheckpointsDirectory());
+  }
+
+  @Test
+  public void testJobConfigCheckpointIntervalGreaterThanDiscoveryThrows() {
+    String[] args = {
+        "--job.sourceMasterAddresses", "source1:7051",
+        "--job.sinkMasterAddresses", "sink1:7051",
+        "--job.tableName", "my_table",
+        "--job.checkpointsDirectory", "file:///tmp/checkpoints",
+        "--job.checkpointingIntervalMillis", "120000",
+        "--job.discoveryIntervalSeconds", "60"
+    };
+    ParameterTool params = ParameterTool.fromArgs(args);
+
+    assertThatThrownBy(() -> ReplicationConfigParser.parseJobConfig(params))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageMatching(
+            "Checkpointing interval \\(\\d+ ms\\) must be < discovery interval \\(\\d+ ms\\).*");
+  }
+
+  @Test
+  public void testJobConfigCheckpointIntervalEqualToDiscoveryThrows() {
+    String[] args = {
+        "--job.sourceMasterAddresses", "source1:7051",
+        "--job.sinkMasterAddresses", "sink1:7051",
+        "--job.tableName", "my_table",
+        "--job.checkpointsDirectory", "file:///tmp/checkpoints",
+        "--job.checkpointingIntervalMillis", "60000",
+        "--job.discoveryIntervalSeconds", "60"
+    };
+    ParameterTool params = ParameterTool.fromArgs(args);
+
+    assertThatThrownBy(() -> ReplicationConfigParser.parseJobConfig(params))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageMatching(
+            "Checkpointing interval \\(\\d+ ms\\) must be < discovery interval \\(\\d+ ms\\).*");
+  }
+
+  @Test
+  public void testJobConfigCheckpointIntervalLessThanDiscoveryAllowed() {
+    String[] args = {
+        "--job.sourceMasterAddresses", "source1:7051",
+        "--job.sinkMasterAddresses", "sink1:7051",
+        "--job.tableName", "my_table",
+        "--job.checkpointsDirectory", "file:///tmp/checkpoints",
+        "--job.checkpointingIntervalMillis", "30000",
+        "--job.discoveryIntervalSeconds", "60"
+    };
+    ParameterTool params = ParameterTool.fromArgs(args);
+    ReplicationJobConfig config = 
ReplicationConfigParser.parseJobConfig(params);
+
+    assertNotNull(config);
+    assertEquals(30000, config.getCheckpointingIntervalMillis());
+    assertEquals(60, config.getDiscoveryIntervalSeconds());
   }
 
   @Test
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
index 2981c973d..096cd779c 100644
--- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationMetrics.java
@@ -106,18 +106,22 @@ public class TestReplicationMetrics extends ReplicationTestBase {
     Map<String, Metric> metrics = reporter.getMetricsByIdentifiers(jobId);
 
     // Verify all expected metrics are present
-    Optional<Gauge<Long>> lastEndTimestamp = findMetric(metrics, 
"lastEndTimestamp");
-    Optional<Gauge<Integer>> pendingCount = findMetric(metrics, 
"pendingCount");
-    Optional<Gauge<Integer>> unassignedCount = findMetric(metrics, 
"unassignedCount");
+    final Optional<Gauge<Long>> lastEndTimestamp = findMetric(metrics, 
"lastEndTimestamp");
+    final Optional<Gauge<Integer>> pendingCount = findMetric(metrics, 
"pendingCount");
+    final Optional<Gauge<Integer>> unassignedCount = findMetric(metrics, 
"unassignedCount");
+    final Optional<Gauge<Integer>> pendingRemovalCount =
+        findMetric(metrics, "pendingRemovalCount");
 
     assertTrue("lastEndTimestamp metric should be present", 
lastEndTimestamp.isPresent());
     assertTrue("pendingCount metric should be present", 
pendingCount.isPresent());
     assertTrue("unassignedCount metric should be present", 
unassignedCount.isPresent());
+    assertTrue("pendingRemovalCount metric should be present", 
pendingRemovalCount.isPresent());
 
     // Basic sanity checks
     assertNotNull("lastEndTimestamp should have a value", 
lastEndTimestamp.get().getValue());
     assertNotNull("pendingCount should have a value", 
pendingCount.get().getValue());
     assertNotNull("unassignedCount should have a value", 
unassignedCount.get().getValue());
+    assertNotNull("pendingRemovalCount should have a value", 
pendingRemovalCount.get().getValue());
   }
 
   @Test(timeout = 100000)
diff --git a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
index a9b00776b..c332bbc71 100644
--- a/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
+++ b/java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
@@ -47,11 +47,7 @@ public class TestReplicationTableInitializer extends ReplicationTestBase {
 
   @Override
   protected ReplicationJobConfig createDefaultJobConfig() {
-    return ReplicationJobConfig.builder()
-            
.setSourceMasterAddresses(sourceHarness.getMasterAddressesAsString())
-            .setSinkMasterAddresses(sinkHarness.getMasterAddressesAsString())
-            .setTableName(TABLE_NAME)
-            .setDiscoveryIntervalSeconds(2)
+    return createDefaultJobConfigBuilder()
             .setCreateTable(true)
             .build();
   }

Reply via email to