Martijn Visser created FLINK-39954:
--------------------------------------
Summary: SinkV2ITCase over-asserts commit multiplicity under
unaligned checkpoints
Key: FLINK-39954
URL: https://issues.apache.org/jira/browse/FLINK-39954
Project: Flink
Issue Type: Bug
Components: API / Core
Reporter: Martijn Visser
{{SinkV2ITCase.writerAndCommitterExecuteInStreamingModeWithScaling}} asserts an
exact commit multiplicity
({{containsExactlyInAnyOrder(duplicate(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE))}},
i.e. each value committed exactly 2x) using a non-idempotent test committer
({{TrackingCommitter}} appends every {{commit()}} call). This is stronger than
the {{Committer}} contract guarantees and makes the test fragile under
unaligned checkpoints (UC), producing intermittent "unexpected element"
(duplicate) failures.
This surfaced while working on FLINK-38614.
I believe the assertion is over-specified because the {{Committer}} contract
({{flink-core}} {{o.a.f.api.connector.sink2.Committer}} JavaDoc) guarantees an
exactly-once effect and explicitly permits repeated commits: "A commit must be
idempotent... Flink will restart from a previous checkpoint and re-attempt to
commit all committables. Thus, some or all committables may have already been
committed." It does not guarantee a specific number of {{commit()}} invocations.
* Under aligned checkpoints the commit/re-commit pattern is deterministic
(fixed barrier boundaries -> exactly the two-run replay), so
{{duplicate(EXPECTED)}} holds.
* Under unaligned checkpoints a boundary committable can shift, so it is
committed in run 1, re-committed on run 2's restore, and committed again in run
2's normal processing, i.e. 3x instead of 2x. The extra commit is permitted by
the contract (a real idempotent sink dedupes it to a no-op), but the
non-idempotent counter records it as an "unexpected element".
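To make the two cases above concrete, here is a toy model in plain Java. It does not use any Flink classes: {{RecordingCommitter}}, the values 1..3, and the choice of 3 as the boundary committable are all made up for illustration, and the commit sequences are simply the ones described in the bullets, not something derived from the checkpointing code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: plain Java, no Flink classes. All names are hypothetical. */
final class CommitMultiplicitySketch {

    /** Stand-in for a non-idempotent test committer: records every commit call. */
    static final class RecordingCommitter {
        final List<Integer> calls = new ArrayList<>();

        void commit(List<Integer> committables) {
            calls.addAll(committables);
        }
    }

    /** Counts how often each value was passed to commit(). */
    static Map<Integer, Integer> multiplicity(List<Integer> calls) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Integer value : calls) {
            counts.merge(value, 1, Integer::sum);
        }
        return counts;
    }

    /** Aligned case as described above: every value is committed once per run. */
    static Map<Integer, Integer> aligned() {
        RecordingCommitter committer = new RecordingCommitter();
        committer.commit(List.of(1, 2, 3)); // run 1
        committer.commit(List.of(1, 2, 3)); // run 2
        return multiplicity(committer.calls);
    }

    /** Unaligned case: the boundary value (assumed to be 3) is also re-committed on restore. */
    static Map<Integer, Integer> unaligned() {
        RecordingCommitter committer = new RecordingCommitter();
        committer.commit(List.of(1, 2, 3)); // run 1
        committer.commit(List.of(3));       // run 2 restore: re-commit of the boundary committable
        committer.commit(List.of(1, 2, 3)); // run 2 normal processing
        return multiplicity(committer.calls);
    }

    public static void main(String[] args) {
        System.out.println("aligned:   " + aligned());
        System.out.println("unaligned: " + unaligned());
    }
}
```

In this model the aligned run prints {{{1=2, 2=2, 3=2}}} and the unaligned run prints {{{1=2, 2=2, 3=3}}}. An exact-multiplicity assertion only accepts the first result, even though the third commit of value 3 is a replay the contract allows.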
It looks like we can't easily make {{TrackingCommitter}} idempotent, because:
* The committable is {{Record}} = (value, timestamp, watermark); under this
test all records have {{timestamp=null, watermark=MIN}}, and the source emits
each value twice legitimately, so those committables are byte-identical. There
is no unique transaction/committable id to dedupe on ({{CommitRequest}} exposes
only {{getCommittable()}} plus retry signals). Deduping by content would
wrongly drop the legitimate second copy.
* The test deliberately expects duplication ({{duplicate(EXPECTED)}}), so
making the committer idempotent also requires changing the expected result to
{{EXPECTED}}.
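The content-dedupe problem from the first bullet can be sketched as follows. {{Rec}} is a hypothetical stand-in for the test's {{Record}} with only the three fields named above, and the value 7 is arbitrary; the real class may differ.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Sketch only: Rec is a hypothetical stand-in for the test's Record type. */
final class ContentDedupSketch {

    /** (value, timestamp, watermark) with content-based equality. */
    static final class Rec {
        final int value;
        final Long timestamp;
        final long watermark;

        Rec(int value, Long timestamp, long watermark) {
            this.value = value;
            this.timestamp = timestamp;
            this.watermark = watermark;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Rec)) {
                return false;
            }
            Rec other = (Rec) o;
            return value == other.value
                    && Objects.equals(timestamp, other.timestamp)
                    && watermark == other.watermark;
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, timestamp, watermark);
        }
    }

    /** Two legitimate emissions of the same value; both have timestamp=null, watermark=MIN. */
    static List<Rec> legitimateCommits() {
        return List.of(
                new Rec(7, null, Long.MIN_VALUE),
                new Rec(7, null, Long.MIN_VALUE));
    }

    /** Content-based dedupe: keeps only the first occurrence of each equal record. */
    static List<Rec> dedupeByContent(List<Rec> commits) {
        Set<Rec> seen = new HashSet<>();
        List<Rec> kept = new ArrayList<>();
        for (Rec rec : commits) {
            if (seen.add(rec)) {
                kept.add(rec);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println("committed: " + legitimateCommits().size());
        System.out.println("kept after content dedupe: " + dedupeByContent(legitimateCommits()).size());
    }
}
```

Two distinct, legitimate committables go in and only one survives, because nothing in the content distinguishes a replay from a second emission.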
A faithful fix gives the committable a unique identity (a
{{TestSinkV2}}/{{Record}} plus source change), makes {{TrackingCommitter}}
idempotent (dedupe by id, or use {{CommitRequest.signalAlreadyCommitted()}}),
and asserts the exactly-once effect ({{EXPECTED}}, not
{{duplicate(EXPECTED)}}). This is a non-trivial test redesign and should get
sink/checkpointing maintainer review since it changes what the canonical SinkV2
test asserts.
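A rough sketch of the dedupe-by-id variant, again in plain Java without Flink classes. {{IdentifiedCommittable}}, its {{id}} field, the id format, and {{IdempotentTrackingCommitter}} are all hypothetical, and the sketch assumes a replayed committable carries the same id as its original; how the real test would generate stable ids is exactly the open design question.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: hypothetical names, no Flink classes. */
final class IdempotentCommitterSketch {

    /** Hypothetical committable carrying a unique id in addition to its value. */
    static final class IdentifiedCommittable {
        final String id;
        final int value;

        IdentifiedCommittable(String id, int value) {
            this.id = id;
            this.value = value;
        }
    }

    /** Tracks the committed effect once per id, while still counting raw commit calls. */
    static final class IdempotentTrackingCommitter {
        private final Set<String> seenIds = new HashSet<>();
        final List<Integer> committedValues = new ArrayList<>();
        int commitCalls = 0;

        void commit(List<IdentifiedCommittable> committables) {
            for (IdentifiedCommittable committable : committables) {
                commitCalls++;
                // Assumption: a replayed committable has the same id as the original,
                // so a repeated id is treated as already committed (no-op).
                if (seenIds.add(committable.id)) {
                    committedValues.add(committable.value);
                }
            }
        }
    }

    /** Replays the 3x boundary pattern described in this issue against the idempotent committer. */
    static IdempotentTrackingCommitter replayWithBoundaryShift() {
        IdentifiedCommittable c1 = new IdentifiedCommittable("c-1", 1);
        IdentifiedCommittable c2 = new IdentifiedCommittable("c-2", 2);
        IdentifiedCommittable c3 = new IdentifiedCommittable("c-3", 3); // assumed boundary committable

        IdempotentTrackingCommitter committer = new IdempotentTrackingCommitter();
        committer.commit(List.of(c1, c2, c3)); // run 1
        committer.commit(List.of(c3));         // run 2 restore: re-commit
        committer.commit(List.of(c1, c2, c3)); // run 2 normal processing
        return committer;
    }

    public static void main(String[] args) {
        IdempotentTrackingCommitter committer = replayWithBoundaryShift();
        System.out.println("commit calls: " + committer.commitCalls);
        System.out.println("committed effect: " + committer.committedValues);
    }
}
```

Here seven commit calls leave a committed effect of {{[1, 2, 3]}}, so the assertion depends only on the effect and not on how many times a committable was replayed. In the real committer the duplicate branch is presumably where {{CommitRequest.signalAlreadyCommitted()}} would be called instead of silently skipping.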