featzhang created FLINK-39388:
---------------------------------

             Summary: Fix Flaky DataGeneratorSourceITCase#testGatedRateLimiter
                 Key: FLINK-39388
                 URL: https://issues.apache.org/jira/browse/FLINK-39388
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 2.0.2
            Reporter: featzhang


{{DataGeneratorSourceITCase#testGatedRateLimiter}} is a flaky test due to a 
race condition in the {{FirstCheckpointFilter}} inner class.

----

h2. Description

The integration test {{DataGeneratorSourceITCase#testGatedRateLimiter}} fails 
intermittently in CI (observed in build 
[#73815|https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=73815], 
triggered by PR [#27351|https://github.com/apache/flink/pull/27351]).

h3. Root Cause

The test uses an inner class {{FirstCheckpointFilter}} to collect only the 
elements emitted before the first checkpoint. In the original implementation, 
{{FirstCheckpointFilter}} stops collecting inside {{snapshotState()}}:

{code:java}
// BEFORE (buggy)
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    firstCheckpoint = false;   // stops collecting immediately on snapshot
}
{code}

This creates a race condition:

# {{GatedRateLimiter}} lets the source through its first cycle immediately (by 
design), so the source emits {{capacityPerCheckpoint = 8}} elements without 
waiting for a checkpoint.
# These 8 elements travel through the network stack to reach 
{{FirstCheckpointFilter}}.
# If the checkpoint barrier arrives at {{FirstCheckpointFilter}} *before* all 8 
elements have been processed, {{snapshotState()}} is called prematurely and 
{{firstCheckpoint}} is set to {{false}}.
# The remaining elements of the first cycle are then dropped, so the final 
result contains fewer than 8 elements.
# The assertion {{assertThat(results).hasSize(capacityPerCheckpoint)}} fails.
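To make the failure mode concrete, here is a minimal, single-threaded model of the original cutoff. It is not Flink code: {{BuggyFilterModel}} and {{replay}} are hypothetical names, and the loop simply replays the 8 elements of the first cycle with the checkpoint barrier arriving after the first {{beforeBarrier}} of them, which is the interleaving described in step 3 above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, single-threaded model of the original FirstCheckpointFilter.
 * Events are replayed in a fixed order; this is an illustration, not Flink code.
 */
class BuggyFilterModel {
    private boolean firstCheckpoint = true;
    final List<Long> collected = new ArrayList<>();

    // Mirrors the original cutoff: collection stops as soon as a snapshot is taken.
    void snapshotState() {
        firstCheckpoint = false;
    }

    // Keeps the element only while no snapshot has been taken yet.
    void processElement(long value) {
        if (firstCheckpoint) {
            collected.add(value);
        }
    }

    /** Replays 'capacity' elements; the barrier arrives after 'beforeBarrier' of them. */
    static List<Long> replay(int capacity, int beforeBarrier) {
        BuggyFilterModel filter = new BuggyFilterModel();
        for (long i = 0; i < capacity; i++) {
            if (i == beforeBarrier) {
                // Barrier reaches the filter before the rest of the first cycle.
                filter.snapshotState();
            }
            filter.processElement(i);
        }
        return filter.collected;
    }
}
```

For example, {{BuggyFilterModel.replay(8, 3)}} returns only {{[0, 1, 2]}}: the flag is cleared before element 3 is processed, so elements 3 to 7 are dropped and the size assertion fails. Only when the barrier arrives after all 8 elements ({{replay(8, 8)}}) does the result have the expected size.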

The failure is non-deterministic and depends on thread scheduling, network 
latency, and system load in the CI environment, the classic pattern of a flaky 
test.

h3. Fix

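Replace the {{snapshotState}}-based cutoff with a 
{{notifyCheckpointComplete}}-based cutoff by having {{FirstCheckpointFilter}} 
implement {{CheckpointListener}}. {{snapshotState()}} now only records the ID of 
the first checkpoint, and collection stops when {{notifyCheckpointComplete}} 
reports that this checkpoint has completed. That callback is invoked only after 
the checkpoint has been acknowledged by all tasks, i.e. later than the moment the 
barrier reaches {{FirstCheckpointFilter}}, which leaves time for all elements 
emitted in the first cycle to be processed downstream before collection stops.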

{code:java}
// AFTER (fixed)

// ID of the first checkpoint seen by this subtask; Long.MIN_VALUE means "not yet seen".
private long firstCheckpointId = Long.MIN_VALUE;
// Set once the first checkpoint has completed; later elements are no longer collected.
private boolean firstCheckpointCompleted = false;

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Record the ID of the first checkpoint so we can stop collecting when it completes.
    if (firstCheckpointId == Long.MIN_VALUE) {
        firstCheckpointId = context.getCheckpointId();
    }
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Stop collecting elements once the first checkpoint has completed.
    if (firstCheckpointId != Long.MIN_VALUE && checkpointId >= firstCheckpointId) {
        firstCheckpointCompleted = true;
    }
}
{code}
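The same kind of model with the fixed cutoff shows why collection now survives an early barrier. Again this is a hypothetical sketch rather than the test's real code: {{FixedFilterModel}} is an illustrative name, and the model assumes the completion notification for checkpoint 1 is delivered only after the in-flight elements of the first cycle have been processed, which is the ordering the fix relies on.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, single-threaded model of the fixed FirstCheckpointFilter.
 * Collection stops on checkpoint completion instead of on snapshot.
 */
class FixedFilterModel {
    private long firstCheckpointId = Long.MIN_VALUE;
    private boolean firstCheckpointCompleted = false;
    final List<Long> collected = new ArrayList<>();

    // Only remembers which checkpoint was the first one; does not stop collection.
    void snapshotState(long checkpointId) {
        if (firstCheckpointId == Long.MIN_VALUE) {
            firstCheckpointId = checkpointId;
        }
    }

    // Stops collection once the first checkpoint (or a later one) has completed.
    void notifyCheckpointComplete(long checkpointId) {
        if (firstCheckpointId != Long.MIN_VALUE && checkpointId >= firstCheckpointId) {
            firstCheckpointCompleted = true;
        }
    }

    void processElement(long value) {
        if (!firstCheckpointCompleted) {
            collected.add(value);
        }
    }

    /** Replays the same early-barrier interleaving, then delivers the completion. */
    static List<Long> replay(int capacity, int beforeBarrier) {
        FixedFilterModel filter = new FixedFilterModel();
        for (long i = 0; i < capacity; i++) {
            if (i == beforeBarrier) {
                // Barrier for checkpoint 1 arrives early; collection continues.
                filter.snapshotState(1L);
            }
            filter.processElement(i);
        }
        // Assumed ordering: completion is observed after the in-flight elements.
        filter.notifyCheckpointComplete(1L);
        // An element from the next cycle is dropped.
        filter.processElement(capacity);
        return filter.collected;
    }
}
```

With this ordering, {{FixedFilterModel.replay(8, 3)}} collects all 8 elements of the first cycle, while the element processed after the notification is dropped. A completion notification that arrives before any snapshot has been taken is ignored because of the {{Long.MIN_VALUE}} sentinel check.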

----

h2. Steps to Reproduce

Run {{DataGeneratorSourceITCase#testGatedRateLimiter}} repeatedly under load or 
in a slow CI environment. The test will occasionally fail with:

{noformat}
org.opentest4j.AssertionFailedError:
expected: a collection with size 8
 but was: a collection with size <N> (N < 8)
{noformat}

----

h2. Expected Behavior

The test passes consistently regardless of checkpoint timing.

----

h2. Actual Behavior

The test fails intermittently when the checkpoint barrier reaches 
{{FirstCheckpointFilter}} before all upstream elements have been processed.

----

h2. Environment

- *Flink Version*: 2.0 (master)
- *Java Version*: 17
- *OS*: Linux (CI)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)