This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b2fcc49669 IGNITE-19984 Improve "dependingOn" documentation and add
clarifying tests. (#2319)
b2fcc49669 is described below
commit b2fcc4966989535a1b33c626e09efe59ed4b8a7b
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Jul 17 12:29:00 2023 +0300
IGNITE-19984 Improve "dependingOn" documentation and add clarifying tests.
(#2319)
---
.../causality/IncrementalVersionedValue.java | 14 +++++--
.../causality/IncrementalVersionedValueTest.java | 49 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 3 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
index 62e823b8f4..8cdd89c3cf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
@@ -63,7 +63,14 @@ public class IncrementalVersionedValue<T> implements
VersionedValue<T> {
/**
* This registry chains two versioned values. The value, that uses this
registry in the constructor, will be completed strictly after
- * the value, passed into this method.
+ * the value, passed into this method, meaning that {@code
resultVv.get(token).isDone();} will always imply
+ * {@code vv.get(token).isDone();} for the same token value.
+ *
+ * <p>While affecting the state of resulting futures, this dependency
doesn't affect the order of {@link #update(long, BiFunction)}
+ * closures execution. These closures will still be called independently
once the required parameter value is available.
+ *
+ * <p>In the case of "fresh" VV with no updates, first closure is always
being executed synchronously inside of the
+ * {@link #update(long, BiFunction)} call.
*/
public static Consumer<LongFunction<CompletableFuture<?>>>
dependingOn(IncrementalVersionedValue<?> vv) {
return callback -> vv.whenComplete((causalityToken, value, ex) ->
callback.apply(causalityToken));
@@ -162,8 +169,9 @@ public class IncrementalVersionedValue<T> implements
VersionedValue<T> {
* previous token, then updater is used to process the exception and
calculate a new value.<br> This method can be called multiple times
* for the same token, and doesn't complete the future created for this
token. The future is supposed to be completed by storage
* revision update or a call of {@link #complete(long)} in this case. If
this method has been called at least once on the given token,
- * the updater will receive a value that was evaluated by updater on
previous call, as intermediate result.<br> As the order of multiple
- * calls of this method on the same token is unknown, operations done by
the updater must be commutative. For example:
+ * the updater will receive a value that was evaluated by updater on
previous call, as intermediate result. If no update were done on
+ * the given token, the updated will immediately receive the value from
the previous token, if it's completed. <br> As the order of
+ * multiple calls of this method on the same token is unknown, operations
done by the updater must be commutative. For example:
* <ul>
* <li>this method was called for token N-1 and updater evaluated the
value V1;</li>
* <li>a storage revision update happened;</li>
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
index 9e7a14d00c..5759e53d80 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.causality;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -31,10 +34,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap;
@@ -341,6 +346,50 @@ public class IncrementalVersionedValueTest {
assertThat(future3, willBe(2));
}
+ /**
+ * Tests that {@link
IncrementalVersionedValue#dependingOn(IncrementalVersionedValue)} provides
causality between 2 different values.
+ */
+ @RepeatedTest(100)
+ public void testDependingOn() {
+ var vv0 = new IncrementalVersionedValue<>(register, () -> 1);
+
+ var vv1 = new IncrementalVersionedValue<>(dependingOn(vv0), () -> 1);
+
+ int token = 1;
+
+ vv0.update(token, (i, e) -> supplyAsync(() -> i + 1));
+
+ vv1.update(token, (i, e) -> supplyAsync(() -> i + 1));
+
+ register.moveRevision(token);
+
+ assertThat(vv1.get(token), willCompleteSuccessfully());
+
+ assertTrue(vv0.get(token).isDone());
+ }
+
+ /**
+ * Tests that {@link IncrementalVersionedValue#update(long, BiFunction)}
closure is called immediately when underlying value is
+ * accessible, i.e. when there were no other updates.
+ */
+ @Test
+ public void testImmediateUpdate() {
+ var vv = new IncrementalVersionedValue<>(register, () -> 1);
+
+ //noinspection unchecked
+ BiFunction<Integer, Throwable, CompletableFuture<Integer>> closure =
mock(BiFunction.class);
+
+ when(closure.apply(any(), any())).thenReturn(completedFuture(null));
+
+ int token = 0;
+
+ vv.update(token, closure);
+
+ verify(closure).apply(eq(1), eq(null));
+
+ assertFalse(vv.get(token).isDone());
+ }
+
/**
* Test {@link IncrementalVersionedValue#whenComplete}.
*/