This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new b7161ec2b29 IGNITE-25262 Restore double-write protection with colocation (#5742) b7161ec2b29 is described below commit b7161ec2b29a665c5066727ab256a36f1982b964 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Sat May 3 15:17:03 2025 +0400 IGNITE-25262 Restore double-write protection with colocation (#5742) --- .../raft/ItZonePartitionRaftListenerRecoveryTest.java | 3 ++- .../replicator/raft/ZonePartitionRaftListenerTest.java | 3 ++- .../distributed/ReplicasSafeTimePropagationTest.java | 4 ++-- .../ignite/internal/table/distributed/TableManager.java | 6 ++++-- .../table/distributed/raft/PartitionListener.java | 16 ++++++++++++---- .../distributed/raft/PartitionCommandListenerTest.java | 6 ++++-- .../org/apache/ignite/distributed/ItTxTestCluster.java | 6 ++++-- .../internal/table/impl/DummyInternalTableImpl.java | 3 ++- 8 files changed, 32 insertions(+), 15 deletions(-) diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java index 549a143bca4..97cab6a6d2f 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java @@ -401,7 +401,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends IgniteAbstractTest { minimumRequiredTimeCollectorService, executor, placementDriver, - clockService + clockService, + new ZonePartitionId(PARTITION_ID.zoneId(), PARTITION_ID.partitionId()) ); } diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java index 4b2146c7203..1a1a68997e0 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java @@ -394,7 +394,8 @@ class ZonePartitionRaftListenerTest extends BaseIgniteAbstractTest { mock(MinimumRequiredTimeCollectorService.class), mock(Executor.class), placementDriver, - clockService + clockService, + new ZonePartitionId(ZONE_ID, PARTITION_ID) ); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index 25e68f613aa..bfd9dc299b3 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -396,8 +396,8 @@ public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest { mock(MinimumRequiredTimeCollectorService.class), mock(Executor.class), placementDriver, - clockService - + clockService, + GROUP_ID ) { @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 1ad0e7ba402..ee8cc1e65ab 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -965,7 +965,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { minTimeCollectorService, partitionOperationsExecutor, executorInclinedPlacementDriver, - clockService + clockService, + zonePartitionId ); var partitionStorageAccess = new PartitionMvStorageAccessImpl( @@ -1298,7 +1299,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { minTimeCollectorService, partitionOperationsExecutor, executorInclinedPlacementDriver, - clockService + clockService, + replicaGrpId ); minTimeCollectorService.addPartition(new TablePartitionId(tableId, partId)); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index d439a80067a..cb65fb53485 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -138,7 +138,14 @@ public class PartitionListener implements RaftGroupListener, RaftTableProcessor private final ClockService clockService; - private volatile ReplicaMeta lastKnownLease; + /** + * Partition group ID that is actually used for replication. + * + * <p>It is a zone partition ID when colocation is enabled, and table partition ID otherwise. + */ + private final ReplicationGroupId realReplicationGroupId; + + private ReplicaMeta lastKnownLease; /** Constructor. */ public PartitionListener( @@ -155,7 +162,8 @@ public class PartitionListener implements RaftGroupListener, RaftTableProcessor MinimumRequiredTimeCollectorService minTimeCollectorService, Executor partitionOperationsExecutor, LeasePlacementDriver placementDriver, - ClockService clockService + ClockService clockService, + ReplicationGroupId realReplicationGroupId ) { this.txManager = txManager; this.storage = partitionDataStorage; @@ -167,6 +175,7 @@ public class PartitionListener implements RaftGroupListener, RaftTableProcessor this.localNodeId = localNodeId; this.placementDriver = placementDriver; this.clockService = clockService; + this.realReplicationGroupId = realReplicationGroupId; onSnapshotSaveHandler = new OnSnapshotSaveHandler(txStatePartitionStorage, partitionOperationsExecutor); @@ -216,8 +225,7 @@ public class PartitionListener implements RaftGroupListener, RaftTableProcessor HybridTimestamp currentTime = clockService.current(); if (lastKnownLease == null || lastKnownLease.getExpirationTime().compareTo(currentTime) < 0) { - ReplicationGroupId groupId = new TablePartitionId(storage.tableId(), storage.partitionId()); - lastKnownLease = placementDriver.getCurrentPrimaryReplica(groupId, currentTime); + lastKnownLease = placementDriver.getCurrentPrimaryReplica(realReplicationGroupId, currentTime); } if (lastKnownLease == null || !lastKnownLease.getLeaseholderId().equals(localNodeId)) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java index cc7d509ab17..276476b6f04 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java @@ -313,7 +313,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { mock(MinimumRequiredTimeCollectorService.class), mock(Executor.class), placementDriver, - clockService + clockService, + new TablePartitionId(TABLE_ID, PARTITION_ID) ); // Update(All)Command handling requires both information about raft group topology and the primary replica, @@ -542,7 +543,8 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest { mock(MinimumRequiredTimeCollectorService.class), executor, placementDriver, - clockService + clockService, + new TablePartitionId(TABLE_ID, PARTITION_ID) ); txStatePartitionStorage.lastApplied(3L, 1L); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 5a0691d3b70..c8db73b4460 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -897,7 +897,8 @@ public class ItTxTestCluster { mock(MinimumRequiredTimeCollectorService.class), partitionOperationsExecutor, placementDriver, - clockServices.get(assignment) + clockServices.get(assignment), + zonePartitionId ); zonePartitionRaftListener.addTableProcessor(tableId, tablePartitionRaftListener); @@ -918,7 +919,8 @@ public class ItTxTestCluster { mock(MinimumRequiredTimeCollectorService.class), partitionOperationsExecutor, placementDriver, - clockServices.get(assignment) + clockServices.get(assignment), + new TablePartitionId(tableId, partId) ); } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 4013e8d0e5c..e0376b4e7aa 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -535,7 +535,8 @@ public class DummyInternalTableImpl extends InternalTableImpl { mock(MinimumRequiredTimeCollectorService.class), mock(Executor.class), placementDriver, - clockService + clockService, + enabledColocation ? zonePartitionId : tablePartitionId ); if (enabledColocation) {