This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b2fcc49669 IGNITE-19984 Improve "dependingOn" documentation and add 
clarifying tests. (#2319)
b2fcc49669 is described below

commit b2fcc4966989535a1b33c626e09efe59ed4b8a7b
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Jul 17 12:29:00 2023 +0300

    IGNITE-19984 Improve "dependingOn" documentation and add clarifying tests. 
(#2319)
---
 .../causality/IncrementalVersionedValue.java       | 14 +++++--
 .../causality/IncrementalVersionedValueTest.java   | 49 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
index 62e823b8f4..8cdd89c3cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/causality/IncrementalVersionedValue.java
@@ -63,7 +63,14 @@ public class IncrementalVersionedValue<T> implements 
VersionedValue<T> {
 
     /**
      * This registry chains two versioned values. The value, that uses this 
registry in the constructor, will be completed strictly after
-     * the value, passed into this method.
+     * the value, passed into this method, meaning that {@code 
resultVv.get(token).isDone();} will always imply
+     * {@code vv.get(token).isDone();} for the same token value.
+     *
+     * <p>While affecting the state of resulting futures, this dependency 
doesn't affect the order of {@link #update(long, BiFunction)}
+     * closures execution. These closures will still be called independently 
once the required parameter value is available.
+     *
+     * <p>In the case of a "fresh" VV with no updates, the first closure is always 
executed synchronously inside of the
+     * {@link #update(long, BiFunction)} call.
      */
     public static Consumer<LongFunction<CompletableFuture<?>>> 
dependingOn(IncrementalVersionedValue<?> vv) {
         return callback -> vv.whenComplete((causalityToken, value, ex) -> 
callback.apply(causalityToken));
@@ -162,8 +169,9 @@ public class IncrementalVersionedValue<T> implements 
VersionedValue<T> {
      * previous token, then updater is used to process the exception and 
calculate a new value.<br> This method can be called multiple times
      * for the same token, and doesn't complete the future created for this 
token. The future is supposed to be completed by storage
      * revision update or a call of {@link #complete(long)} in this case. If 
this method has been called at least once on the given token,
-     * the updater will receive a value that was evaluated by updater on 
previous call, as intermediate result.<br> As the order of multiple
-     * calls of this method on the same token is unknown, operations done by 
the updater must be commutative. For example:
+     * the updater will receive a value that was evaluated by updater on 
previous call, as intermediate result. If no updates were done on
+     * the given token, the updater will immediately receive the value from 
the previous token, if it's completed. <br> As the order of
+     * multiple calls of this method on the same token is unknown, operations 
done by the updater must be commutative. For example:
      * <ul>
      *     <li>this method was called for token N-1 and updater evaluated the 
value V1;</li>
      *     <li>a storage revision update happened;</li>
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
index 9e7a14d00c..5759e53d80 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/causality/IncrementalVersionedValueTest.java
@@ -19,9 +19,12 @@ package org.apache.ignite.internal.causality;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -31,10 +34,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -341,6 +346,50 @@ public class IncrementalVersionedValueTest {
         assertThat(future3, willBe(2));
     }
 
+    /**
+     * Tests that {@link 
IncrementalVersionedValue#dependingOn(IncrementalVersionedValue)} provides 
causality between 2 different values.
+     */
+    @RepeatedTest(100)
+    public void testDependingOn() {
+        var vv0 = new IncrementalVersionedValue<>(register, () -> 1);
+
+        var vv1 = new IncrementalVersionedValue<>(dependingOn(vv0), () -> 1);
+
+        int token = 1;
+
+        vv0.update(token, (i, e) -> supplyAsync(() -> i + 1));
+
+        vv1.update(token, (i, e) -> supplyAsync(() -> i + 1));
+
+        register.moveRevision(token);
+
+        assertThat(vv1.get(token), willCompleteSuccessfully());
+
+        assertTrue(vv0.get(token).isDone());
+    }
+
+    /**
+     * Tests that {@link IncrementalVersionedValue#update(long, BiFunction)} 
closure is called immediately when underlying value is
+     * accessible, i.e. when there were no other updates.
+     */
+    @Test
+    public void testImmediateUpdate() {
+        var vv = new IncrementalVersionedValue<>(register, () -> 1);
+
+        //noinspection unchecked
+        BiFunction<Integer, Throwable, CompletableFuture<Integer>> closure = 
mock(BiFunction.class);
+
+        when(closure.apply(any(), any())).thenReturn(completedFuture(null));
+
+        int token = 0;
+
+        vv.update(token, closure);
+
+        verify(closure).apply(eq(1), eq(null));
+
+        assertFalse(vv.get(token).isDone());
+    }
+
     /**
      * Test {@link IncrementalVersionedValue#whenComplete}.
      */

Reply via email to