This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 08827046a8 IGNITE-20971 Fix deadlock when receiving obsolete lease (#2929)
08827046a8 is described below

commit 08827046a8e6409b59b7a7597adc7a650fe091f0
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Dec 7 18:00:42 2023 +0300

    IGNITE-20971 Fix deadlock when receiving obsolete lease (#2929)
---
 .../internal/index/IndexBuildController.java       | 11 +++---
 .../metastorage/server/WatchProcessor.java         | 42 +++++++++++++++++++++-
 .../internal/table/distributed/TableManager.java   |  4 +++
 3 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index 1bc8568931..81ae939949 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -157,7 +158,7 @@ class IndexBuildController implements ManuallyCloseable {
             for (TablePartitionId primaryReplicaId : primaryReplicaIds) {
                 if (primaryReplicaId.tableId() == 
parameters.indexDescriptor().tableId()) {
                     CompletableFuture<?> startBuildIndexFuture = 
getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
-                            .thenCompose(mvTableStorage -> 
awaitPrimaryReplicaForNow(primaryReplicaId)
+                            .thenCompose(mvTableStorage -> 
awaitPrimaryReplica(primaryReplicaId, clock.now())
                                     .thenAccept(replicaMeta -> 
tryScheduleBuildIndex(
                                             primaryReplicaId,
                                             parameters.indexDescriptor(),
@@ -194,7 +195,7 @@ class IndexBuildController implements ManuallyCloseable {
                 int catalogVersion = catalogService.latestCatalogVersion();
 
                 return getMvTableStorageFuture(parameters.causalityToken(), 
primaryReplicaId)
-                        .thenCompose(mvTableStorage -> 
awaitPrimaryReplicaForNow(primaryReplicaId)
+                        .thenCompose(mvTableStorage -> 
awaitPrimaryReplica(primaryReplicaId, parameters.startTime())
                                 .thenAccept(replicaMeta -> 
tryScheduleBuildIndexesForNewPrimaryReplica(
                                         catalogVersion,
                                         primaryReplicaId,
@@ -267,15 +268,15 @@ class IndexBuildController implements ManuallyCloseable {
         return indexManager.getMvTableStorage(causalityToken, 
replicaId.tableId());
     }
 
-    private CompletableFuture<ReplicaMeta> 
awaitPrimaryReplicaForNow(TablePartitionId replicaId) {
+    private CompletableFuture<ReplicaMeta> 
awaitPrimaryReplica(TablePartitionId replicaId, HybridTimestamp timestamp) {
         return placementDriver
-                .awaitPrimaryReplica(replicaId, clock.now(), 
AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, SECONDS)
+                .awaitPrimaryReplica(replicaId, timestamp, 
AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, SECONDS)
                 .handle((replicaMeta, throwable) -> {
                     if (throwable != null) {
                         Throwable unwrapThrowable = 
ExceptionUtils.unwrapCause(throwable);
 
                         if (unwrapThrowable instanceof 
PrimaryReplicaAwaitTimeoutException) {
-                            return awaitPrimaryReplicaForNow(replicaId);
+                            return awaitPrimaryReplica(replicaId, timestamp);
                         } else {
                             return 
CompletableFuture.<ReplicaMeta>failedFuture(unwrapThrowable);
                         }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 918b00cc85..9cb803bce0 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -23,6 +23,7 @@ import static 
java.util.concurrent.CompletableFuture.supplyAsync;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +34,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -61,6 +63,18 @@ public class WatchProcessor implements ManuallyCloseable {
 
     private static final IgniteLogger LOG = 
Loggers.forClass(WatchProcessor.class);
 
+    /**
+     * If watch event processing takes more time, than this constant, we will 
log warning message with some information.
+     */
+    private static final int WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS = 100;
+
+    /**
+     * The number of keys in log message, that will be printed for long events.
+     *
+     * @see #WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS
+     */
+    private static final int WATCH_EVENT_PROCESSING_LOG_KEYS = 10;
+
     /** Map that contains Watches and corresponding Watch notification process 
(represented as a CompletableFuture). */
     private final List<Watch> watches = new CopyOnWriteArrayList<>();
 
@@ -150,16 +164,22 @@ public class WatchProcessor implements ManuallyCloseable {
 
                     return watchesAndEventsFuture
                             .thenComposeAsync(watchAndEvents -> {
+                                long startTimeNanos = System.nanoTime();
+
                                 CompletableFuture<Void> notifyWatchesFuture = 
notifyWatches(watchAndEvents, newRevision, time);
 
                                 // Revision update is triggered strictly after 
all watch listeners have been notified.
                                 CompletableFuture<Void> 
notifyUpdateRevisionFuture = notifyUpdateRevisionListeners(newRevision);
 
-                                return allOf(notifyWatchesFuture, 
notifyUpdateRevisionFuture)
+                                CompletableFuture<Void> notificationFuture = 
allOf(notifyWatchesFuture, notifyUpdateRevisionFuture)
                                         .thenComposeAsync(
                                                 unused -> 
invokeOnRevisionCallback(watchAndEvents, newRevision, time),
                                                 watchExecutor
                                         );
+
+                                notificationFuture.whenComplete((unused, e) -> 
maybeLogLongProcessing(updatedEntries, startTimeNanos));
+
+                                return notificationFuture;
                             }, watchExecutor);
                 }, watchExecutor);
 
@@ -208,6 +228,26 @@ public class WatchProcessor implements ManuallyCloseable {
         return allOf(notifyWatchFutures);
     }
 
+    private static void maybeLogLongProcessing(List<Entry> updatedEntries, 
long startTimeNanos) {
+        long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() 
- startTimeNanos);
+
+        if (durationMillis > WATCH_EVENT_PROCESSING_LOG_THRESHOLD_MILLIS) {
+            String keysHead = updatedEntries.stream()
+                    .limit(WATCH_EVENT_PROCESSING_LOG_KEYS)
+                    .map(entry -> new String(entry.key(), 
StandardCharsets.UTF_8))
+                    .collect(Collectors.joining(", "));
+
+            String keysTail = updatedEntries.size() > 
WATCH_EVENT_PROCESSING_LOG_KEYS ? ", ..." : "";
+
+            LOG.warn(
+                    "Watch event processing has been too long [duration={}, 
keys=[{}{}]]",
+                    durationMillis,
+                    keysHead,
+                    keysTail
+            );
+        }
+    }
+
     private CompletableFuture<List<WatchAndEvents>> 
collectWatchesAndEvents(List<Entry> updatedEntries, long revision) {
         if (watches.isEmpty()) {
             return emptyListCompletedFuture();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 820c75c212..b392464667 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2220,6 +2220,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
      * @param future Future.
      */
     private <T> CompletableFuture<T> orStopManagerFuture(CompletableFuture<T> 
future) {
+        if (future.isDone()) {
+            return future;
+        }
+
         return anyOf(future, stopManagerFuture).thenApply(o -> (T) o);
     }
 }

Reply via email to