This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d96fbba77f IGNITE-18162 Fix a race in PendingComparableValuesTracker 
and simplify code. Fixes #1343
d96fbba77f is described below

commit d96fbba77f9d5f40d233df7235b39c0e7cbed8d5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Nov 15 19:43:39 2022 +0200

    IGNITE-18162 Fix a race in PendingComparableValuesTracker and simplify 
code. Fixes #1343
    
    Signed-off-by: Slava Koptilin <[email protected]>
---
 docs/_docs/thin-clients/index.adoc                 |  16 +++-
 .../util/PendingComparableValuesTracker.java       |  87 +++++++----------
 .../util/PendingComparableValuesTrackerTest.java   | 106 ++++++++++++---------
 3 files changed, 113 insertions(+), 96 deletions(-)

diff --git a/docs/_docs/thin-clients/index.adoc 
b/docs/_docs/thin-clients/index.adoc
index 0c53246ae2..dcd8e5dfc3 100644
--- a/docs/_docs/thin-clients/index.adoc
+++ b/docs/_docs/thin-clients/index.adoc
@@ -1,3 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
 = Client Configurations
 
 Apache Ignite 3 clients are lightweight clients that connect to the cluster 
via a standard socket connection. It does not become a part of the cluster 
topology, never holds any data, and is not used as a destination for compute 
calculations.
@@ -527,4 +541,4 @@ tab:C++[]
 // Compute is not yet supported in C++
 ----
 
---
\ No newline at end of file
+--
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
index a976c01e51..545e8d0d09 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -19,23 +19,32 @@ package org.apache.ignite.internal.util;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
-import java.util.Collection;
-import java.util.Map;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Tracker that stores comparable value internally, this value can grow when 
{@link #update(Comparable)} method is called. The tracker gives
  * ability to wait for certain value, see {@link #waitFor(Comparable)}.
  */
 public class PendingComparableValuesTracker<T extends Comparable<T>> {
+    private static final VarHandle CURRENT;
+
+    static {
+        try {
+            CURRENT = 
MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class, 
"current", Comparable.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
     /** Map of comparable values to corresponding futures. */
-    private final ConcurrentSkipListMap<T, 
Collection<CompletableFuture<Void>>> valueFutures = new 
ConcurrentSkipListMap<>();
+    private final ConcurrentSkipListMap<T, CompletableFuture<Void>> 
valueFutures = new ConcurrentSkipListMap<>();
 
     /** Current value. */
-    public final AtomicReference<T> current;
+    private volatile T current;
 
     /**
      * Constructor with initial value.
@@ -43,7 +52,7 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>> {
      * @param initialValue Initial value.
      */
     public PendingComparableValuesTracker(T initialValue) {
-        current = new AtomicReference<>(initialValue);
+        current = initialValue;
     }
 
     /**
@@ -53,69 +62,43 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>> {
      * @param newValue New value.
      */
     public void update(T newValue) {
-        for (Map.Entry<T, Collection<CompletableFuture<Void>>> e : 
valueFutures.entrySet()) {
-            if (newValue.compareTo(e.getKey()) >= 0) {
-                valueFutures.compute(e.getKey(), (k, v) -> {
-                    if (v != null) {
-                        v.forEach(f -> f.complete(null));
-                    }
-
-                    return null;
-                });
-            } else {
+        while (true) {
+            T current = this.current;
+
+            if (newValue.compareTo(current) <= 0) {
                 break;
             }
-        }
 
-        while (true) {
-            T current = this.current.get();
-
-            if (newValue.compareTo(current) > 0) {
-                if (this.current.compareAndSet(current, newValue)) {
-                    return;
-                }
-            } else {
-                return;
+            if (CURRENT.compareAndSet(this, current, newValue)) {
+                ConcurrentNavigableMap<T, CompletableFuture<Void>> 
smallerFutures = valueFutures.headMap(newValue, true);
+
+                smallerFutures.forEach((k, f) -> f.complete(null));
+
+                smallerFutures.clear();
+
+                break;
             }
         }
     }
 
     /**
-     * Provides the future that is completed when this tracker's internal 
value reaches given one. If the internal value is greater or
-     * equal then the given one, returns completed future.
+     * Provides the future that is completed when this tracker's internal 
value reaches given one. If the internal value is greater or equal
+     * then the given one, returns completed future.
      *
      * @param valueToWait Value to wait.
      * @return Future.
      */
     public CompletableFuture<Void> waitFor(T valueToWait) {
-        if (current.get().compareTo(valueToWait) >= 0) {
+        if (current.compareTo(valueToWait) >= 0) {
             return completedFuture(null);
         }
 
-        CompletableFuture<Void> future = new CompletableFuture<>();
-
-        valueFutures.compute(valueToWait, (k, v) -> {
-            if (v == null) {
-                v = new ConcurrentLinkedDeque<>();
-            }
-
-            v.add(future);
+        CompletableFuture<Void> future = 
valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>());
 
-            return v;
-        });
-
-        if (current.get().compareTo(valueToWait) >= 0) {
+        if (current.compareTo(valueToWait) >= 0) {
             future.complete(null);
 
-            valueFutures.compute(valueToWait, (k, v) -> {
-                if (v == null) {
-                    return null;
-                } else {
-                    v.remove(future);
-                }
-
-                return v;
-            });
+            valueFutures.remove(valueToWait);
         }
 
         return future;
@@ -127,6 +110,6 @@ public class PendingComparableValuesTracker<T extends 
Comparable<T>> {
      * @return Current value.
      */
     public T current() {
-        return current.get();
+        return current;
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
index d61a211569..d30aeeb1ef 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -17,24 +17,25 @@
 
 package org.apache.ignite.internal.util;
 
-import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -78,57 +79,76 @@ public class PendingComparableValuesTrackerTest {
 
         PendingComparableValuesTracker<HybridTimestamp> tracker = new 
PendingComparableValuesTracker<>(clock.now());
 
-        int threads = Runtime.getRuntime().availableProcessors() * 2;
+        int threads = Runtime.getRuntime().availableProcessors();
 
-        Collection<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> 
allFutures = new ConcurrentLinkedDeque<>();
+        List<CompletableFuture<Void>> allFutures = 
Collections.synchronizedList(new ArrayList<>());
 
-        int iterations = 10_000;
+        int iterations = 1_000;
 
-        runMultiThreaded(
-                () -> {
-                    List<IgniteBiTuple<CompletableFuture<Void>, 
HybridTimestamp>> prevFutures = new ArrayList<>();
-                    ThreadLocalRandom random = ThreadLocalRandom.current();
+        runMultiThreaded(() -> {
+            NavigableMap<HybridTimestamp, CompletableFuture<Void>> prevFutures 
= new TreeMap<>();
 
-                    for (int i = 0; i < iterations; i++) {
-                        HybridTimestamp now = clock.now();
-                        tracker.update(now);
-                        HybridTimestamp timestampToWait =
-                            new HybridTimestamp(now.getPhysical() + 1, 
now.getLogical() + random.nextInt(1000));
+            ThreadLocalRandom random = ThreadLocalRandom.current();
 
-                        CompletableFuture<Void> future = 
tracker.waitFor(timestampToWait);
+            for (int i = 0; i < iterations; i++) {
+                HybridTimestamp now = clock.now();
 
-                        IgniteBiTuple<CompletableFuture<Void>, 
HybridTimestamp> pair = new IgniteBiTuple<>(future, timestampToWait);
+                tracker.update(now);
 
-                        prevFutures.add(pair);
-                        allFutures.add(pair);
+                HybridTimestamp timestampToWait =
+                        new HybridTimestamp(now.getPhysical() + 1, 
now.getLogical() + random.nextInt(1000));
 
-                        if (i % 10 == 0) {
-                            for 
(Iterator<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> it = 
prevFutures.iterator();
-                                    it.hasNext();) {
-                                IgniteBiTuple<CompletableFuture<Void>, 
HybridTimestamp> t = it.next();
+                CompletableFuture<Void> future = 
tracker.waitFor(timestampToWait);
 
-                                if (t.get2().compareTo(now) <= 0) {
-                                    assertTrue(t.get1().isDone(), "now=" + now 
+ ", ts=" + t.get2() + ", trackerTs=" + tracker.current());
+                prevFutures.put(timestampToWait, future);
 
-                                    it.remove();
-                                }
-                            }
-                        }
-                    }
+                allFutures.add(future);
 
-                    return null;
-                },
-                threads,
-                "trackableHybridClockTest"
-        );
+                if (i % 10 == 0) {
+                    SortedMap<HybridTimestamp, CompletableFuture<Void>> 
beforeNow = prevFutures.headMap(now, true);
 
-        Thread.sleep(5);
+                    beforeNow.forEach((t, f) -> assertThat(
+                            "now=" + now + ", ts=" + t + ", trackerTs=" + 
tracker.current(),
+                            f, willCompleteSuccessfully())
+                    );
 
-        tracker.update(clock.now());
+                    beforeNow.clear();
+                }
+            }
 
-        List<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> 
uncompleted =
-                allFutures.stream().filter(f -> 
!f.get1().isDone()).collect(toList());
+            return null;
+        }, threads, "trackableHybridClockTest");
 
-        assertTrue(uncompleted.isEmpty());
+        tracker.update(HybridTimestamp.MAX_VALUE);
+
+        
assertThat(CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)),
 willCompleteSuccessfully());
+    }
+
+    @RepeatedTest(100)
+    void testConcurrentAccess() {
+        var tracker = new PendingComparableValuesTracker<>(1);
+
+        var barrier = new CyclicBarrier(2);
+
+        CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(() 
-> {
+            try {
+                barrier.await();
+                tracker.update(2);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        CompletableFuture<Void> readerFuture = CompletableFuture.runAsync(() 
-> {
+            try {
+                barrier.await();
+                tracker.waitFor(2).get(1, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        assertThat(writerFuture, willCompleteSuccessfully());
+        assertThat(readerFuture, willCompleteSuccessfully());
     }
 }

Reply via email to