This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 48ca4d4632 IGNITE-19963 Move completion of ClockWaiter futures to a
special thread pool (#2310)
48ca4d4632 is described below
commit 48ca4d4632d8dc2d55fbde4bbfa7a6c52a841b30
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Jul 13 14:00:32 2023 +0400
IGNITE-19963 Move completion of ClockWaiter futures to a special thread
pool (#2310)
---
.../ignite/internal/catalog/ClockWaiter.java | 95 ++++++++++++++--------
.../ignite/internal/catalog/ClockWaiterTest.java | 3 +-
.../ignite/internal/hlc/ClockUpdateListener.java | 12 ++-
.../ignite/internal/hlc/HybridClockImpl.java | 24 ++++--
.../apache/ignite/internal/HybridClockTest.java | 8 +-
.../apache/ignite/internal/TestHybridClock.java | 2 -
6 files changed, 96 insertions(+), 48 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
index 8c5c65df22..7141237fbd 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
@@ -17,14 +17,18 @@
package org.apache.ignite.internal.catalog;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.hlc.ClockUpdateListener;
@@ -35,6 +39,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.NodeStoppingException;
@@ -59,18 +64,39 @@ public class ClockWaiter implements IgniteComponent {
private final ClockUpdateListener updateListener = this::onUpdate;
+ private final Runnable triggerClockUpdate = this::triggerTrackerUpdate;
+
+ /** Executor for scheduling the short-lived tasks needed to complete awaiting
futures in a timely manner. */
private volatile ScheduledExecutorService scheduler;
+ /** Executor that runs completion of futures returned to the user, so
it may end up executing arbitrarily heavy operations. */
+ private final ExecutorService futureExecutor;
+
+ /**
+ * Creates a new {@link ClockWaiter}.
+ *
+ * @param nodeName Name of the current Ignite node.
+ * @param clock Clock to look at.
+ */
public ClockWaiter(String nodeName, HybridClock clock) {
this.nodeName = nodeName;
this.clock = clock;
+
+ futureExecutor = new ThreadPoolExecutor(
+ 0,
+ 4,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory(nodeName +
"-clock-waiter-future-executor", LOG)
+ );
}
@Override
public void start() {
clock.addUpdateListener(updateListener);
- scheduler = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+ scheduler = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory(nodeName + "-clock-waiter-scheduler", LOG));
}
@Override
@@ -91,6 +117,9 @@ public class ClockWaiter implements IgniteComponent {
// user-facing futures we return from the tracker), but we don't need
them for anything else,
// so it's simpler to just use shutdownNow().
scheduler.shutdownNow();
+
+ IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10,
TimeUnit.SECONDS);
+
scheduler.awaitTermination(10, TimeUnit.SECONDS);
}
@@ -109,6 +138,9 @@ public class ClockWaiter implements IgniteComponent {
/**
* Wait for the clock to reach the given timestamp.
*
+ * <p>If completion of the returned future triggers some I/O operations or
causes the code to block, it is highly
+ * recommended to execute those completion stages on a specific thread
pool to avoid the waiter's pool starvation.
+ *
* @param targetTimestamp Timestamp to wait for.
* @return A future that completes when the timestamp is reached by the
clock's time.
*/
@@ -125,47 +157,44 @@ public class ClockWaiter implements IgniteComponent {
}
private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp)
{
+ HybridTimestamp now = clock.now();
+
+ if (targetTimestamp.compareTo(now) <= 0) {
+ return completedFuture(null);
+ }
+
CompletableFuture<Void> future =
nowTracker.waitFor(targetTimestamp.longValue());
- ScheduledFuture<?> scheduledFuture;
+ // Adding 1 to account for a possible non-zero logical part of the
targetTimestamp.
+ long millisToWait = targetTimestamp.getPhysical() - now.getPhysical()
+ 1;
- if (!future.isDone()) {
- // This triggers a clock update.
- HybridTimestamp now = clock.now();
+ ScheduledFuture<?> scheduledFuture =
scheduler.schedule(triggerClockUpdate, millisToWait, TimeUnit.MILLISECONDS);
- if (targetTimestamp.compareTo(now) <= 0) {
- assert future.isDone();
+ // The future might be completed in a random thread, so let's move its
completion execution to a special thread pool
+ // because the user's code following the future completion might run
arbitrarily heavy operations and we don't want
+ // to put them on an innocent thread invoking now()/update() on the
clock.
+ return future
+ .handleAsync((res, ex) -> {
+ scheduledFuture.cancel(true);
- scheduledFuture = null;
- } else {
- // Adding 1 to account for a possible non-null logical part of
the targetTimestamp.
- long millisToWait = targetTimestamp.getPhysical() -
now.getPhysical() + 1;
+ if (ex != null) {
+ translateTrackerClosedException(ex);
+ }
- scheduledFuture = scheduler.schedule(this::triggerClockUpdate,
millisToWait, TimeUnit.MILLISECONDS);
- }
+ return res;
+ }, futureExecutor);
+ }
+
+ private static void translateTrackerClosedException(Throwable ex) {
+ if (ex instanceof TrackerClosedException) {
+ throw new CancellationException();
} else {
- scheduledFuture = null;
+ throw new CompletionException(ex);
}
-
- return future.handle((res, ex) -> {
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- }
-
- if (ex != null) {
- // Let's replace a TrackerClosedException with a
CancellationException as the latter makes more sense for the clients.
- if (ex instanceof TrackerClosedException) {
- throw new CancellationException();
- } else {
- throw new CompletionException(ex);
- }
- }
-
- return res;
- });
}
- private void triggerClockUpdate() {
- clock.now();
+ private void triggerTrackerUpdate() {
+ onUpdate(clock.nowLong());
}
+
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
index e970e4027c..e175cca7e5 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -68,7 +69,7 @@ class ClockWaiterTest {
clock.update(oneYearAhead);
- assertThat(future.isDone(), is(true));
+ assertThat(future, willCompleteSuccessfully());
}
private HybridTimestamp getOneYearAhead() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
index e550a9b486..e8d128c8a6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
@@ -18,13 +18,19 @@
package org.apache.ignite.internal.hlc;
/**
- * Used to track updates of a {@link HybridClock}: it gets notified each time
the clock 'ticks', including
- * adjustments caused by external events.
+ * Used to track updates of a {@link HybridClock}: it gets notified each time
{@link HybridClock#update(HybridTimestamp)}
+ * is invoked.
*/
@FunctionalInterface
public interface ClockUpdateListener {
/**
- * Called when the clock's current time advances.
+ * Called when the clock's current time advances due to a call to {@link
HybridClock#update(HybridTimestamp)}.
+ *
+ * <p>This does NOT get called when the clock's current time gets advanced
by a call to
+ * {@link HybridClock#now()}/{@link HybridClock#nowLong()}.
+ *
+ * <p>This method must NOT do any I/O operations or block. If such
operations are needed, it should schedule them
+ * on a thread pool.
*
* @param newTs New timestamp on the clock (represented as a long value,
see {@link HybridTimestamp#longValue()}).
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 4209d31674..843dcb7e18 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -26,12 +26,16 @@ import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.tostring.S;
/**
* A Hybrid Logical Clock implementation.
*/
public class HybridClockImpl implements HybridClock {
+ private final IgniteLogger log = Loggers.forClass(HybridClockImpl.class);
+
/**
* Var handle for {@link #latestTime}.
*/
@@ -71,15 +75,23 @@ public class HybridClockImpl implements HybridClock {
long newLatestTime = max(oldLatestTime + 1, now);
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
- notifyUpdateListeners(newLatestTime);
-
return newLatestTime;
}
}
}
private void notifyUpdateListeners(long newTs) {
- updateListeners.forEach(listener -> listener.onUpdate(newTs));
+ for (ClockUpdateListener listener : updateListeners) {
+ try {
+ listener.onUpdate(newTs);
+ } catch (Throwable e) {
+ log.error("ClockUpdateListener#onUpdate() failed for {} at
{}", e, listener, newTs);
+
+ if (e instanceof Error) {
+ throw e;
+ }
+ }
+ }
}
@Override
@@ -88,10 +100,12 @@ public class HybridClockImpl implements HybridClock {
}
/**
- * Creates a timestamp for a received event.
+ * Updates the clock in accordance with an external event timestamp. If
the supplied timestamp is ahead of the
+ * current clock timestamp, the clock gets adjusted to make sure it never
returns any timestamp before (or equal to)
+ * the supplied external timestamp.
*
* @param requestTime Timestamp from request.
- * @return The hybrid timestamp.
+ * @return The resulting timestamp (guaranteed to exceed both previous
clock 'currentTs' and the supplied external ts).
*/
@Override
public HybridTimestamp update(HybridTimestamp requestTime) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
index 41fd6174ac..7aafc60950 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
@@ -120,25 +120,25 @@ class HybridClockTest {
}
@Test
- void updateListenerGetsNotifiedOnUpdateCausedByNowCall() {
+ void updateListenerIsNotNotifiedOnNowCall() {
HybridClock clock = new HybridClockImpl();
clock.addUpdateListener(updateListener);
HybridTimestamp ts = clock.now();
- verify(updateListener).onUpdate(ts.longValue());
+ verify(updateListener, never()).onUpdate(ts.longValue());
}
@Test
- void updateListenerGetsNotifiedOnUpdateCausedByNowLongCall() {
+ void updateListenerIsNotNotifiedOnNowLongCall() {
HybridClock clock = new HybridClockImpl();
clock.addUpdateListener(updateListener);
long ts = clock.nowLong();
- verify(updateListener).onUpdate(ts);
+ verify(updateListener, never()).onUpdate(ts);
}
@Test
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
index aa2d5cec37..3711106746 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
@@ -76,8 +76,6 @@ public class TestHybridClock implements HybridClock {
long newLatestTime = max(oldLatestTime + 1, now);
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
- notifyUpdateListeners(newLatestTime);
-
return newLatestTime;
}
}