This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 770030b063 IGNITE-22038 Make idle safe time sending more robust (#3602)
770030b063 is described below

commit 770030b06328132d9abe7c81384531c47bddf423
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Apr 12 23:04:25 2024 +0400

    IGNITE-22038 Make idle safe time sending more robust (#3602)
---
 .../internal/network/NettyWorkersRegistrar.java    |  8 +++-
 modules/replicator/build.gradle                    |  9 +++--
 .../ItPlacementDriverReplicaSideTest.java          |  4 +-
 .../ignite/internal/replicator/ReplicaManager.java | 43 +++++++++++++++++-----
 .../internal/replicator/ReplicaManagerTest.java    |  4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  3 +-
 modules/table/build.gradle                         |  1 +
 .../ignite/distributed/ReplicaUnavailableTest.java |  4 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  3 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |  4 +-
 .../internal/worker/CriticalWorkerWatchdog.java    | 11 ++++++
 12 files changed, 75 insertions(+), 22 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
index 6ad0091041..d164cbd74a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
@@ -109,8 +109,12 @@ public class NettyWorkersRegistrar implements IgniteComponent {
     }
 
     private void sendHearbeats() {
-        for (NettyWorker worker : workers) {
-            worker.sendHeartbeat();
+        try {
+            for (NettyWorker worker : workers) {
+                worker.sendHeartbeat();
+            }
+        } catch (Exception | AssertionError ignored) {
+            // Ignore.
         }
     }
 
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index d38e2aeed5..08d76b32be 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -34,6 +34,7 @@ dependencies {
     implementation project(':ignite-cluster-management')
     implementation project(':ignite-network-api')
     implementation project(':ignite-placement-driver-api')
+    implementation project(':ignite-failure-handler')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.auto.service.annotations
@@ -45,12 +46,14 @@ dependencies {
     integrationTestImplementation project(':ignite-cluster-management')
     integrationTestImplementation testFixtures(project)
     integrationTestImplementation testFixtures(project(':ignite-core'))
-    integrationTestImplementation 
testFixtures(project(':ignite-configuration:'))
+    integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
     integrationTestImplementation testFixtures(project(':ignite-network'))
-    integrationTestImplementation 
testFixtures(project(':ignite-placement-driver-api:'))
+    integrationTestImplementation 
testFixtures(project(':ignite-placement-driver-api'))
+    integrationTestImplementation 
testFixtures(project(':ignite-failure-handler'))
 
     testImplementation testFixtures(project(':ignite-core'))
-    testImplementation testFixtures(project(':ignite-placement-driver-api:'))
+    testImplementation testFixtures(project(':ignite-placement-driver-api'))
+    testImplementation(testFixtures(project(':ignite-failure-handler')))
     testImplementation libs.hamcrest.core
     testImplementation libs.mockito.core
     testImplementation libs.mockito.junit
diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index a52d4d8e9f..1ee243bad2 100644
--- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -53,6 +53,7 @@ import java.util.stream.IntStream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.TestClockService;
@@ -188,7 +189,8 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest {
                     new TestClockService(clock),
                     Set.of(ReplicaMessageTestGroup.class),
                     new TestPlacementDriver(primaryReplicaSupplier),
-                    partitionOperationsExecutor
+                    partitionOperationsExecutor,
+                    new NoOpFailureProcessor()
             );
 
             replicaManagers.put(nodeName, replicaManager);
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 1b80f01936..ff1280f818 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -49,6 +49,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongSupplier;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.event.AbstractEventProducer;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.FailureType;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -139,6 +142,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
 
     private final Executor requestsExecutor;
 
+    private final FailureProcessor failureProcessor;
+
     /** Set of message groups to handler as replica requests. */
     private final Set<Class<?>> messageGroupsToHandle;
 
@@ -166,7 +171,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             ClockService clockService,
             Set<Class<?>> messageGroupsToHandle,
             PlacementDriver placementDriver,
-            Executor requestsExecutor
+            Executor requestsExecutor,
+            FailureProcessor failureProcessor
     ) {
         this(
                 nodeName,
@@ -176,7 +182,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
                 messageGroupsToHandle,
                 placementDriver,
                 requestsExecutor,
-                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                failureProcessor
         );
     }
 
@@ -200,7 +207,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             Set<Class<?>> messageGroupsToHandle,
             PlacementDriver placementDriver,
             Executor requestsExecutor,
-            LongSupplier idleSafeTimePropagationPeriodMsSupplier
+            LongSupplier idleSafeTimePropagationPeriodMsSupplier,
+            FailureProcessor failureProcessor
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.cmgMgr = cmgMgr;
@@ -211,6 +219,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
         this.placementDriver = placementDriver;
         this.requestsExecutor = requestsExecutor;
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
+        this.failureProcessor = failureProcessor;
 
         scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
                 1,
@@ -764,15 +773,29 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
      * Idle safe time sync for replicas.
      */
     private void idleSafeTimeSync() {
-        replicas.values().forEach(r -> {
-            if (r.isDone()) {
-                ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                        .groupId(r.join().groupId())
-                        .build();
+        for (Entry<ReplicationGroupId, CompletableFuture<Replica>> entry : 
replicas.entrySet()) {
+            try {
+                sendSafeTimeSyncIfReplicaReady(entry.getValue());
+            } catch (Exception | AssertionError e) {
+                LOG.warn("Error while trying to send a safe time sync request 
to {}", e, entry.getKey());
+            } catch (Error e) {
+                LOG.error("Error while trying to send a safe time sync request 
to {}", e, entry.getKey());
 
-                r.join().processRequest(req, localNodeId);
+                failureProcessor.process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
             }
-        });
+        }
+    }
+
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
+        if (isCompletedSuccessfully(replicaFuture)) {
+            Replica replica = replicaFuture.join();
+
+            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                    .groupId(replica.groupId())
+                    .build();
+
+            replica.processRequest(req, localNodeId);
+        }
     }
 
     /**
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 222701613b..75c20b53dd 100644
--- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -107,7 +108,8 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest {
                 new TestClockService(clock),
                 Set.of(),
                 placementDriver,
-                requestsExecutor
+                requestsExecutor,
+                new NoOpFailureProcessor()
         );
 
         replicaManager.start();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a55f3a7d25..e355a6e20d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -479,7 +479,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 Set.of(TableMessageGroup.class, TxMessageGroup.class),
                 placementDriverManager.placementDriver(),
                 threadPoolsManager.partitionOperationsExecutor(),
-                partitionIdleSafeTimePropagationPeriodMsSupplier
+                partitionIdleSafeTimePropagationPeriodMsSupplier,
+                failureProcessor
         );
 
         var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fd56fa5cca..3714a4de87 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -618,7 +618,8 @@ public class IgniteImpl implements Ignite {
                 Set.of(TableMessageGroup.class, TxMessageGroup.class),
                 placementDriverMgr.placementDriver(),
                 threadPoolsManager.partitionOperationsExecutor(),
-                partitionIdleSafeTimePropagationPeriodMsSupplier
+                partitionIdleSafeTimePropagationPeriodMsSupplier,
+                failureProcessor
         );
 
         
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY));
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 5f147caa2c..a9cce38262 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -108,6 +108,7 @@ dependencies {
     testFixturesImplementation(testFixtures(project(':ignite-network')))
     
testFixturesImplementation(testFixtures(project(':ignite-placement-driver-api')))
     testFixturesImplementation(testFixtures(project(':ignite-low-watermark')))
+    
testFixturesImplementation(testFixtures(project(':ignite-failure-handler')))
     testFixturesImplementation libs.jetbrains.annotations
     testFixturesImplementation libs.fastutil.core
     testFixturesImplementation libs.mockito.core
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index f21a94e0bc..c0058c145b 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.TestClockService;
@@ -155,7 +156,8 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 new TestClockService(clock),
                 Set.of(TableMessageGroup.class, TxMessageGroup.class),
                 new 
TestPlacementDriver(clusterService.topologyService().localMember()),
-                requestsExecutor
+                requestsExecutor,
+                new NoOpFailureProcessor()
         );
 
         replicaManager.start();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 5f868d7efc..6eea705d62 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1178,7 +1178,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                     Set.of(TableMessageGroup.class, TxMessageGroup.class),
                     placementDriver,
                     threadPoolsManager.partitionOperationsExecutor(),
-                    partitionIdleSafeTimePropagationPeriodMsSupplier
+                    partitionIdleSafeTimePropagationPeriodMsSupplier,
+                    new NoOpFailureProcessor()
             ));
 
             LongSupplier delayDurationMsSupplier = () -> 10L;
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 4ecb62b495..850cf8e58f 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -406,7 +407,8 @@ public class ItTxTestCluster {
                     clockService,
                     Set.of(TableMessageGroup.class, TxMessageGroup.class),
                     placementDriver,
-                    partitionOperationsExecutor
+                    partitionOperationsExecutor,
+                    new NoOpFailureProcessor()
             );
 
             replicaMgr.start();
diff --git a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
index 767883e5d5..1ca3c8ecac 100644
--- a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
+++ b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.worker;
 
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static 
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_BLOCKED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.lang.ErrorGroups.CriticalWorkers.SYSTEM_WORKER_BLOCKED_ERR;
@@ -112,6 +113,16 @@ public class CriticalWorkerWatchdog implements CriticalWorkerRegistry, IgniteCom
     }
 
     private void probeLiveness() {
+        try {
+            doProbeLiveness();
+        } catch (Exception | AssertionError e) {
+            LOG.debug("Error while probing liveness", e);
+        } catch (Error e) {
+            failureProcessor.process(new FailureContext(CRITICAL_ERROR, e));
+        }
+    }
+
+    private void doProbeLiveness() {
         long maxAllowedLag = configuration.maxAllowedLag().value();
 
         Long2LongMap delayedThreadIdsToDelays = 
getDelayedThreadIdsAndDelays(maxAllowedLag);

Reply via email to