This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new be9702a8623 [hotfix] Improve test coverage of
UnalignedCheckpointsInterruptibleTimersTest
be9702a8623 is described below
commit be9702a86231a1bf5aa3e4a07b5af26568eb968a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Jan 7 17:35:12 2026 +0100
[hotfix] Improve test coverage of
UnalignedCheckpointsInterruptibleTimersTest
Previous version was overloading more methods, and for example
bugs in the orginal #open method would not be covered by the old
version of this test. Bugs like for example logic of handling
ConfigOptions.
---
...nalignedCheckpointsInterruptibleTimersTest.java | 25 +++++-----------------
1 file changed, 5 insertions(+), 20 deletions(-)
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
index b73021125ff..10650dcb319 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -170,7 +169,6 @@ class UnalignedCheckpointsInterruptibleTimersTest {
private final Map<Instant, Integer> timersToRegister;
private transient @Nullable MailboxExecutor mailboxExecutor;
- private transient @Nullable MailboxWatermarkProcessor
watermarkProcessor;
MultipleTimersAtTheSameTimestamp() {
this(Collections.emptyMap());
@@ -181,18 +179,14 @@ class UnalignedCheckpointsInterruptibleTimersTest {
}
@Override
- public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
- this.mailboxExecutor = mailboxExecutor;
+ public boolean useInterruptibleTimers() {
+ return true;
}
@Override
- public void open() throws Exception {
- super.open();
- if (getTimeServiceManager().isPresent()) {
- this.watermarkProcessor =
- new MailboxWatermarkProcessor(
- output, mailboxExecutor,
getTimeServiceManager().get());
- }
+ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+ super.setMailboxExecutor(mailboxExecutor);
+ this.mailboxExecutor = mailboxExecutor;
}
@Override
@@ -212,15 +206,6 @@ class UnalignedCheckpointsInterruptibleTimersTest {
}
}
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- if (watermarkProcessor == null) {
- super.processWatermark(mark);
- } else {
- watermarkProcessor.emitWatermarkInsideMailbox(mark);
- }
- }
-
@Override
public void onEventTime(InternalTimer<String, String> timer) throws
Exception {
mailboxExecutor.execute(