This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a936ffa476 IGNITE-22759 Do not do partition SafeTime sync if previous attempt is not finished (#4684)
a936ffa476 is described below

commit a936ffa4765b90535acc2be711eb4f28216a51cb
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 8 16:52:45 2024 +0400

    IGNITE-22759 Do not do partition SafeTime sync if previous attempt is not finished (#4684)
---
 .../ignite/client/handler/FakePlacementDriver.java |  5 ++
 .../ignite/internal/index/TestPlacementDriver.java |  5 ++
 .../impl/ItMetaStorageCompactionTriggerTest.java   |  3 ++
 .../replicator/utils/TestPlacementDriver.java      |  5 ++
 .../internal/placementdriver/PlacementDriver.java  |  7 +++
 .../placementdriver/TestPlacementDriver.java       |  5 ++
 .../placementdriver/PlacementDriverManager.java    |  5 ++
 .../ignite/internal/replicator/ReplicaManager.java | 57 +++++++++++++++++++---
 .../message/ReplicaSafeTimeSyncRequest.java        |  3 ++
 .../replicator/PartitionReplicaListener.java       | 20 +++++---
 .../wrappers/DelegatingPlacementDriver.java        |  5 ++
 11 files changed, 105 insertions(+), 15 deletions(-)

diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 85de0bd044..790a115352 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -152,4 +152,9 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
             }
         };
     }
+
+    @Override
+    public boolean isActualAt(HybridTimestamp timestamp) {
+        return true;
+    }
 }
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index 12904f9573..344ef5a0e2 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -90,4 +90,9 @@ class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
                 )
         ));
     }
+
+    @Override
+    public boolean isActualAt(HybridTimestamp timestamp) {
+        return true;
+    }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
index e072eaad36..df36e71c1b 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -43,16 +43,19 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 /** Integration test for {@link MetaStorageCompactionTrigger}. */
+@WithSystemProperty(key = DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value = "true")
 public class ItMetaStorageCompactionTriggerTest extends 
ClusterPerClassIntegrationTest {
     @Override
     protected int initialNodes() {
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index 03e6182297..1cd3ade248 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -114,4 +114,9 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
             }
         });
     }
+
+    @Override
+    public boolean isActualAt(HybridTimestamp timestamp) {
+        return true;
+    }
 }
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index b2d6425e17..0a6a099364 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -30,4 +30,11 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
  * replication group.</p>
  */
 public interface PlacementDriver extends LeasePlacementDriver, 
AssignmentsPlacementDriver {
+    /**
+     * Returns whether the state (on primary replicas/assignments) the placement driver operates upon is already present (and doesn't need
+     * to be awaited for) for the given timestamp.
+     *
+     * @param timestamp Timestamp of interest.
+     */
+    boolean isActualAt(HybridTimestamp timestamp);
 }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index e3895c912b..bff99cd020 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -115,4 +115,9 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     public void setPrimaryReplicaSupplier(Supplier<? extends ReplicaMeta> 
primaryReplicaSupplier) {
         this.primaryReplicaSupplier = primaryReplicaSupplier;
     }
+
+    @Override
+    public boolean isActualAt(HybridTimestamp timestamp) {
+        return true;
+    }
 }
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 058247af6e..bed0e63056 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -278,6 +278,11 @@ public class PlacementDriverManager implements 
IgniteComponent {
 
     private PlacementDriver createPlacementDriver() {
         return new PlacementDriver() {
+            @Override
+            public boolean isActualAt(HybridTimestamp timestamp) {
+                return metastore.clusterTime().currentSafeTime().compareTo(timestamp) >= 0;
+            }
+
             @Override
             public CompletableFuture<List<TokenizedAssignments>> 
getAssignments(
                     List<? extends ReplicationGroupId> replicationGroupIds,
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 666f181f15..b213e36f65 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -223,6 +223,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this 
code
     private Function<ReplicaRequest, ReplicationGroupId> groupIdConverter = r 
-> r.groupId().asReplicationGroupId();
 
+    private volatile @Nullable HybridTimestamp lastIdleSafeTimeProposal;
+
     /**
      * Constructor for a replica service.
      *
@@ -335,6 +337,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         this.partitionRaftConfigurer = partitionRaftConfigurer;
         this.replicaStateManager = new 
ReplicaStateManager(replicaStartStopExecutor, clockService, placementDriver, 
this);
 
+        // This pool MUST be single-threaded to make sure idle safe time propagation attempts are not reordered on it.
         scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
                 1,
                 NamedThreadFactory.create(nodeName, 
"scheduled-idle-safe-time-sync-thread", LOG)
@@ -904,6 +907,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     } catch (NodeStoppingException ignored) {
                         // No-op.
                     }
+
                     return v;
                 });
     }
@@ -1073,9 +1077,20 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * Idle safe time sync for replicas.
      */
     private void idleSafeTimeSync() {
+        if (!shouldAdvanceIdleSafeTime()) {
+            // If previous attempt may still be waiting on the Metastorage SafeTime, we should not send the command ourselves as this
+            // might be an indicator that Metastorage SafeTime has stuck for some time; if we send the command, it will have to add its
+            // future, increasing (most probably, uselessly) heap pressure.
+            return;
+        }
+
+        HybridTimestamp proposedSafeTime = clockService.now();
+
+        lastIdleSafeTimeProposal = proposedSafeTime;
+
         for (Entry<ReplicationGroupId, CompletableFuture<Replica>> entry : 
replicas.entrySet()) {
             try {
-                sendSafeTimeSyncIfReplicaReady(entry.getValue());
+                sendSafeTimeSyncIfReplicaReady(entry.getValue(), 
proposedSafeTime);
             } catch (Exception | AssertionError e) {
                 LOG.warn("Error while trying to send a safe time sync request 
[groupId={}]", e, entry.getKey());
             } catch (Error e) {
@@ -1086,16 +1101,38 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         }
     }
 
-    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture) {
-        if (isCompletedSuccessfully(replicaFuture)) {
-            Replica replica = replicaFuture.join();
+    private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica> 
replicaFuture, HybridTimestamp proposedSafeTime) {
+        if (!isCompletedSuccessfully(replicaFuture)) {
+            return;
+        }
 
-            ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
-                    .groupId(toReplicationGroupIdMessage(replica.groupId()))
-                    .build();
+        Replica replica = replicaFuture.join();
 
-            replica.processRequest(req, localNodeId);
+        ReplicaSafeTimeSyncRequest req = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                .groupId(toReplicationGroupIdMessage(replica.groupId()))
+                .proposedSafeTime(proposedSafeTime)
+                .build();
+
+        replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+            if (ex != null) {
+                LOG.error("Could not advance safe time for {} to {}", ex, 
replica.groupId(), proposedSafeTime);
+            }
+        });
+    }
+
+    private boolean shouldAdvanceIdleSafeTime() {
+        HybridTimestamp lastProposal = lastIdleSafeTimeProposal;
+        if (lastProposal == null) {
+            // No previous attempt, we have to do it ourselves.
+            return true;
         }
+
+        // This is the actuality time that was needed to be achieved for previous attempt to check that this node is still a primary.
+        // If it's already achieved, then previous attempt is unblocked (most probably already finished), so we should proceed.
+        // If it's not achieved yet, then the previous attempt is still waiting, so we should skip this round of idle safe time propagation.
+        HybridTimestamp requiredLastAttemptActualityTime = lastProposal.addPhysicalTime(clockService.maxClockSkewMillis());
+
+        return placementDriver.isActualAt(requiredLastAttemptActualityTime);
     }
 
     /**
@@ -1642,4 +1679,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
 
         throw new AssertionError("Not supported: " + replicationGroupId);
     }
+
+    private static class ReplicaContext {
+        private volatile @Nullable HybridTimestamp lastIdleSafeTimeProposal;
+    }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
index 2dee151d72..a4aad7f6f0 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.replicator.message;
 
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.annotations.Transferable;
 
 /**
@@ -24,4 +25,6 @@ import 
org.apache.ignite.internal.network.annotations.Transferable;
  */
 @Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_REQUEST)
 public interface ReplicaSafeTimeSyncRequest extends ReplicaRequest {
+    /** Safe time this request proposes. */
+    HybridTimestamp proposedSafeTime();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 83eaed3668..14716b8430 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1132,7 +1132,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         CompletableFuture<Object> resultFuture = new CompletableFuture<>();
 
         applyCmdWithRetryOnSafeTimeReorderException(
-                REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(clockService.now()).build(),
+                REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(request.proposedSafeTime()).build(),
                 resultFuture
         );
 
@@ -3570,17 +3570,23 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
 
             return placementDriver.getPrimaryReplica(replicationGroupId, 
current).thenApply(validateClo);
-        } else if (request instanceof ReadOnlyReplicaRequest || request 
instanceof ReplicaSafeTimeSyncRequest) {
-            return placementDriver.getPrimaryReplica(replicationGroupId, 
current)
-                    .thenApply(primaryReplica -> new IgniteBiTuple<>(
-                            primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId()),
-                            null
-                    ));
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(current);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(((ReplicaSafeTimeSyncRequest) 
request).proposedSafeTime());
         } else {
             return completedFuture(new IgniteBiTuple<>(null, null));
         }
     }
 
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
+        return placementDriver.getPrimaryReplica(replicationGroupId, timestamp)
+                .thenApply(primaryReplica -> new IgniteBiTuple<>(
+                        primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId()),
+                        null
+                ));
+    }
+
     /**
      * Resolves read result to the corresponding binary row. Following rules 
are used for read result resolution:
      * <ol>
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index 2a08a7dd9a..44017347c4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -78,4 +78,9 @@ abstract class DelegatingPlacementDriver implements 
PlacementDriver {
     ) {
         return delegate.getAssignments(replicationGroupIds, 
clusterTimeToAwait);
     }
+
+    @Override
+    public boolean isActualAt(HybridTimestamp timestamp) {
+        return delegate.isActualAt(timestamp);
+    }
 }

Reply via email to