This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b7161ec2b29 IGNITE-25262 Restore double-write protection with 
colocation (#5742)
b7161ec2b29 is described below

commit b7161ec2b29a665c5066727ab256a36f1982b964
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Sat May 3 15:17:03 2025 +0400

    IGNITE-25262 Restore double-write protection with colocation (#5742)
---
 .../raft/ItZonePartitionRaftListenerRecoveryTest.java    |  3 ++-
 .../replicator/raft/ZonePartitionRaftListenerTest.java   |  3 ++-
 .../distributed/ReplicasSafeTimePropagationTest.java     |  4 ++--
 .../ignite/internal/table/distributed/TableManager.java  |  6 ++++--
 .../table/distributed/raft/PartitionListener.java        | 16 ++++++++++++----
 .../distributed/raft/PartitionCommandListenerTest.java   |  6 ++++--
 .../org/apache/ignite/distributed/ItTxTestCluster.java   |  6 ++++--
 .../internal/table/impl/DummyInternalTableImpl.java      |  3 ++-
 8 files changed, 32 insertions(+), 15 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index 549a143bca4..97cab6a6d2f 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -401,7 +401,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest {
                 minimumRequiredTimeCollectorService,
                 executor,
                 placementDriver,
-                clockService
+                clockService,
+                new ZonePartitionId(PARTITION_ID.zoneId(), 
PARTITION_ID.partitionId())
         );
     }
 
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index 4b2146c7203..1a1a68997e0 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -394,7 +394,8 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
                 mock(MinimumRequiredTimeCollectorService.class),
                 mock(Executor.class),
                 placementDriver,
-                clockService
+                clockService,
+                new ZonePartitionId(ZONE_ID, PARTITION_ID)
         );
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
index 25e68f613aa..bfd9dc299b3 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java
@@ -396,8 +396,8 @@ public class ReplicasSafeTimePropagationTest extends 
IgniteAbstractTest {
                             mock(MinimumRequiredTimeCollectorService.class),
                             mock(Executor.class),
                             placementDriver,
-                            clockService
-
+                            clockService,
+                            GROUP_ID
                     ) {
                         @Override
                         public void 
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1ad0e7ba402..ee8cc1e65ab 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -965,7 +965,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                     minTimeCollectorService,
                     partitionOperationsExecutor,
                     executorInclinedPlacementDriver,
-                    clockService
+                    clockService,
+                    zonePartitionId
             );
 
             var partitionStorageAccess = new PartitionMvStorageAccessImpl(
@@ -1298,7 +1299,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             minTimeCollectorService,
                             partitionOperationsExecutor,
                             executorInclinedPlacementDriver,
-                            clockService
+                            clockService,
+                            replicaGrpId
                     );
 
                     minTimeCollectorService.addPartition(new 
TablePartitionId(tableId, partId));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index d439a80067a..cb65fb53485 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -138,7 +138,14 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
 
     private final ClockService clockService;
 
-    private volatile ReplicaMeta lastKnownLease;
+    /**
+     * Partition group ID that is actually used for replication.
+     *
+     * <p>It is a zone partition ID when colocation is enabled, and table 
partition ID otherwise.
+     */
+    private final ReplicationGroupId realReplicationGroupId;
+
+    private ReplicaMeta lastKnownLease;
 
     /** Constructor. */
     public PartitionListener(
@@ -155,7 +162,8 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
             MinimumRequiredTimeCollectorService minTimeCollectorService,
             Executor partitionOperationsExecutor,
             LeasePlacementDriver placementDriver,
-            ClockService clockService
+            ClockService clockService,
+            ReplicationGroupId realReplicationGroupId
     ) {
         this.txManager = txManager;
         this.storage = partitionDataStorage;
@@ -167,6 +175,7 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         this.localNodeId = localNodeId;
         this.placementDriver = placementDriver;
         this.clockService = clockService;
+        this.realReplicationGroupId = realReplicationGroupId;
 
         onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage, partitionOperationsExecutor);
 
@@ -216,8 +225,7 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         HybridTimestamp currentTime = clockService.current();
 
         if (lastKnownLease == null || 
lastKnownLease.getExpirationTime().compareTo(currentTime) < 0) {
-            ReplicationGroupId groupId = new 
TablePartitionId(storage.tableId(), storage.partitionId());
-            lastKnownLease = placementDriver.getCurrentPrimaryReplica(groupId, 
currentTime);
+            lastKnownLease = 
placementDriver.getCurrentPrimaryReplica(realReplicationGroupId, currentTime);
         }
 
         if (lastKnownLease == null || 
!lastKnownLease.getLeaseholderId().equals(localNodeId)) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index cc7d509ab17..276476b6f04 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -313,7 +313,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 mock(MinimumRequiredTimeCollectorService.class),
                 mock(Executor.class),
                 placementDriver,
-                clockService
+                clockService,
+                new TablePartitionId(TABLE_ID, PARTITION_ID)
         );
 
         // Update(All)Command handling requires both information about raft 
group topology and the primary replica,
@@ -542,7 +543,8 @@ public class PartitionCommandListenerTest extends 
BaseIgniteAbstractTest {
                 mock(MinimumRequiredTimeCollectorService.class),
                 executor,
                 placementDriver,
-                clockService
+                clockService,
+                new TablePartitionId(TABLE_ID, PARTITION_ID)
         );
 
         txStatePartitionStorage.lastApplied(3L, 1L);
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 5a0691d3b70..c8db73b4460 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -897,7 +897,8 @@ public class ItTxTestCluster {
                     mock(MinimumRequiredTimeCollectorService.class),
                     partitionOperationsExecutor,
                     placementDriver,
-                    clockServices.get(assignment)
+                    clockServices.get(assignment),
+                    zonePartitionId
             );
 
             zonePartitionRaftListener.addTableProcessor(tableId, 
tablePartitionRaftListener);
@@ -918,7 +919,8 @@ public class ItTxTestCluster {
                     mock(MinimumRequiredTimeCollectorService.class),
                     partitionOperationsExecutor,
                     placementDriver,
-                    clockServices.get(assignment)
+                    clockServices.get(assignment),
+                    new TablePartitionId(tableId, partId)
             );
         }
     }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 4013e8d0e5c..e0376b4e7aa 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -535,7 +535,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 mock(MinimumRequiredTimeCollectorService.class),
                 mock(Executor.class),
                 placementDriver,
-                clockService
+                clockService,
+                enabledColocation ? zonePartitionId : tablePartitionId
         );
 
         if (enabledColocation) {

Reply via email to