This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1d44838da20 IGNITE-26950 Causality token mismatch error when zone
creation (#7009)
1d44838da20 is described below
commit 1d44838da200067d2b5ec69877a3d7150f2f48bb
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Wed Nov 19 23:13:15 2025 +0300
IGNITE-26950 Causality token mismatch error when zone creation (#7009)
---
.../internal/causality/BaseVersionedValue.java | 25 +++++++++----
.../causality/CompletableVersionedValue.java | 15 +++++---
.../internal/causality/CompletionListener.java | 4 +-
.../causality/IncrementalVersionedValue.java | 6 ++-
.../causality/IncrementalVersionedValueTest.java | 43 +++++++++++++++++++++-
.../internal/table/distributed/TableManager.java | 2 +
6 files changed, 76 insertions(+), 19 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
index 7f5f4f8794d..5879156d6a3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/BaseVersionedValue.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.causality;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
@@ -37,6 +38,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Lazy;
import org.jetbrains.annotations.Nullable;
@@ -166,8 +168,9 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
* completed.</p>
*
* @param causalityToken Causality token.
+ * @return A future that will be completed when the operation is done.
*/
- void complete(long causalityToken) {
+ CompletableFuture<Void> complete(long causalityToken) {
CompletableFuture<T> futureForToken;
readWriteLock.writeLock().lock();
@@ -198,7 +201,7 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
readWriteLock.writeLock().unlock();
}
- notifyCompletionListeners(causalityToken, futureForToken);
+ return notifyCompletionListeners(causalityToken, futureForToken);
}
/**
@@ -212,8 +215,10 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
* completed.</p>
*
* @param causalityToken Causality token.
+ * @param future Future to complete the Versioned Value with.
+ * @return A future that will be completed when the operation is done.
*/
- void complete(long causalityToken, CompletableFuture<T> future) {
+ CompletableFuture<Void> complete(long causalityToken, CompletableFuture<T>
future) {
assert future.isDone() : format("Future is not done during completion
[name={}, future={}]", name, future);
readWriteLock.writeLock().lock();
@@ -235,7 +240,7 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
readWriteLock.writeLock().unlock();
}
- notifyCompletionListeners(causalityToken, future);
+ return notifyCompletionListeners(causalityToken, future);
}
/**
@@ -378,18 +383,22 @@ class BaseVersionedValue<T> implements VersionedValue<T> {
/**
* Notifies completion listeners.
*/
- private void notifyCompletionListeners(long causalityToken,
CompletableFuture<T> future) {
- future.whenComplete((v, t) -> {
+ private CompletableFuture<Void> notifyCompletionListeners(long
causalityToken, CompletableFuture<T> future) {
+ return future.handle((v, t) -> {
Throwable unpackedThrowable = t instanceof CompletionException ?
t.getCause() : t;
+ ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+
for (CompletionListener<T> listener : completionListeners) {
try {
- listener.whenComplete(causalityToken, v,
unpackedThrowable);
+ futs.add(listener.whenComplete(causalityToken, v,
unpackedThrowable));
} catch (Exception e) {
log.error("Exception when notifying a completion
listener", e);
}
}
- });
+
+ return CompletableFutures.allOf(futs);
+ }).thenCompose(identity());
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
index 41dfef712c2..3e009aa3921 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletableVersionedValue.java
@@ -90,9 +90,10 @@ public class CompletableVersionedValue<T> implements
VersionedValue<T> {
* <p>Calling this method will trigger the {@link #whenComplete} listeners
for the given token.
*
* @param causalityToken Causality token.
+ * @return A future that will be completed when all listeners complete.
*/
- public void complete(long causalityToken) {
- versionedValue.complete(causalityToken);
+ public CompletableFuture<Void> complete(long causalityToken) {
+ return versionedValue.complete(causalityToken);
}
/**
@@ -103,9 +104,10 @@ public class CompletableVersionedValue<T> implements
VersionedValue<T> {
*
* @param causalityToken Causality token.
* @param value Current value.
+ * @return A future that will be completed when all listeners complete.
*/
- public void complete(long causalityToken, T value) {
- versionedValue.complete(causalityToken,
CompletableFuture.completedFuture(value));
+ public CompletableFuture<Void> complete(long causalityToken, T value) {
+ return versionedValue.complete(causalityToken,
CompletableFuture.completedFuture(value));
}
/**
@@ -116,8 +118,9 @@ public class CompletableVersionedValue<T> implements
VersionedValue<T> {
*
* @param causalityToken Causality token.
* @param throwable An exception.
+ * @return A future that will be completed when all listeners complete.
*/
- public void completeExceptionally(long causalityToken, Throwable
throwable) {
- versionedValue.complete(causalityToken,
CompletableFuture.failedFuture(throwable));
+ public CompletableFuture<Void> completeExceptionally(long causalityToken,
Throwable throwable) {
+ return versionedValue.complete(causalityToken,
CompletableFuture.failedFuture(throwable));
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
index b08971a6e38..2e9298ee388 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/CompletionListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.causality;
+import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.Nullable;
/**
@@ -32,6 +33,7 @@ public interface CompletionListener<T> {
* @param token Token for which a value has been completed.
* @param value Value that the Versioned Value was completed with.
* @param ex If not {@code null} - the Versioned Value has benn completed
with an exception.
+ * @return Future that signifies the end of the event execution.
*/
- void whenComplete(long token, @Nullable T value, @Nullable Throwable ex);
+ CompletableFuture<?> whenComplete(long token, @Nullable T value, @Nullable
Throwable ex);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
index c4b6a98a0a8..d533c532ca6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
@@ -297,9 +297,11 @@ public class IncrementalVersionedValue<T> implements
VersionedValue<T> {
if (updaterFuture.isDone()) {
// Since the future has already been completed, there's no
need to store a new future object in the history map and we can
// save a little bit of memory. This is useful when no
"update" calls have been made between two "complete" calls.
- updaterFuture.whenComplete((v, t) ->
versionedValue.complete(causalityToken, localUpdaterFuture));
+ updaterFuture = versionedValue.complete(causalityToken,
localUpdaterFuture)
+ .thenCompose(unused -> localUpdaterFuture);
} else {
- updaterFuture = updaterFuture.whenComplete((v, t) ->
versionedValue.complete(causalityToken, localUpdaterFuture));
+ updaterFuture = updaterFuture.thenCompose(v ->
versionedValue.complete(causalityToken, localUpdaterFuture))
+ .thenCompose(unused -> localUpdaterFuture);
}
return updaterFuture;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
index f605583c356..8ddd8e61c92 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
@@ -194,7 +194,11 @@ public class IncrementalVersionedValueTest extends
BaseIgniteAbstractTest {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
- vv.whenComplete((t, v, e) -> exceptionRef.set(e));
+ vv.whenComplete((t, v, e) -> {
+ exceptionRef.set(e);
+
+ return nullCompletedFuture();
+ });
vv.complete(0L);
@@ -217,7 +221,11 @@ public class IncrementalVersionedValueTest extends
BaseIgniteAbstractTest {
IncrementalVersionedValue<Map<UUID, String>> schemasVv = new
IncrementalVersionedValue<>("test", register, HashMap::new);
IncrementalVersionedValue<Map<UUID, String>> assignmentsVv = new
IncrementalVersionedValue<>("test", register, HashMap::new);
- schemasVv.whenComplete((token, value, ex) -> tablesVv.complete(token));
+ schemasVv.whenComplete((token, value, ex) -> {
+ tablesVv.complete(token);
+
+ return nullCompletedFuture();
+ });
BiFunction<Long, UUID, CompletableFuture<String>> schemaRegistry =
(token, uuid) -> schemasVv.get(token).thenApply(schemas ->
schemas.get(uuid));
@@ -403,6 +411,8 @@ public class IncrementalVersionedValueTest extends
BaseIgniteAbstractTest {
CompletionListener<Integer> listener = mock(CompletionListener.class);
+ when(listener.whenComplete(anyLong(), any(),
any())).thenReturn(nullCompletedFuture());
+
vv.whenComplete(listener);
// Test complete.
@@ -562,4 +572,33 @@ public class IncrementalVersionedValueTest extends
BaseIgniteAbstractTest {
assertEquals(expectedDefault == null ? null : expectedDefault + 2,
f.join());
}
+
+ @Test
+ public void testDependingVv() {
+ var versionedVv = new
IncrementalVersionedValue<Integer>("versionedVv", register);
+ var dependentVv = new IncrementalVersionedValue<Void>("dependentVv",
dependingOn(versionedVv));
+
+ CompletableFuture<Integer> versionedVvUpdateResult = new
CompletableFuture<>();
+
+ versionedVv.update(0, (integer, throwable) -> versionedVvUpdateResult);
+
+ CompletableFuture<Void> dependentVvUpdateResult = new
CompletableFuture<>();
+
+ dependentVv.update(0, (integer, throwable) -> dependentVvUpdateResult);
+
+ CompletableFuture<?> registerFut = register.updateRevision(0L);
+
+ assertFalse(registerFut.isDone());
+
+ IgniteTestUtils.runAsync(() ->
versionedVvUpdateResult.complete(TEST_VALUE));
+
+ assertThat(versionedVv.get(0), willCompleteSuccessfully());
+ assertFalse(dependentVv.get(0).isDone());
+ assertFalse(registerFut.isDone());
+
+ IgniteTestUtils.runAsync(() -> dependentVvUpdateResult.complete(null));
+
+ assertThat(dependentVv.get(0), willCompleteSuccessfully());
+ assertThat(registerFut, willCompleteSuccessfully());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e3fb64f6f06..2df4c02721e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2274,6 +2274,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
} else {
getLatestTableFuture.completeExceptionally(th);
}
+
+ return nullCompletedFuture();
};
assignmentsUpdatedVv.whenComplete(tablesListener);