This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d96fbba77f IGNITE-18162 Fix a race in PendingComparableValuesTracker
and simplify code. Fixes #1343
d96fbba77f is described below
commit d96fbba77f9d5f40d233df7235b39c0e7cbed8d5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Nov 15 19:43:39 2022 +0200
IGNITE-18162 Fix a race in PendingComparableValuesTracker and simplify
code. Fixes #1343
Signed-off-by: Slava Koptilin <[email protected]>
---
docs/_docs/thin-clients/index.adoc | 16 +++-
.../util/PendingComparableValuesTracker.java | 87 +++++++----------
.../util/PendingComparableValuesTrackerTest.java | 106 ++++++++++++---------
3 files changed, 113 insertions(+), 96 deletions(-)
diff --git a/docs/_docs/thin-clients/index.adoc
b/docs/_docs/thin-clients/index.adoc
index 0c53246ae2..dcd8e5dfc3 100644
--- a/docs/_docs/thin-clients/index.adoc
+++ b/docs/_docs/thin-clients/index.adoc
@@ -1,3 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
= Client Configurations
Apache Ignite 3 clients are lightweight clients that connect to the cluster
via a standard socket connection. A client does not become a part of the cluster
topology, never holds any data, and is not used as a destination for compute
calculations.
@@ -527,4 +541,4 @@ tab:C++[]
// Compute is not yet supported in C++
----
---
\ No newline at end of file
+--
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
index a976c01e51..545e8d0d09 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -19,23 +19,32 @@ package org.apache.ignite.internal.util;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import java.util.Collection;
-import java.util.Map;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Tracker that stores a comparable value internally; this value can grow when
the {@link #update(Comparable)} method is called. The tracker gives
* the ability to wait for a certain value, see {@link #waitFor(Comparable)}.
*/
public class PendingComparableValuesTracker<T extends Comparable<T>> {
+ private static final VarHandle CURRENT;
+
+ static {
+ try {
+ CURRENT =
MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class,
"current", Comparable.class);
+ } catch (ReflectiveOperationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
/** Map of comparable values to corresponding futures. */
- private final ConcurrentSkipListMap<T,
Collection<CompletableFuture<Void>>> valueFutures = new
ConcurrentSkipListMap<>();
+ private final ConcurrentSkipListMap<T, CompletableFuture<Void>>
valueFutures = new ConcurrentSkipListMap<>();
/** Current value. */
- public final AtomicReference<T> current;
+ private volatile T current;
/**
* Constructor with initial value.
@@ -43,7 +52,7 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>> {
* @param initialValue Initial value.
*/
public PendingComparableValuesTracker(T initialValue) {
- current = new AtomicReference<>(initialValue);
+ current = initialValue;
}
/**
@@ -53,69 +62,43 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>> {
* @param newValue New value.
*/
public void update(T newValue) {
- for (Map.Entry<T, Collection<CompletableFuture<Void>>> e :
valueFutures.entrySet()) {
- if (newValue.compareTo(e.getKey()) >= 0) {
- valueFutures.compute(e.getKey(), (k, v) -> {
- if (v != null) {
- v.forEach(f -> f.complete(null));
- }
-
- return null;
- });
- } else {
+ while (true) {
+ T current = this.current;
+
+ if (newValue.compareTo(current) <= 0) {
break;
}
- }
- while (true) {
- T current = this.current.get();
-
- if (newValue.compareTo(current) > 0) {
- if (this.current.compareAndSet(current, newValue)) {
- return;
- }
- } else {
- return;
+ if (CURRENT.compareAndSet(this, current, newValue)) {
+ ConcurrentNavigableMap<T, CompletableFuture<Void>>
smallerFutures = valueFutures.headMap(newValue, true);
+
+ smallerFutures.forEach((k, f) -> f.complete(null));
+
+ smallerFutures.clear();
+
+ break;
}
}
}
/**
- * Provides the future that is completed when this tracker's internal
value reaches given one. If the internal value is greater or
- * equal then the given one, returns completed future.
+ * Provides the future that is completed when this tracker's internal
value reaches the given one. If the internal value is greater than or equal
+ * to the given one, returns a completed future.
*
* @param valueToWait Value to wait.
* @return Future.
*/
public CompletableFuture<Void> waitFor(T valueToWait) {
- if (current.get().compareTo(valueToWait) >= 0) {
+ if (current.compareTo(valueToWait) >= 0) {
return completedFuture(null);
}
- CompletableFuture<Void> future = new CompletableFuture<>();
-
- valueFutures.compute(valueToWait, (k, v) -> {
- if (v == null) {
- v = new ConcurrentLinkedDeque<>();
- }
-
- v.add(future);
+ CompletableFuture<Void> future =
valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>());
- return v;
- });
-
- if (current.get().compareTo(valueToWait) >= 0) {
+ if (current.compareTo(valueToWait) >= 0) {
future.complete(null);
- valueFutures.compute(valueToWait, (k, v) -> {
- if (v == null) {
- return null;
- } else {
- v.remove(future);
- }
-
- return v;
- });
+ valueFutures.remove(valueToWait);
}
return future;
@@ -127,6 +110,6 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>> {
* @return Current value.
*/
public T current() {
- return current.get();
+ return current;
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
index d61a211569..d30aeeb1ef 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -17,24 +17,25 @@
package org.apache.ignite.internal.util;
-import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
/**
@@ -78,57 +79,76 @@ public class PendingComparableValuesTrackerTest {
PendingComparableValuesTracker<HybridTimestamp> tracker = new
PendingComparableValuesTracker<>(clock.now());
- int threads = Runtime.getRuntime().availableProcessors() * 2;
+ int threads = Runtime.getRuntime().availableProcessors();
- Collection<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>>
allFutures = new ConcurrentLinkedDeque<>();
+ List<CompletableFuture<Void>> allFutures =
Collections.synchronizedList(new ArrayList<>());
- int iterations = 10_000;
+ int iterations = 1_000;
- runMultiThreaded(
- () -> {
- List<IgniteBiTuple<CompletableFuture<Void>,
HybridTimestamp>> prevFutures = new ArrayList<>();
- ThreadLocalRandom random = ThreadLocalRandom.current();
+ runMultiThreaded(() -> {
+ NavigableMap<HybridTimestamp, CompletableFuture<Void>> prevFutures
= new TreeMap<>();
- for (int i = 0; i < iterations; i++) {
- HybridTimestamp now = clock.now();
- tracker.update(now);
- HybridTimestamp timestampToWait =
- new HybridTimestamp(now.getPhysical() + 1,
now.getLogical() + random.nextInt(1000));
+ ThreadLocalRandom random = ThreadLocalRandom.current();
- CompletableFuture<Void> future =
tracker.waitFor(timestampToWait);
+ for (int i = 0; i < iterations; i++) {
+ HybridTimestamp now = clock.now();
- IgniteBiTuple<CompletableFuture<Void>,
HybridTimestamp> pair = new IgniteBiTuple<>(future, timestampToWait);
+ tracker.update(now);
- prevFutures.add(pair);
- allFutures.add(pair);
+ HybridTimestamp timestampToWait =
+ new HybridTimestamp(now.getPhysical() + 1,
now.getLogical() + random.nextInt(1000));
- if (i % 10 == 0) {
- for
(Iterator<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>> it =
prevFutures.iterator();
- it.hasNext();) {
- IgniteBiTuple<CompletableFuture<Void>,
HybridTimestamp> t = it.next();
+ CompletableFuture<Void> future =
tracker.waitFor(timestampToWait);
- if (t.get2().compareTo(now) <= 0) {
- assertTrue(t.get1().isDone(), "now=" + now
+ ", ts=" + t.get2() + ", trackerTs=" + tracker.current());
+ prevFutures.put(timestampToWait, future);
- it.remove();
- }
- }
- }
- }
+ allFutures.add(future);
- return null;
- },
- threads,
- "trackableHybridClockTest"
- );
+ if (i % 10 == 0) {
+ SortedMap<HybridTimestamp, CompletableFuture<Void>>
beforeNow = prevFutures.headMap(now, true);
- Thread.sleep(5);
+ beforeNow.forEach((t, f) -> assertThat(
+ "now=" + now + ", ts=" + t + ", trackerTs=" +
tracker.current(),
+ f, willCompleteSuccessfully())
+ );
- tracker.update(clock.now());
+ beforeNow.clear();
+ }
+ }
- List<IgniteBiTuple<CompletableFuture<Void>, HybridTimestamp>>
uncompleted =
- allFutures.stream().filter(f ->
!f.get1().isDone()).collect(toList());
+ return null;
+ }, threads, "trackableHybridClockTest");
- assertTrue(uncompleted.isEmpty());
+ tracker.update(HybridTimestamp.MAX_VALUE);
+
+
assertThat(CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)),
willCompleteSuccessfully());
+ }
+
+ @RepeatedTest(100)
+ void testConcurrentAccess() {
+ var tracker = new PendingComparableValuesTracker<>(1);
+
+ var barrier = new CyclicBarrier(2);
+
+ CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(()
-> {
+ try {
+ barrier.await();
+ tracker.update(2);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ CompletableFuture<Void> readerFuture = CompletableFuture.runAsync(()
-> {
+ try {
+ barrier.await();
+ tracker.waitFor(2).get(1, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(writerFuture, willCompleteSuccessfully());
+ assertThat(readerFuture, willCompleteSuccessfully());
}
}