This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a936ffa476 IGNITE-22759 Do not do partition SafeTime sync if previous
attempt is not finished (#4684)
a936ffa476 is described below
commit a936ffa4765b90535acc2be711eb4f28216a51cb
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Nov 8 16:52:45 2024 +0400
IGNITE-22759 Do not do partition SafeTime sync if previous attempt is not
finished (#4684)
---
.../ignite/client/handler/FakePlacementDriver.java | 5 ++
.../ignite/internal/index/TestPlacementDriver.java | 5 ++
.../impl/ItMetaStorageCompactionTriggerTest.java | 3 ++
.../replicator/utils/TestPlacementDriver.java | 5 ++
.../internal/placementdriver/PlacementDriver.java | 7 +++
.../placementdriver/TestPlacementDriver.java | 5 ++
.../placementdriver/PlacementDriverManager.java | 5 ++
.../ignite/internal/replicator/ReplicaManager.java | 57 +++++++++++++++++++---
.../message/ReplicaSafeTimeSyncRequest.java | 3 ++
.../replicator/PartitionReplicaListener.java | 20 +++++---
.../wrappers/DelegatingPlacementDriver.java | 5 ++
11 files changed, 105 insertions(+), 15 deletions(-)
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index 85de0bd044..790a115352 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -152,4 +152,9 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
};
}
+
+ @Override
+ public boolean isActualAt(HybridTimestamp timestamp) {
+ return true;
+ }
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index 12904f9573..344ef5a0e2 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -90,4 +90,9 @@ class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
)
));
}
+
+ @Override
+ public boolean isActualAt(HybridTimestamp timestamp) {
+ return true;
+ }
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
index e072eaad36..df36e71c1b 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageCompactionTriggerTest.java
@@ -43,16 +43,19 @@ import java.util.concurrent.CountDownLatch;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.distributionzones.rebalance.DistributionZoneRebalanceEngine;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/** Integration test for {@link MetaStorageCompactionTrigger}. */
+@WithSystemProperty(key =
DistributionZoneRebalanceEngine.SKIP_REBALANCE_TRIGGERS_RECOVERY, value =
"true")
public class ItMetaStorageCompactionTriggerTest extends
ClusterPerClassIntegrationTest {
@Override
protected int initialNodes() {
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
index 03e6182297..1cd3ade248 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/utils/TestPlacementDriver.java
@@ -114,4 +114,9 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
});
}
+
+ @Override
+ public boolean isActualAt(HybridTimestamp timestamp) {
+ return true;
+ }
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index b2d6425e17..0a6a099364 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -30,4 +30,11 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
* replication group.</p>
*/
public interface PlacementDriver extends LeasePlacementDriver,
AssignmentsPlacementDriver {
+ /**
+ * Returns whether the state (on primary replicas/assignments) the
placement driver operates upon is already present (and doesn't need
+ * to be awaited) for the given timestamp.
+ *
+ * @param timestamp Timestamp of interest.
+ */
+ boolean isActualAt(HybridTimestamp timestamp);
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index e3895c912b..bff99cd020 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -115,4 +115,9 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
public void setPrimaryReplicaSupplier(Supplier<? extends ReplicaMeta>
primaryReplicaSupplier) {
this.primaryReplicaSupplier = primaryReplicaSupplier;
}
+
+ @Override
+ public boolean isActualAt(HybridTimestamp timestamp) {
+ return true;
+ }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 058247af6e..bed0e63056 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -278,6 +278,11 @@ public class PlacementDriverManager implements
IgniteComponent {
private PlacementDriver createPlacementDriver() {
return new PlacementDriver() {
+ @Override
+ public boolean isActualAt(HybridTimestamp timestamp) {
+ return
metastore.clusterTime().currentSafeTime().compareTo(timestamp) >= 0;
+ }
+
@Override
public CompletableFuture<List<TokenizedAssignments>>
getAssignments(
List<? extends ReplicationGroupId> replicationGroupIds,
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 666f181f15..b213e36f65 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -223,6 +223,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this
code
private Function<ReplicaRequest, ReplicationGroupId> groupIdConverter = r
-> r.groupId().asReplicationGroupId();
+ private volatile @Nullable HybridTimestamp lastIdleSafeTimeProposal;
+
/**
* Constructor for a replica service.
*
@@ -335,6 +337,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
this.partitionRaftConfigurer = partitionRaftConfigurer;
this.replicaStateManager = new
ReplicaStateManager(replicaStartStopExecutor, clockService, placementDriver,
this);
+ // This pool MUST be single-threaded to make sure idle safe time
propagation attempts are not reordered on it.
scheduledIdleSafeTimeSyncExecutor = Executors.newScheduledThreadPool(
1,
NamedThreadFactory.create(nodeName,
"scheduled-idle-safe-time-sync-thread", LOG)
@@ -904,6 +907,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
} catch (NodeStoppingException ignored) {
// No-op.
}
+
return v;
});
}
@@ -1073,9 +1077,20 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* Idle safe time sync for replicas.
*/
private void idleSafeTimeSync() {
+ if (!shouldAdvanceIdleSafeTime()) {
+ // If the previous attempt may still be waiting on the Metastorage
SafeTime, we should not send the command ourselves as this
+ // might be an indicator that Metastorage SafeTime has been stuck for
some time; if we send the command, it will have to add its
+ // future, increasing (most probably, uselessly) heap pressure.
+ return;
+ }
+
+ HybridTimestamp proposedSafeTime = clockService.now();
+
+ lastIdleSafeTimeProposal = proposedSafeTime;
+
for (Entry<ReplicationGroupId, CompletableFuture<Replica>> entry :
replicas.entrySet()) {
try {
- sendSafeTimeSyncIfReplicaReady(entry.getValue());
+ sendSafeTimeSyncIfReplicaReady(entry.getValue(),
proposedSafeTime);
} catch (Exception | AssertionError e) {
LOG.warn("Error while trying to send a safe time sync request
[groupId={}]", e, entry.getKey());
} catch (Error e) {
@@ -1086,16 +1101,38 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
}
- private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica>
replicaFuture) {
- if (isCompletedSuccessfully(replicaFuture)) {
- Replica replica = replicaFuture.join();
+ private void sendSafeTimeSyncIfReplicaReady(CompletableFuture<Replica>
replicaFuture, HybridTimestamp proposedSafeTime) {
+ if (!isCompletedSuccessfully(replicaFuture)) {
+ return;
+ }
- ReplicaSafeTimeSyncRequest req =
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
- .groupId(toReplicationGroupIdMessage(replica.groupId()))
- .build();
+ Replica replica = replicaFuture.join();
- replica.processRequest(req, localNodeId);
+ ReplicaSafeTimeSyncRequest req =
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+ .groupId(toReplicationGroupIdMessage(replica.groupId()))
+ .proposedSafeTime(proposedSafeTime)
+ .build();
+
+ replica.processRequest(req, localNodeId).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Could not advance safe time for {} to {}", ex,
replica.groupId(), proposedSafeTime);
+ }
+ });
+ }
+
+ private boolean shouldAdvanceIdleSafeTime() {
+ HybridTimestamp lastProposal = lastIdleSafeTimeProposal;
+ if (lastProposal == null) {
+ // No previous attempt, we have to do it ourselves.
+ return true;
}
+
+ // This is the actuality time that the previous attempt needed to reach
in order to check that this node is still a primary.
+ // If it's already achieved, then previous attempt is unblocked (most
probably already finished), so we should proceed.
+ // If it's not achieved yet, then the previous attempt is still
waiting, so we should skip this round of idle safe time propagation.
+ HybridTimestamp requiredLastAttemptActualityTime =
lastProposal.addPhysicalTime(clockService.maxClockSkewMillis());
+
+ return placementDriver.isActualAt(requiredLastAttemptActualityTime);
}
/**
@@ -1642,4 +1679,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
throw new AssertionError("Not supported: " + replicationGroupId);
}
+
+ private static class ReplicaContext {
+ private volatile @Nullable HybridTimestamp lastIdleSafeTimeProposal;
+ }
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
index 2dee151d72..a4aad7f6f0 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaSafeTimeSyncRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.replicator.message;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.annotations.Transferable;
/**
@@ -24,4 +25,6 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
*/
@Transferable(ReplicaMessageGroup.SAFE_TIME_SYNC_REQUEST)
public interface ReplicaSafeTimeSyncRequest extends ReplicaRequest {
+ /** Safe time this request proposes. */
+ HybridTimestamp proposedSafeTime();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 83eaed3668..14716b8430 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -1132,7 +1132,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
applyCmdWithRetryOnSafeTimeReorderException(
-
REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(clockService.now()).build(),
+
REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().safeTime(request.proposedSafeTime()).build(),
resultFuture
);
@@ -3570,17 +3570,23 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
return placementDriver.getPrimaryReplica(replicationGroupId,
current).thenApply(validateClo);
- } else if (request instanceof ReadOnlyReplicaRequest || request
instanceof ReplicaSafeTimeSyncRequest) {
- return placementDriver.getPrimaryReplica(replicationGroupId,
current)
- .thenApply(primaryReplica -> new IgniteBiTuple<>(
- primaryReplica != null &&
isLocalPeer(primaryReplica.getLeaseholderId()),
- null
- ));
+ } else if (request instanceof ReadOnlyReplicaRequest) {
+ return isLocalNodePrimaryReplicaAt(current);
+ } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+ return isLocalNodePrimaryReplicaAt(((ReplicaSafeTimeSyncRequest)
request).proposedSafeTime());
} else {
return completedFuture(new IgniteBiTuple<>(null, null));
}
}
+ private CompletableFuture<IgniteBiTuple<Boolean, Long>>
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
+ return placementDriver.getPrimaryReplica(replicationGroupId, timestamp)
+ .thenApply(primaryReplica -> new IgniteBiTuple<>(
+ primaryReplica != null &&
isLocalPeer(primaryReplica.getLeaseholderId()),
+ null
+ ));
+ }
+
/**
* Resolves read result to the corresponding binary row. Following rules
are used for read result resolution:
* <ol>
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
index 2a08a7dd9a..44017347c4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/wrappers/DelegatingPlacementDriver.java
@@ -78,4 +78,9 @@ abstract class DelegatingPlacementDriver implements
PlacementDriver {
) {
return delegate.getAssignments(replicationGroupIds,
clusterTimeToAwait);
}
+
+ @Override
+ public boolean isActualAt(HybridTimestamp timestamp) {
+ return delegate.isActualAt(timestamp);
+ }
}