This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8be9f21a9f5 IGNITE-28068 Expose async method to update low watermark 
(#7712)
8be9f21a9f5 is described below

commit 8be9f21a9f59cebd23c04162476b583c5eb5f4a6
Author: Ivan Zlenko <[email protected]>
AuthorDate: Thu Mar 5 22:02:11 2026 +0500

    IGNITE-28068 Expose async method to update low watermark (#7712)
---
 .../ignite/internal/lowwatermark/LowWatermark.java | 17 ++++++-
 .../internal/lowwatermark/LowWatermarkImpl.java    |  6 +--
 .../lowwatermark/LowWatermarkImplTest.java         | 57 ++++++++++++++++++++++
 .../internal/lowwatermark/TestLowWatermark.java    |  6 ++-
 4 files changed, 77 insertions(+), 9 deletions(-)

diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 3184458ab53..681124bdba8 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.lowwatermark;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.event.EventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -45,11 +46,23 @@ public interface LowWatermark extends 
EventProducer<LowWatermarkEvent, LowWaterm
     void getLowWatermarkSafe(Consumer<@Nullable HybridTimestamp> consumer);
 
     /**
-     * Updates the low watermark if it is higher than the current one.
+     * Updates the low watermark if it is higher than the current one. 
Fire-and-forget version of
+     * {@link #updateLowWatermarkAsync(HybridTimestamp)}.
      *
      * @param newLowWatermark Candidate for update.
      */
-    void updateLowWatermark(HybridTimestamp newLowWatermark);
+    default void updateLowWatermark(HybridTimestamp newLowWatermark) {
+        updateLowWatermarkAsync(newLowWatermark);
+    }
+
+    /**
+     * Updates the low watermark asynchronously if it is higher than the 
current one. If no updates required the resulting future
+     * will complete immediately.
+     *
+     * @param newLowWatermark Candidate for update.
+     * @return Future that completes when the low watermark update is finished.
+     */
+    CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp 
newLowWatermark);
 
     /**
      * Sets the low watermark during node recovery.
diff --git 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index a82849b7d88..5af9e48fe92 100644
--- 
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++ 
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -324,11 +324,7 @@ public class LowWatermarkImpl extends 
AbstractEventProducer<LowWatermarkEvent, L
     }
 
     @Override
-    public void updateLowWatermark(HybridTimestamp newLowWatermark) {
-        updateLowWatermarkAsync(newLowWatermark);
-    }
-
-    CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp 
newLowWatermark) {
+    public CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp 
newLowWatermark) {
         return inBusyLockAsync(busyLock, () -> {
             LowWatermarkCandidate newLowWatermarkCandidate = new 
LowWatermarkCandidate(newLowWatermark, new CompletableFuture<>());
             LowWatermarkCandidate oldLowWatermarkCandidate;
diff --git 
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
 
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
index 0469165c3fd..2869a298f69 100644
--- 
a/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
+++ 
b/modules/low-watermark/src/test/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImplTest.java
@@ -310,6 +310,63 @@ public class LowWatermarkImplTest extends 
BaseIgniteAbstractTest {
         assertThat(updateLowWatermarkFuture2, willTimeoutFast());
     }
 
+    @Test
+    void testUpdateLowWatermarkAsync() {
+        assertThat(lowWatermark.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        // Happy path: future completes successfully after update is persisted 
and listeners are notified.
+        HybridTimestamp newLwm0 = clockService.now();
+
+        assertThat(lowWatermark.updateLowWatermarkAsync(newLwm0), 
willCompleteSuccessfully());
+
+        assertEquals(newLwm0, lowWatermark.getLowWatermark());
+
+        // Higher value: future also completes successfully.
+        HybridTimestamp newLwm1 = clockService.now();
+
+        assertThat(lowWatermark.updateLowWatermarkAsync(newLwm1), 
willCompleteSuccessfully());
+
+        assertEquals(newLwm1, lowWatermark.getLowWatermark());
+
+        // No-op: candidate is not higher than the current watermark, future 
should complete immediately.
+        CompletableFuture<Void> noOpFuture = 
lowWatermark.updateLowWatermarkAsync(newLwm0);
+
+        assertTrue(noOpFuture.isDone(), "No-op update future should complete 
immediately");
+        assertThat(noOpFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testUpdateLowWatermarkAsyncCompletesAfterListeners() {
+        assertThat(lowWatermark.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        var listenerFinishFuture = new CompletableFuture<Boolean>();
+
+        lowWatermark.listen(LOW_WATERMARK_CHANGED, 
(ChangeLowWatermarkEventParameters parameters) -> listenerFinishFuture);
+
+        HybridTimestamp newLwm = clockService.now();
+
+        CompletableFuture<Void> asyncFuture = 
lowWatermark.updateLowWatermarkAsync(newLwm);
+
+        // The async future should not complete until the listener completes.
+        assertThat(asyncFuture, willTimeoutFast());
+
+        listenerFinishFuture.complete(false);
+
+        assertThat(asyncFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testUpdateLowWatermarkAsyncPropagatesListenerError() {
+        String errorMessage = "test error";
+
+        lowWatermark.listen(LOW_WATERMARK_CHANGED, parameters -> 
failedFuture(new RuntimeException(errorMessage)));
+
+        assertThat(
+                lowWatermark.updateLowWatermarkAsync(clockService.now()),
+                willThrowWithCauseOrSuppressed(RuntimeException.class, 
errorMessage)
+        );
+    }
+
     @Test
     void testGetLowWatermarkFromListener() {
         assertThat(lowWatermark.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
diff --git 
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
 
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index 8af39abc56a..ad0bc30cede 100644
--- 
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++ 
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -71,11 +71,11 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
     }
 
     @Override
-    public void updateLowWatermark(HybridTimestamp newLowWatermark) {
+    public CompletableFuture<Void> updateLowWatermarkAsync(HybridTimestamp 
newLowWatermark) {
         var currentTs = ts;
 
         if (currentTs == null || newLowWatermark.compareTo(currentTs) > 0) {
-            supplyAsync(() -> updateAndNotifyInternal(newLowWatermark))
+            return supplyAsync(() -> updateAndNotifyInternal(newLowWatermark))
                     .thenCompose(Function.identity())
                     .whenComplete((unused, throwable) -> {
                         if (throwable != null) {
@@ -83,6 +83,8 @@ public class TestLowWatermark extends 
AbstractEventProducer<LowWatermarkEvent, L
                         }
                     });
         }
+
+        return nullCompletedFuture();
     }
 
     @Override

Reply via email to