This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 08827046a8 IGNITE-20971 Fix deadlock when receiving obsolete lease
(#2929)
08827046a8 is described below
commit 08827046a8e6409b59b7a7597adc7a650fe091f0
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Dec 7 18:00:42 2023 +0300
IGNITE-20971 Fix deadlock when receiving obsolete lease (#2929)
---
.../internal/index/IndexBuildController.java | 11 +++---
.../metastorage/server/WatchProcessor.java | 42 +++++++++++++++++++++-
.../internal/table/distributed/TableManager.java | 4 +++
3 files changed, 51 insertions(+), 6 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index 1bc8568931..81ae939949 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -157,7 +158,7 @@ class IndexBuildController implements ManuallyCloseable {
for (TablePartitionId primaryReplicaId : primaryReplicaIds) {
if (primaryReplicaId.tableId() ==
parameters.indexDescriptor().tableId()) {
CompletableFuture<?> startBuildIndexFuture =
getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
- .thenCompose(mvTableStorage ->
awaitPrimaryReplicaForNow(primaryReplicaId)
+ .thenCompose(mvTableStorage ->
awaitPrimaryReplica(primaryReplicaId, clock.now())
.thenAccept(replicaMeta ->
tryScheduleBuildIndex(
primaryReplicaId,
parameters.indexDescriptor(),
@@ -194,7 +195,7 @@ class IndexBuildController implements ManuallyCloseable {
int catalogVersion = catalogService.latestCatalogVersion();
return getMvTableStorageFuture(parameters.causalityToken(),
primaryReplicaId)
- .thenCompose(mvTableStorage ->
awaitPrimaryReplicaForNow(primaryReplicaId)
+ .thenCompose(mvTableStorage ->
awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
.thenAccept(replicaMeta ->
tryScheduleBuildIndexesForNewPrimaryReplica(
catalogVersion,
primaryReplicaId,
@@ -267,15 +268,15 @@ class IndexBuildController implements ManuallyCloseable {
return indexManager.getMvTableStorage(causalityToken,
replicaId.tableId());
}
- private CompletableFuture<ReplicaMeta>
awaitPrimaryReplicaForNow(TablePartitionId replicaId) {
+ private CompletableFuture<ReplicaMeta>
awaitPrimaryReplica(TablePartitionId replicaId, HybridTimestamp timestamp) {
return placementDriver
- .awaitPrimaryReplica(replicaId, clock.now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, SECONDS)
+ .awaitPrimaryReplica(replicaId, timestamp,
AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, SECONDS)
.handle((replicaMeta, throwable) -> {
if (throwable != null) {
Throwable unwrapThrowable =
ExceptionUtils.unwrapCause(throwable);
if (unwrapThrowable instanceof
PrimaryReplicaAwaitTimeoutException) {
- return awaitPrimaryReplicaForNow(replicaId);
+ return awaitPrimaryReplica(replicaId, timestamp);
} else {
return
CompletableFuture.<ReplicaMeta>failedFuture(unwrapThrowable);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 918b00cc85..9cb803bce0 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -23,6 +23,7 @@ import static
java.util.concurrent.CompletableFuture.supplyAsync;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -33,6 +34,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -61,6 +63,18 @@ public class WatchProcessor implements ManuallyCloseable {
private static final IgniteLogger LOG =
Loggers.forClass(WatchProcessor.class);
+ /**
+ * If watch event processing takes more time, than this constant, we will
log warning message with some information.
+ */
+ private static final int WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS = 100;
+
+ /**
+ * The number of keys in log message, that will be printed for long events.
+ *
+ * @see #WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS
+ */
+ private static final int WATCH_EVENT_PROCESSING_LOG_KEYS = 10;
+
/** Map that contains Watches and corresponding Watch notification process
(represented as a CompletableFuture). */
private final List<Watch> watches = new CopyOnWriteArrayList<>();
@@ -150,16 +164,22 @@ public class WatchProcessor implements ManuallyCloseable {
return watchesAndEventsFuture
.thenComposeAsync(watchAndEvents -> {
+ long startTimeNanos = System.nanoTime();
+
CompletableFuture<Void> notifyWatchesFuture =
notifyWatches(watchAndEvents, newRevision, time);
// Revision update is triggered strictly after
all watch listeners have been notified.
CompletableFuture<Void>
notifyUpdateRevisionFuture = notifyUpdateRevisionListeners(newRevision);
- return allOf(notifyWatchesFuture,
notifyUpdateRevisionFuture)
+ CompletableFuture<Void> notificationFuture =
allOf(notifyWatchesFuture, notifyUpdateRevisionFuture)
.thenComposeAsync(
unused ->
invokeOnRevisionCallback(watchAndEvents, newRevision, time),
watchExecutor
);
+
+ notificationFuture.whenComplete((unused, e) ->
maybeLogLongProcessing(updatedEntries, startTimeNanos));
+
+ return notificationFuture;
}, watchExecutor);
}, watchExecutor);
@@ -208,6 +228,26 @@ public class WatchProcessor implements ManuallyCloseable {
return allOf(notifyWatchFutures);
}
+ private static void maybeLogLongProcessing(List<Entry> updatedEntries,
long startTimeNanos) {
+ long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- startTimeNanos);
+
+ if (durationMillis > WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS) {
+ String keysHead = updatedEntries.stream()
+ .limit(WATCH_EVENT_PROCESSING_LOG_KEYS)
+ .map(entry -> new String(entry.key(),
StandardCharsets.UTF_8))
+ .collect(Collectors.joining(", "));
+
+ String keysTail = updatedEntries.size() >
WATCH_EVENT_PROCESSING_LOG_KEYS ? ", ..." : "";
+
+ LOG.warn(
+ "Watch event processing has been too long [duration={},
keys=[{}{}]]",
+ durationMillis,
+ keysHead,
+ keysTail
+ );
+ }
+ }
+
private CompletableFuture<List<WatchAndEvents>>
collectWatchesAndEvents(List<Entry> updatedEntries, long revision) {
if (watches.isEmpty()) {
return emptyListCompletedFuture();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 820c75c212..b392464667 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2220,6 +2220,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param future Future.
*/
private <T> CompletableFuture<T> orStopManagerFuture(CompletableFuture<T>
future) {
+ if (future.isDone()) {
+ return future;
+ }
+
return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
}
}