This is an automated email from the ASF dual-hosted git repository.
lihaosky pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new c659491d5f9 [FLINK-39145][tests] Skip counters accumulation when state
isn't initialized (#28397)
c659491d5f9 is described below
commit c659491d5f93d1360b2982398ab42c6a7e25e414
Author: Hao Li <[email protected]>
AuthorDate: Thu Jun 11 15:02:50 2026 -0700
[FLINK-39145][tests] Skip counters accumulation when state isn't
initialized (#28397)
The sink task might be cancelled before its state is initialized; this should be
handled as a valid scenario to avoid test failures caused by a null state.
(cherry picked from commit 04a31ba953f357cf50345b277227cd2ac930f28d)
Co-authored-by: Efrat Levitan <[email protected]>
---
.../flink/test/checkpointing/UnalignedCheckpointTestBase.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index efa88a570d6..80ab0bee405 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -1121,10 +1121,13 @@ abstract class UnalignedCheckpointTestBase {
@Override
public void close() throws Exception {
- numOutputCounter.add(state.numOutput);
- outOfOrderCounter.add(state.numOutOfOrderness);
- duplicatesCounter.add(state.numDuplicates);
- lostCounter.add(state.numLostValues);
+ // sink task might be cancelled before state was initialized
+ if (state != null) {
+ numOutputCounter.add(state.numOutput);
+ outOfOrderCounter.add(state.numOutOfOrderness);
+ duplicatesCounter.add(state.numDuplicates);
+ lostCounter.add(state.numLostValues);
+ }
if (getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() ==
0) {
numFailures.add(getRuntimeContext().getTaskInfo().getAttemptNumber());
}