This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8be9f21a9f5 IGNITE-28068 Expose async method to update low watermark (#7712)
8be9f21a9f5 is described below
commit 8be9f21a9f59cebd23c04162476b583c5eb5f4a6
Author: Ivan Zlenko <[email protected]>
AuthorDate: Thu Mar 5 22:02:11 2026 +0500
IGNITE-28068 Expose async method to update low watermark (#7712)
---
.../ignite/internal/lowwatermark/LowWatermark.java | 17 ++++++-
.../internal/lowwatermark/LowWatermarkImpl.java | 6 +--
.../lowwatermark/LowWatermarkImplTest.java | 57 ++++++++++++++++++++++
.../internal/lowwatermark/TestLowWatermark.java | 6 ++-
4 files changed, 77 insertions(+), 9 deletions(-)
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 3184458ab53..681124bdba8 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.lowwatermark;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.internal.event.EventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -45,11 +46,23 @@ public interface LowWatermark extends EventProducer<LowWatermarkEvent, LowWaterm
void getLowWatermarkSafe(Consumer<@Nullable HybridTimestamp> consumer);
/**
- * Updates the low watermark if it is higher than the current one.
+ * Updates the low watermark if it is higher than the current one. Fire-and-forget version of
+ * {@link #updateLowWatermarkAsync(HybridTimestamp)}.
*
* @param newLowWatermark Candidate for update.
*/
- void updateLowWatermark(HybridTimestamp newLowWatermark);
+ default void updateLowWatermark(HybridTimestamp newLowWatermark) {
+ updateLowWatermarkAsync(newLowWatermark);
+ }
+
+ /**
+ * Updates the low watermark asynchronously if it is higher than the current one. If no updates required the resulting future
+ * will complete immediately.
+ *
+ * @param newLowWatermark Candidate for update.
+ * @return Future that completes when the low watermark update is finished.
+ */
+ CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp newLowWatermark);
/**
* Sets the low watermark during node recovery.
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index a82849b7d88..5af9e48fe92 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -324,11 +324,7 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
}
@Override
- public void updateLowWatermark(HybridTimestamp newLowWatermark) {
- updateLowWatermarkAsync(newLowWatermark);
- }
-
- CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp newLowWatermark) {
+ public CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp newLowWatermark) {
return inBusyLockAsync(busyLock, () -> {
LowWatermarkCandidate newLowWatermarkCandidate = new
LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>());
LowWatermarkCandidate oldLowWatermarkCandidate;
diff --git
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
index 0469165c3fd..2869a298f69 100644
---
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
+++
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
@@ -310,6 +310,63 @@ public class LowWatermarkImplTest extends
BaseIgniteAbstractTest {
assertThat(updateLowWatermarkFuture2, willTimeoutFast());
}
+ @Test
+ void testUpdateLowWatermarkAsync() {
+ assertThat(lowWatermark.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ // Happy path: future completes successfully after update is persisted and listeners are notified.
+ HybridTimestamp newLwm0 = clockService.now();
+
+ assertThat(lowWatermark.updateLowWatermarkAsync(newLwm0),
willCompleteSuccessfully());
+
+ assertEquals(newLwm0, lowWatermark.getLowWatermark());
+
+ // Higher value: future also completes successfully.
+ HybridTimestamp newLwm1 = clockService.now();
+
+ assertThat(lowWatermark.updateLowWatermarkAsync(newLwm1),
willCompleteSuccessfully());
+
+ assertEquals(newLwm1, lowWatermark.getLowWatermark());
+
+ // No-op: candidate is not higher than the current watermark, future should complete immediately.
+ CompletableFuture<Void> noOpFuture =
lowWatermark.updateLowWatermarkAsync(newLwm0);
+
+ assertTrue(noOpFuture.isDone(), "No-op update future should complete immediately");
+ assertThat(noOpFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void testUpdateLowWatermarkAsyncCompletesAfterListeners() {
+ assertThat(lowWatermark.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ var listenerFinishFuture = new CompletableFuture<Boolean>();
+
+ lowWatermark.listen(LOW_WATERMARK_CHANGED,
(ChangeLowWatermarkEventParameters parameters) -> listenerFinishFuture);
+
+ HybridTimestamp newLwm = clockService.now();
+
+ CompletableFuture<Void> asyncFuture =
lowWatermark.updateLowWatermarkAsync(newLwm);
+
+ // The async future should not complete until the listener completes.
+ assertThat(asyncFuture, willTimeoutFast());
+
+ listenerFinishFuture.complete(false);
+
+ assertThat(asyncFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void testUpdateLowWatermarkAsyncPropagatesListenerError() {
+ String errorMessage = "test error";
+
+ lowWatermark.listen(LOW_WATERMARK_CHANGED, parameters ->
failedFuture(new RuntimeException(errorMessage)));
+
+ assertThat(
+ lowWatermark.updateLowWatermarkAsync(clockService.now()),
+ willThrowWithCauseOrSuppressed(RuntimeException.class,
errorMessage)
+ );
+ }
+
@Test
void testGetLowWatermarkFromListener() {
assertThat(lowWatermark.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index 8af39abc56a..ad0bc30cede 100644
---
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -71,11 +71,11 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
}
@Override
- public void updateLowWatermark(HybridTimestamp newLowWatermark) {
+ public CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp newLowWatermark) {
var currentTs = ts;
if (currentTs == null || newLowWatermark.compareTo(currentTs) > 0) {
- supplyAsync(() -> updateAndNotifyInternal(newLowWatermark))
+ return supplyAsync(() -> updateAndNotifyInternal(newLowWatermark))
.thenCompose(Function.identity())
.whenComplete((unused, throwable) -> {
if (throwable != null) {
@@ -83,6 +83,8 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
}
});
}
+
+ return nullCompletedFuture();
}
@Override