This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 770030b063 IGNITE-22038 Make idle safe time sending more robust (#3602)
770030b063 is described below
commit 770030b06328132d9abe7c81384531c47bddf423
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Apr 12 23:04:25 2024 +0400
IGNITE-22038 Make idle safe time sending more robust (#3602)
---
.../internal/network/NettyWorkersRegistrar.java | 8 +++-
modules/replicator/build.gradle | 9 +++--
.../ItPlacementDriverReplicaSideTest.java | 4 +-
.../ignite/internal/replicator/ReplicaManager.java | 43 +++++++++++++++++-----
.../internal/replicator/ReplicaManagerTest.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
modules/table/build.gradle | 1 +
.../ignite/distributed/ReplicaUnavailableTest.java | 4 +-
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 4 +-
.../internal/worker/CriticalWorkerWatchdog.java | 11 ++++++
12 files changed, 75 insertions(+), 22 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
index 6ad0091041..d164cbd74a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NettyWorkersRegistrar.java
@@ -109,8 +109,12 @@ public class NettyWorkersRegistrar implements IgniteComponent {
}
private void sendHearbeats() {
- for (NettyWorker worker : workers) {
- worker.sendHeartbeat();
+ try {
+ for (NettyWorker worker : workers) {
+ worker.sendHeartbeat();
+ }
+ } catch (Exception | AssertionError ignored) {
+ // Ignore.
}
}
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index d38e2aeed5..08d76b32be 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -34,6 +34,7 @@ dependencies {
implementation project(':ignite-cluster-management')
implementation project(':ignite-network-api')
implementation project(':ignite-placement-driver-api')
+ implementation project(':ignite-failure-handler')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
@@ -45,12 +46,14 @@ dependencies {
integrationTestImplementation project(':ignite-cluster-management')
integrationTestImplementation testFixtures(project)
integrationTestImplementation testFixtures(project(':ignite-core'))
- integrationTestImplementation testFixtures(project(':ignite-configuration:'))
+ integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-network'))
- integrationTestImplementation testFixtures(project(':ignite-placement-driver-api:'))
+ integrationTestImplementation testFixtures(project(':ignite-placement-driver-api'))
+ integrationTestImplementation testFixtures(project(':ignite-failure-handler'))
testImplementation testFixtures(project(':ignite-core'))
- testImplementation testFixtures(project(':ignite-placement-driver-api:'))
+ testImplementation testFixtures(project(':ignite-placement-driver-api'))
+ testImplementation(testFixtures(project(':ignite-failure-handler')))
testImplementation libs.hamcrest.core
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index a52d4d8e9f..1ee243bad2 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -53,6 +53,7 @@ import java.util.stream.IntStream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
@@ -188,7 +189,8 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
new TestClockService(clock),
Set.of(ReplicaMessageTestGroup.class),
new TestPlacementDriver(primaryReplicaSupplier),
- partitionOperationsExecutor
+ partitionOperationsExecutor,
+ new NoOpFailureProcessor()
);
replicaManagers.put(nodeName, replicaManager);
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 1b80f01936..ff1280f818 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -49,6 +49,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.AbstractEventProducer;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -139,6 +142,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
private final Executor requestsExecutor;
+ private final FailureProcessor failureProcessor;
+
/** Set of message groups to handler as replica requests. */
private final Set<Class<?>> messageGroupsToHandle;
@@ -166,7 +171,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
ClockService clockService,
Set<Class<?>> messageGroupsToHandle,
PlacementDriver placementDriver,
- Executor requestsExecutor
+ Executor requestsExecutor,
+ FailureProcessor failureProcessor
) {
this(
nodeName,
@@ -176,7 +182,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
messageGroupsToHandle,
placementDriver,
requestsExecutor,
- () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ failureProcessor
);
}
@@ -200,7 +207,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
Set<Class<?>> messageGroupsToHandle,
PlacementDriver placementDriver,
Executor requestsExecutor,
- LongSupplier idleSafeTimePropagationPeriodMsSupplier
+ LongSupplier idleSafeTimePropagationPeriodMsSupplier,
+ FailureProcessor failureProcessor
) {
this.clusterNetSvc = clusterNetSvc;
this.cmgMgr = cmgMgr;
@@ -211,6 +219,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
this.placementDriver = placementDriver;
this.requestsExecutor = requestsExecutor;
this.idleSafeTimePropagationPeriodMsSupplier =
idleSafeTimePropagationPeriodMsSupplier;
+ this.failureProcessor = failureProcessor;
scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
1,
@@ -764,15 +773,29 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
* Idle safe time sync for replicas.
*/
private void idleSafeTimeSync() {
- replicas.values().forEach(r -> {
- if (r.isDone()) {
- ReplicaSafeTimeSyncRequest req = REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
- .groupId(r.join().groupId())
- .build();
+ for (Entry<ReplicationGroupId, CompletableFuture<Replica>> entry : replicas.entrySet()) {
+ try {
+ sendSafeTimeSyncIfReplicaReady(entry.getValue());
+ } catch (Exception | AssertionError e) {
+ LOG.warn("Error while trying to send a safe time sync request to {}", e, entry.getKey());
+ } catch (Error e) {
+ LOG.error("Error while trying to send a safe time sync request to {}", e, entry.getKey());
- r.join().processRequest(req, localNodeId);
+ failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
- });
+ }
+ }
+
+ private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> replicaFuture) {
+ if (isCompletedSuccessfully(replicaFuture)) {
+ Replica replica = replicaFuture.join();
+
+ ReplicaSafeTimeSyncRequest req = REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+ .groupId(replica.groupId())
+ .build();
+
+ replica.processRequest(req, localNodeId);
+ }
}
/**
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 222701613b..75c20b53dd 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -107,7 +108,8 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
new TestClockService(clock),
Set.of(),
placementDriver,
- requestsExecutor
+ requestsExecutor,
+ new NoOpFailureProcessor()
);
replicaManager.start();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a55f3a7d25..e355a6e20d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -479,7 +479,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriverManager.placementDriver(),
threadPoolsManager.partitionOperationsExecutor(),
- partitionIdleSafeTimePropagationPeriodMsSupplier
+ partitionIdleSafeTimePropagationPeriodMsSupplier,
+ failureProcessor
);
var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fd56fa5cca..3714a4de87 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -618,7 +618,8 @@ public class IgniteImpl implements Ignite {
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriverMgr.placementDriver(),
threadPoolsManager.partitionOperationsExecutor(),
- partitionIdleSafeTimePropagationPeriodMsSupplier
+ partitionIdleSafeTimePropagationPeriodMsSupplier,
+ failureProcessor
);
metricManager.configure(clusterConfigRegistry.getConfiguration(MetricConfiguration.KEY));
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 5f147caa2c..a9cce38262 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -108,6 +108,7 @@ dependencies {
testFixturesImplementation(testFixtures(project(':ignite-network')))
testFixturesImplementation(testFixtures(project(':ignite-placement-driver-api')))
testFixturesImplementation(testFixtures(project(':ignite-low-watermark')))
+
testFixturesImplementation(testFixtures(project(':ignite-failure-handler')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.fastutil.core
testFixturesImplementation libs.mockito.core
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index f21a94e0bc..c0058c145b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
@@ -155,7 +156,8 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
new TestClockService(clock),
Set.of(TableMessageGroup.class, TxMessageGroup.class),
new
TestPlacementDriver(clusterService.topologyService().localMember()),
- requestsExecutor
+ requestsExecutor,
+ new NoOpFailureProcessor()
);
replicaManager.start();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 5f868d7efc..6eea705d62 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1178,7 +1178,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriver,
threadPoolsManager.partitionOperationsExecutor(),
- partitionIdleSafeTimePropagationPeriodMsSupplier
+ partitionIdleSafeTimePropagationPeriodMsSupplier,
+ new NoOpFailureProcessor()
));
LongSupplier delayDurationMsSupplier = () -> 10L;
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 4ecb62b495..850cf8e58f 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -70,6 +70,7 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.failure.NoOpFailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -406,7 +407,8 @@ public class ItTxTestCluster {
clockService,
Set.of(TableMessageGroup.class, TxMessageGroup.class),
placementDriver,
- partitionOperationsExecutor
+ partitionOperationsExecutor,
+ new NoOpFailureProcessor()
);
replicaMgr.start();
diff --git
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
index 767883e5d5..1ca3c8ecac 100644
---
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
+++
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalWorkerWatchdog.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.worker;
+import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static
org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_BLOCKED;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.lang.ErrorGroups.CriticalWorkers.SYSTEM_WORKER_BLOCKED_ERR;
@@ -112,6 +113,16 @@ public class CriticalWorkerWatchdog implements
CriticalWorkerRegistry, IgniteCom
}
private void probeLiveness() {
+ try {
+ doProbeLiveness();
+ } catch (Exception | AssertionError e) {
+ LOG.debug("Error while probing liveness", e);
+ } catch (Error e) {
+ failureProcessor.process(new FailureContext(CRITICAL_ERROR, e));
+ }
+ }
+
+ private void doProbeLiveness() {
long maxAllowedLag = configuration.maxAllowedLag().value();
Long2LongMap delayedThreadIdsToDelays =
getDelayedThreadIdsAndDelays(maxAllowedLag);