Martijn Visser created FLINK-39954:
--------------------------------------

             Summary: SinkV2ITCase over-asserts commit multiplicity under 
unaligned checkpoints 
                 Key: FLINK-39954
                 URL: https://issues.apache.org/jira/browse/FLINK-39954
             Project: Flink
          Issue Type: Bug
          Components: API / Core
            Reporter: Martijn Visser


{{SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling}} asserts an 
exact commit multiplicity 
({{containsExactlyInAnyOrder(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE))}},
 i.e. each value committed exactly 2x) using a non-idempotent test committer 
({{TrackingCommitter}} appends every {{commit()}} call). This is stronger than 
the {{Committer}} contract guarantees and makes the test fragile under 
unaligned checkpoints (UC), producing intermittent "unexpected element" 
(duplicate) failures.

This surfaced while working on FLINK-38614.

I believe it's over-specified because the {{Committer}} contract 
({{flink-core}} {{o.a.f.api.connector.sink2.Committer}} JavaDoc) guarantees an 
exactly-once effect and explicitly permits repeated commits: "A commit must be 
idempotent... Flink will restart from a previous checkpoint and re-attempt to 
commit all committables. Thus, some or all committables may have already been 
committed." It does not guarantee a specific number of {{commit()}} invocations.

* Under aligned checkpoints the commit/re-commit pattern is deterministic 
(fixed barrier boundaries -> exactly the two-run replay), so 
{{duplicate(EXPECTED)}} holds.
* Under unaligned checkpoints a boundary committable can shift, so it is 
committed in run 1, re-committed on run 2's restore, and committed again in run 
2's normal processing -> 3x. That extra commit is permitted by the contract (a 
real idempotent sink dedupes it to a no-op), but the non-idempotent 
{{TrackingCommitter}} records it and the assertion reports it as an 
"unexpected element".
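
The two cases above can be sketched with a simplified model. This is not Flink code: {{AppendingCommitter}} and both helper methods are hypothetical stand-ins that only mimic a committer appending every {{commit()}} call, and the call sequences are taken from the description above rather than from a real run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model (assumption): no Flink classes, only the call pattern described above.
class CommitMultiplicitySketch {

    /** Appends every commit call without deduplication, i.e. it is non-idempotent. */
    static final class AppendingCommitter {
        final List<String> committed = new ArrayList<>();

        void commit(String value) {
            committed.add(value);
        }
    }

    /** Aligned case as described above: the two-run replay yields two commits. */
    static int alignedCommitCount(String value) {
        AppendingCommitter committer = new AppendingCommitter();
        committer.commit(value); // first commit of the two-run replay
        committer.commit(value); // second commit of the two-run replay
        return Collections.frequency(committer.committed, value);
    }

    /** Unaligned case as described above: a shifted boundary committable gets a third commit. */
    static int unalignedBoundaryCommitCount(String value) {
        AppendingCommitter committer = new AppendingCommitter();
        committer.commit(value); // committed in run 1
        committer.commit(value); // re-committed on run 2's restore
        committer.commit(value); // committed again in run 2's normal processing
        return Collections.frequency(committer.committed, value);
    }

    public static void main(String[] args) {
        System.out.println("aligned: " + alignedCommitCount("1"));
        System.out.println("unaligned boundary: " + unalignedBoundaryCommitCount("1"));
    }
}
```

An exact-multiplicity assertion passes on the first count and fails on the second, although both call sequences are allowed by the {{Committer}} contract.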

It looks like we can't easily make {{TrackingCommitter}} idempotent, because:

* The committable is {{Record}} = (value, timestamp, watermark); under this 
test all records have {{timestamp=null, watermark=MIN}}, and the source emits 
each value twice legitimately, so those committables are byte-identical. There 
is no unique transaction/committable id to dedupe on ({{CommitRequest}} exposes 
only {{getCommittable()}} plus retry signals). Deduping by content would 
wrongly drop the legitimate second copy.
* The test deliberately expects duplication ({{duplicate(EXPECTED)}}), so 
making the committer idempotent also requires changing the expected result to 
{{EXPECTED}}.
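
The first point can be illustrated with a small standalone sketch. {{Rec}} is a hypothetical stand-in for the test's {{Record}} (only the three fields named above are modelled), and {{commitWithContentDedupe}} is an invented helper, not part of {{TestSinkV2}}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

// Simplified model (assumption): content equality over (value, timestamp, watermark).
class ContentDedupeSketch {

    /** Hypothetical stand-in for the test's Record. */
    static final class Rec {
        final String value;
        final Long timestamp;   // null for every record in this test, per the issue
        final long watermark;   // Long.MIN_VALUE for every record in this test

        Rec(String value, Long timestamp, long watermark) {
            this.value = value;
            this.timestamp = timestamp;
            this.watermark = watermark;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Rec)) {
                return false;
            }
            Rec other = (Rec) o;
            return Objects.equals(value, other.value)
                    && Objects.equals(timestamp, other.timestamp)
                    && watermark == other.watermark;
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, timestamp, watermark);
        }
    }

    /** Dedupes commit requests purely by content and returns what survives. */
    static List<Rec> commitWithContentDedupe(List<Rec> requests) {
        return new ArrayList<>(new LinkedHashSet<>(requests));
    }

    public static void main(String[] args) {
        // Two legitimate emissions of the same value are indistinguishable by content.
        List<Rec> legitimate = Arrays.asList(
                new Rec("1", null, Long.MIN_VALUE),
                new Rec("1", null, Long.MIN_VALUE));
        System.out.println("survivors: " + commitWithContentDedupe(legitimate).size());
    }
}
```

Because the two legitimate copies compare equal, content-based deduplication cannot tell them apart from a re-commit and keeps only one of them.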

A faithful fix gives the committable a unique identity (a 
{{TestSinkV2}}/{{Record}} plus source change), makes {{TrackingCommitter}} 
idempotent (dedupe by id, or use {{CommitRequest.signalAlreadyCommitted()}}), 
and asserts the exactly-once effect ({{EXPECTED}}, not 
{{duplicate(EXPECTED)}}). This is a non-trivial test redesign and should get 
sink/checkpointing maintainer review since it changes what the canonical SinkV2 
test asserts.
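
A minimal sketch of the dedupe-by-id idea, outside of Flink. {{IdentifiedCommittable}}, {{IdempotentCommitter}} and the id format are all hypothetical; how ids would actually be assigned in {{TestSinkV2}} and the source is the open design question and is not settled here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model (assumption): ids are plain strings supplied by the caller.
class IdDedupeCommitterSketch {

    /** Hypothetical committable that carries a unique id next to its payload. */
    static final class IdentifiedCommittable {
        final String id;
        final String value;

        IdentifiedCommittable(String id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    /** Records a value only the first time its id is committed. */
    static final class IdempotentCommitter {
        private final Set<String> committedIds = new HashSet<>();
        final List<String> committedValues = new ArrayList<>();

        /** Returns true if recorded now, false if the id was already committed. */
        boolean commit(IdentifiedCommittable committable) {
            if (!committedIds.add(committable.id)) {
                // A Flink committer could call CommitRequest.signalAlreadyCommitted() here.
                return false;
            }
            committedValues.add(committable.value);
            return true;
        }
    }

    /** Feeds the given commit calls to a fresh committer and returns the recorded values. */
    static List<String> replay(List<IdentifiedCommittable> calls) {
        IdempotentCommitter committer = new IdempotentCommitter();
        for (IdentifiedCommittable call : calls) {
            committer.commit(call);
        }
        return committer.committedValues;
    }

    public static void main(String[] args) {
        IdentifiedCommittable boundary = new IdentifiedCommittable("c-1", "1");
        List<IdentifiedCommittable> calls = new ArrayList<>();
        calls.add(boundary); // run 1
        calls.add(boundary); // re-commit on restore
        calls.add(boundary); // run 2 normal processing
        System.out.println("recorded: " + replay(calls).size());
    }
}
```

With an id to key on, the number of {{commit()}} invocations no longer affects the recorded result, so the assertion can target the effect rather than the call count.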





--
This message was sent by Atlassian Jira
(v8.20.10#820010)
