This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d44838da20 IGNITE-26950 Causality token mismatch error when zone 
creation (#7009)
1d44838da20 is described below

commit 1d44838da200067d2b5ec69877a3d7150f2f48bb
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Nov 19 23:13:15 2025 +0300

    IGNITE-26950 Causality token mismatch error when zone creation (#7009)
---
 .../internal/causality/BaseVersionedValue.java     | 25 +++++++++----
 .../causality/CompletableVersionedValue.java       | 15 +++++---
 .../internal/causality/CompletionListener.java     |  4 +-
 .../causality/IncrementalVersionedValue.java       |  6 ++-
 .../causality/IncrementalVersionedValueTest.java   | 43 +++++++++++++++++++++-
 .../internal/table/distributed/TableManager.java   |  2 +
 6 files changed, 76 insertions(+), 19 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
index 7f5f4f8794d..5879156d6a3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.causality;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
 
@@ -37,6 +38,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Lazy;
 import org.jetbrains.annotations.Nullable;
 
@@ -166,8 +168,9 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
      * completed.</p>
      *
      * @param causalityToken Causality token.
+     * @return A future that will be completed when the operation is done.
      */
-    void complete(long causalityToken) {
+    CompletableFuture<Void> complete(long causalityToken) {
         CompletableFuture<T> futureForToken;
 
         readWriteLock.writeLock().lock();
@@ -198,7 +201,7 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
             readWriteLock.writeLock().unlock();
         }
 
-        notifyCompletionListeners(causalityToken, futureForToken);
+        return notifyCompletionListeners(causalityToken, futureForToken);
     }
 
     /**
@@ -212,8 +215,10 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
      * completed.</p>
      *
      * @param causalityToken Causality token.
+     * @param future Future to complete the Versioned Value with.
+     * @return A future that will be completed when the operation is done.
      */
-    void complete(long causalityToken, CompletableFuture<T> future) {
+    CompletableFuture<Void> complete(long causalityToken, CompletableFuture<T> 
future) {
         assert future.isDone() : format("Future is not done during completion 
[name={}, future={}]", name, future);
 
         readWriteLock.writeLock().lock();
@@ -235,7 +240,7 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
             readWriteLock.writeLock().unlock();
         }
 
-        notifyCompletionListeners(causalityToken, future);
+        return notifyCompletionListeners(causalityToken, future);
     }
 
     /**
@@ -378,18 +383,22 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
     /**
      * Notifies completion listeners.
      */
-    private void notifyCompletionListeners(long causalityToken, 
CompletableFuture<T> future) {
-        future.whenComplete((v, t) -> {
+    private CompletableFuture<Void> notifyCompletionListeners(long 
causalityToken, CompletableFuture<T> future) {
+        return future.handle((v, t) -> {
             Throwable unpackedThrowable = t instanceof CompletionException ? 
t.getCause() : t;
 
+            ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+
             for (CompletionListener<T> listener : completionListeners) {
                 try {
-                    listener.whenComplete(causalityToken, v, 
unpackedThrowable);
+                    futs.add(listener.whenComplete(causalityToken, v, 
unpackedThrowable));
                 } catch (Exception e) {
                     log.error("Exception when notifying a completion 
listener", e);
                 }
             }
-        });
+
+            return CompletableFutures.allOf(futs);
+        }).thenCompose(identity());
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
index 41dfef712c2..3e009aa3921 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
@@ -90,9 +90,10 @@ public class CompletableVersionedValue<T> implements 
VersionedValue<T> {
      * <p>Calling this method will trigger the {@link #whenComplete} listeners 
for the given token.
      *
      * @param causalityToken Causality token.
+     * @return A future that will be completed when all listeners complete.
      */
-    public void complete(long causalityToken) {
-        versionedValue.complete(causalityToken);
+    public CompletableFuture<Void> complete(long causalityToken) {
+        return versionedValue.complete(causalityToken);
     }
 
     /**
@@ -103,9 +104,10 @@ public class CompletableVersionedValue<T> implements 
VersionedValue<T> {
      *
      * @param causalityToken Causality token.
      * @param value Current value.
+     * @return A future that will be completed when all listeners complete.
      */
-    public void complete(long causalityToken, T value) {
-        versionedValue.complete(causalityToken, 
CompletableFuture.completedFuture(value));
+    public CompletableFuture<Void> complete(long causalityToken, T value) {
+        return versionedValue.complete(causalityToken, 
CompletableFuture.completedFuture(value));
     }
 
     /**
@@ -116,8 +118,9 @@ public class CompletableVersionedValue<T> implements 
VersionedValue<T> {
      *
      * @param causalityToken Causality token.
      * @param throwable An exception.
+     * @return A future that will be completed when all listeners complete.
      */
-    public void completeExceptionally(long causalityToken, Throwable 
throwable) {
-        versionedValue.complete(causalityToken, 
CompletableFuture.failedFuture(throwable));
+    public CompletableFuture<Void> completeExceptionally(long causalityToken, 
Throwable throwable) {
+        return versionedValue.complete(causalityToken, 
CompletableFuture.failedFuture(throwable));
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
index b08971a6e38..2e9298ee388 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.causality;
 
+import java.util.concurrent.CompletableFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -32,6 +33,7 @@ public interface CompletionListener<T> {
      * @param token Token for which a value has been completed.
      * @param value Value that the Versioned Value was completed with.
      * @param ex If not {@code null} - the Versioned Value has benn completed 
with an exception.
+     * @return Future that signifies the end of the event execution.
      */
-    void whenComplete(long token, @Nullable T value, @Nullable Throwable ex);
+    CompletableFuture<?> whenComplete(long token, @Nullable T value, @Nullable 
Throwable ex);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
index c4b6a98a0a8..d533c532ca6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
@@ -297,9 +297,11 @@ public class IncrementalVersionedValue<T> implements 
VersionedValue<T> {
             if (updaterFuture.isDone()) {
                 // Since the future has already been completed, there's no 
need to store a new future object in the history map and we can
                 // save a little bit of memory. This is useful when no 
"update" calls have been made between two "complete" calls.
-                updaterFuture.whenComplete((v, t) -> 
versionedValue.complete(causalityToken, localUpdaterFuture));
+                updaterFuture = versionedValue.complete(causalityToken, 
localUpdaterFuture)
+                        .thenCompose(unused -> localUpdaterFuture);
             } else {
-                updaterFuture = updaterFuture.whenComplete((v, t) -> 
versionedValue.complete(causalityToken, localUpdaterFuture));
+                updaterFuture = updaterFuture.thenCompose(v -> 
versionedValue.complete(causalityToken, localUpdaterFuture))
+                        .thenCompose(unused -> localUpdaterFuture);
             }
 
             return updaterFuture;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
index f605583c356..8ddd8e61c92 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
@@ -194,7 +194,11 @@ public class IncrementalVersionedValueTest extends 
BaseIgniteAbstractTest {
 
         AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
 
-        vv.whenComplete((t, v, e) -> exceptionRef.set(e));
+        vv.whenComplete((t, v, e) -> {
+            exceptionRef.set(e);
+
+            return nullCompletedFuture();
+        });
 
         vv.complete(0L);
 
@@ -217,7 +221,11 @@ public class IncrementalVersionedValueTest extends 
BaseIgniteAbstractTest {
         IncrementalVersionedValue<Map<UUID, String>> schemasVv = new 
IncrementalVersionedValue<>("test", register, HashMap::new);
         IncrementalVersionedValue<Map<UUID, String>> assignmentsVv = new 
IncrementalVersionedValue<>("test", register, HashMap::new);
 
-        schemasVv.whenComplete((token, value, ex) -> tablesVv.complete(token));
+        schemasVv.whenComplete((token, value, ex) -> {
+            tablesVv.complete(token);
+
+            return nullCompletedFuture();
+        });
 
         BiFunction<Long, UUID, CompletableFuture<String>> schemaRegistry =
                 (token, uuid) -> schemasVv.get(token).thenApply(schemas -> 
schemas.get(uuid));
@@ -403,6 +411,8 @@ public class IncrementalVersionedValueTest extends 
BaseIgniteAbstractTest {
 
         CompletionListener<Integer> listener = mock(CompletionListener.class);
 
+        when(listener.whenComplete(anyLong(), any(), 
any())).thenReturn(nullCompletedFuture());
+
         vv.whenComplete(listener);
 
         // Test complete.
@@ -562,4 +572,33 @@ public class IncrementalVersionedValueTest extends 
BaseIgniteAbstractTest {
 
         assertEquals(expectedDefault == null ? null : expectedDefault + 2, 
f.join());
     }
+
+    @Test
+    public void testDependingVv() {
+        var versionedVv = new 
IncrementalVersionedValue<Integer>("versionedVv", register);
+        var dependentVv = new IncrementalVersionedValue<Void>("dependentVv", 
dependingOn(versionedVv));
+
+        CompletableFuture<Integer> versionedVvUpdateResult = new 
CompletableFuture<>();
+
+        versionedVv.update(0, (integer, throwable) -> versionedVvUpdateResult);
+
+        CompletableFuture<Void> dependentVvUpdateResult = new 
CompletableFuture<>();
+
+        dependentVv.update(0, (integer, throwable) -> dependentVvUpdateResult);
+
+        CompletableFuture<?> registerFut = register.updateRevision(0L);
+
+        assertFalse(registerFut.isDone());
+
+        IgniteTestUtils.runAsync(() -> 
versionedVvUpdateResult.complete(TEST_VALUE));
+
+        assertThat(versionedVv.get(0), willCompleteSuccessfully());
+        assertFalse(dependentVv.get(0).isDone());
+        assertFalse(registerFut.isDone());
+
+        IgniteTestUtils.runAsync(() -> dependentVvUpdateResult.complete(null));
+
+        assertThat(dependentVv.get(0), willCompleteSuccessfully());
+        assertThat(registerFut, willCompleteSuccessfully());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e3fb64f6f06..2df4c02721e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2274,6 +2274,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             } else {
                 getLatestTableFuture.completeExceptionally(th);
             }
+
+            return nullCompletedFuture();
         };
 
         assignmentsUpdatedVv.whenComplete(tablesListener);

Reply via email to