This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 23d713afad7 IGNITE-24365 Implement 
UpdateMinimumActiveTxBeginTimeReplicaRequest processing for zone replica (#5197)
23d713afad7 is described below

commit 23d713afad78011eb159005d451a2100455c372a
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Feb 14 16:44:27 2025 +0200

    IGNITE-24365 Implement UpdateMinimumActiveTxBeginTimeReplicaRequest 
processing for zone replica (#5197)
---
 .../compaction/ItCatalogCompactionTest.java        |   2 +-
 .../compaction/CatalogCompactionRunner.java        |  46 ++++---
 .../compaction/CatalogManagerCompactionFacade.java |  39 +++++-
 .../CatalogManagerCompactionFacadeTest.java        |  74 ++++++++++-
 modules/partition-replicator/build.gradle          |   1 +
 .../replicator/ItReplicaLifecycleTest.java         | 140 ++++++++++++++++++-
 .../replicator/ItZoneDataReplicationTest.java      |   7 +-
 .../partition/replicator/fixtures/Node.java        |  45 +++++--
 .../replicator/fixtures/TestPlacementDriver.java   |  21 ++-
 .../PartitionReplicaLifecycleManager.java          |   2 +-
 .../replicator/ZonePartitionReplicaListener.java   | 148 ++++++++++++++++-----
 .../MinimumActiveTxTimeReplicaRequestHandler.java  |  71 ++++++++++
 .../TxFinishReplicaRequestHandler.java             |   5 +-
 .../replicator/handlers/package-info.java          |  23 ++++
 .../replicator/raft/ZonePartitionRaftListener.java |  45 ++++---
 .../{ => handlers}/FinishTxCommandHandler.java     |   4 +-
 .../replicator/raft/handlers/package-info.java     |  24 ++++
 .../raft/ZonePartitionRaftListenerTest.java        |   2 +-
 .../table/distributed/raft/PartitionListener.java  |  51 +++----
 .../MinimumActiveTxTimeCommandHandler.java         |  85 ++++++++++++
 .../distributed/raft/handlers/package-info.java    |  23 ++++
 .../replicator/PartitionReplicaListener.java       |  32 ++---
 22 files changed, 750 insertions(+), 140 deletions(-)

diff --git 
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
 
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
index 0889134b0e0..d9244e49f30 100644
--- 
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
+++ 
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
@@ -71,7 +71,7 @@ class ItCatalogCompactionTest extends 
ClusterPerClassIntegrationTest {
      */
     private static final long CHECK_POINT_INTERVAL_MS = LW_UPDATE_TIME_MS / 2;
 
-    /** Show be greater than 2 x {@link #LW_UPDATE_TIME_MS}. */
+    /** Should be greater than 2 x {@link #LW_UPDATE_TIME_MS}. */
     private static final long COMPACTION_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(10);
 
     /** Transactions that are started in the test. */
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index da0d877bdd4..cef41906c6e 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -18,6 +18,10 @@
 package org.apache.ignite.internal.catalog.compaction;
 
 import static java.util.function.Predicate.not;
+import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.getBoolean;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
@@ -68,10 +72,11 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.Updat
 import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
-import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
 import org.apache.ignite.internal.tx.ActiveLocalTxMinimumRequiredTimeProvider;
@@ -152,6 +157,10 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
     private volatile UUID localNodeId;
 
+    /* Feature flag for zone based colocation track. */
+    // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove it.
+    private final boolean enabledColocationFeature = 
getBoolean(COLOCATION_FEATURE_FLAG, false);
+
     /**
      * Constructs catalog compaction runner.
      */
@@ -403,10 +412,11 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
         return schemaSyncService.waitForMetadataCompleteness(nowTs)
                 .thenComposeAsync(ignore -> {
-                    Int2IntMap tablesWithPartitions =
-                            
catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime, 
nowTs.longValue());
+                    Int2IntMap idsWithPartitions = enabledColocationFeature
+                            ? 
catalogManagerFacade.collectZonesWithPartitionsBetween(txBeginTime, 
nowTs.longValue())
+                            : 
catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime, 
nowTs.longValue());
 
-                    ObjectIterator<Entry> itr = 
tablesWithPartitions.int2IntEntrySet().iterator();
+                    ObjectIterator<Entry> itr = 
idsWithPartitions.int2IntEntrySet().iterator();
 
                     return invokeOnLocalReplicas(txBeginTime, localNodeId, 
itr);
                 }, executor);
@@ -586,22 +596,23 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
         return 
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime, 
UUID localNodeId, ObjectIterator<Entry> tabTtr) {
-        if (!tabTtr.hasNext()) {
+    private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime, 
UUID localNodeId, ObjectIterator<Entry> entryIterator) {
+        if (!entryIterator.hasNext()) {
             return CompletableFutures.nullCompletedFuture();
         }
 
-        Entry tableWithPartitions = tabTtr.next();
-        int tableId = tableWithPartitions.getIntKey();
-        int partitions = tableWithPartitions.getIntValue();
+        Entry idWithPartitions = entryIterator.next();
+        int id = idWithPartitions.getIntKey();
+        int partitions = idWithPartitions.getIntValue();
         List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
         HybridTimestamp nowTs = clockService.now();
 
         for (int p = 0; p < partitions; p++) {
-            TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
p);
+            ReplicationGroupId groupReplicationId = enabledColocationFeature
+                    ? new ZonePartitionId(id, p) : new TablePartitionId(id, p);
 
             CompletableFuture<?> fut = placementDriver
-                    .getPrimaryReplica(tablePartitionId, nowTs)
+                    .getPrimaryReplica(groupReplicationId, nowTs)
                     .thenCompose(meta -> {
                         // If primary is not elected yet - we'll update 
replication groups on next iteration.
                         if (meta == null || meta.getLeaseholderId() == null) {
@@ -614,14 +625,13 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
                             return CompletableFutures.nullCompletedFuture();
                         }
 
-                        TablePartitionIdMessage partIdMessage = 
ReplicaMessageUtils.toTablePartitionIdMessage(
-                                REPLICA_MESSAGES_FACTORY,
-                                tablePartitionId
-                        );
+                        ReplicationGroupIdMessage groupIdMessage = 
enabledColocationFeature
+                                ? 
toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (ZonePartitionId) 
groupReplicationId)
+                                : 
toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, (TablePartitionId) 
groupReplicationId);
 
                         UpdateMinimumActiveTxBeginTimeReplicaRequest msg = 
REPLICATION_MESSAGES_FACTORY
                                 .updateMinimumActiveTxBeginTimeReplicaRequest()
-                                .groupId(partIdMessage)
+                                .groupId(groupIdMessage)
                                 .timestamp(txBeginTime)
                                 .build();
 
@@ -632,7 +642,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
         }
 
         return CompletableFutures.allOf(partFutures)
-                .thenComposeAsync(ignore -> invokeOnLocalReplicas(txBeginTime, 
localNodeId, tabTtr), executor);
+                .thenComposeAsync(ignore -> invokeOnLocalReplicas(txBeginTime, 
localNodeId, entryIterator), executor);
     }
 
     private class CatalogCompactionMessageHandler implements 
NetworkMessageHandler {
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
index 79d9ad1a61b..6e0959a52ab 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
@@ -38,6 +38,7 @@ class CatalogManagerCompactionFacade {
         this.catalogManager = catalogManager;
     }
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this 
method.
     /**
      * Scans catalog versions in a given time interval (including interval 
boundaries).
      * Extracts all tables contained in these catalog versions and creates a 
mapping
@@ -48,25 +49,53 @@ class CatalogManagerCompactionFacade {
      * @return Mapping tableId to number of partitions in this table.
      */
     Int2IntMap collectTablesWithPartitionsBetween(long minTsInclusive, long 
maxTsInclusive) {
-        Int2IntMap tablesWithPartitions = new Int2IntOpenHashMap();
+        Int2IntMap tableIdsWithPartitions = new Int2IntOpenHashMap();
         int curVer = catalogManager.activeCatalogVersion(minTsInclusive);
         int lastVer = catalogManager.activeCatalogVersion(maxTsInclusive);
 
         do {
             Catalog catalog = catalogManager.catalog(curVer);
 
-            assert catalog != null : "ver=" + curVer + ", last=" + lastVer;
+            assert catalog != null : "Failed to find a catalog for the given 
version [version=" + curVer + ", lastVersion=" + lastVer + ']';
 
             for (CatalogTableDescriptor table : catalog.tables()) {
                 CatalogZoneDescriptor zone = catalog.zone(table.zoneId());
 
-                assert zone != null : table.zoneId();
+                assert zone != null :
+                        "Failed to find a zone for the given catalog version 
[version=" + curVer + ", tableId=" + table.id() + ']';
 
-                tablesWithPartitions.put(table.id(), zone.partitions());
+                tableIdsWithPartitions.put(table.id(), zone.partitions());
             }
         } while (++curVer <= lastVer);
 
-        return tablesWithPartitions;
+        return tableIdsWithPartitions;
+    }
+
+    /**
+     * Scans catalog versions in a given time interval (including interval 
boundaries).
+     * Extracts all zones contained in these catalog versions and creates a 
mapping
+     * zoneId -> number of partitions in this zone.
+     *
+     * @param minTsInclusive Lower timestamp (inclusive).
+     * @param maxTsInclusive Upper timestamp (inclusive).
+     * @return Mapping zoneId to number of partitions in this zone.
+     */
+    Int2IntMap collectZonesWithPartitionsBetween(long minTsInclusive, long 
maxTsInclusive) {
+        Int2IntMap zoneIdsWithPartitions = new Int2IntOpenHashMap();
+        int curVer = catalogManager.activeCatalogVersion(minTsInclusive);
+        int lastVer = catalogManager.activeCatalogVersion(maxTsInclusive);
+
+        do {
+            Catalog catalog = catalogManager.catalog(curVer);
+
+            assert catalog != null : "Failed to find a catalog for the given 
version [version=" + curVer + ", lastVersion=" + lastVer + ']';
+
+            for (CatalogZoneDescriptor zone : catalog.zones()) {
+                zoneIdsWithPartitions.put(zone.id(), zone.partitions());
+            }
+        } while (++curVer <= lastVer);
+
+        return zoneIdsWithPartitions;
     }
 
     /**
diff --git 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
index 865122afa87..3bc5ac616d3 100644
--- 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
+++ 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
@@ -31,8 +31,13 @@ import java.util.List;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
+import org.apache.ignite.internal.catalog.commands.CreateZoneCommandBuilder;
 import org.apache.ignite.internal.catalog.commands.DropTableCommand;
 import org.apache.ignite.internal.catalog.commands.DropTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.DropZoneCommand;
+import org.apache.ignite.internal.catalog.commands.DropZoneCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
 import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,6 +53,7 @@ class CatalogManagerCompactionFacadeTest extends 
AbstractCatalogCompactionTest {
         catalogManagerFacade = new 
CatalogManagerCompactionFacade(catalogManager);
     }
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this 
test.
     @Test
     void testCollectTablesWithPartitionsBetween() {
         CreateTableCommandBuilder tableCmdBuilder = 
CreateTableCommand.builder()
@@ -74,21 +80,24 @@ class CatalogManagerCompactionFacadeTest extends 
AbstractCatalogCompactionTest {
         
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test3").build()),
 willCompleteSuccessfully());
 
         {
-            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(from1,
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(
+                    from1,
                     clockService.nowLong());
 
             assertThat(tablesWithParts.keySet(), hasSize(3));
         }
 
         {
-            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(from2,
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(
+                    from2,
                     clockService.nowLong());
 
             assertThat(tablesWithParts.keySet(), hasSize(2));
         }
 
         {
-            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(from3,
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(
+                    from3,
                     clockService.nowLong());
 
             assertThat(tablesWithParts.keySet(), hasSize(1));
@@ -97,13 +106,68 @@ class CatalogManagerCompactionFacadeTest extends 
AbstractCatalogCompactionTest {
         {
             Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(
                     clockService.nowLong(),
-                    clockService.nowLong()
-            );
+                    clockService.nowLong());
 
             assertThat(tablesWithParts.keySet(), hasSize(0));
         }
     }
 
+    @Test
+    void testCollectZonesWithPartitionsBetween() {
+        CreateZoneCommandBuilder zoneCommandBuilder = 
CreateZoneCommand.builder()
+                
.storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile("ai-persist").build()))
+                .partitions(1);
+
+        DropZoneCommandBuilder dropZoneCommandBuilder = 
DropZoneCommand.builder();
+
+        long from1 = clockService.nowLong();
+
+        
assertThat(catalogManager.execute(zoneCommandBuilder.zoneName("test1").build()),
 willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(dropZoneCommandBuilder.zoneName("test1").build()),
 willCompleteSuccessfully());
+
+        long from2 = clockService.nowLong();
+
+        
assertThat(catalogManager.execute(zoneCommandBuilder.zoneName("test2").build()),
 willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(dropZoneCommandBuilder.zoneName("test2").build()),
 willCompleteSuccessfully());
+
+        long from3 = clockService.nowLong();
+        
assertThat(catalogManager.execute(zoneCommandBuilder.zoneName("test3").build()),
 willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(dropZoneCommandBuilder.zoneName("test3").build()),
 willCompleteSuccessfully());
+
+        // Take into account that there is the default zone.
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectZonesWithPartitionsBetween(
+                    from1,
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(4));
+        }
+
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectZonesWithPartitionsBetween(
+                    from2,
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(3));
+        }
+
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectZonesWithPartitionsBetween(
+                    from3,
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(2));
+        }
+
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectZonesWithPartitionsBetween(
+                    clockService.nowLong(),
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(1));
+        }
+    }
+
     @Test
     void testCatalogPriorToVersionAtTsNullable() {
         Catalog earliestCatalog = 
catalogManager.catalog(catalogManager.earliestCatalogVersion());
diff --git a/modules/partition-replicator/build.gradle 
b/modules/partition-replicator/build.gradle
index d8c9d94f5e7..a0faa4a7b96 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -85,6 +85,7 @@ dependencies {
     integrationTestImplementation project(':ignite-cluster-management')
     integrationTestImplementation project(':ignite-schema')
     integrationTestImplementation project(':ignite-catalog')
+    integrationTestImplementation project(':ignite-catalog-compaction') // 
TODO https://issues.apache.org/jira/browse/IGNITE-22522
     integrationTestImplementation project(':ignite-configuration')
     integrationTestImplementation project(':ignite-configuration-root')
     integrationTestImplementation project(':ignite-configuration-system')
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index bb5d88aa035..ff1513ae7f4 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.partition.replicator;
 
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_TEST_PROFILE_NAME;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
@@ -26,16 +27,20 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static 
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToPublisher;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.apache.ignite.sql.ColumnType.INT64;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -50,12 +55,12 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -80,13 +85,18 @@ import 
org.apache.ignite.internal.partition.replicator.fixtures.Node.InvokeInter
 import 
org.apache.ignite.internal.partition.replicator.fixtures.TestPlacementDriver;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
+import 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
+import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
+import 
org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableTestUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
@@ -157,6 +167,9 @@ public class ItReplicaLifecycleTest extends 
IgniteAbstractTest {
     @InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + 
".engine = aipersist, test.engine=test}")
     private static StorageConfiguration storageConfiguration;
 
+    @InjectConfiguration
+    private GcConfiguration gcConfiguration;
+
     @InjectExecutorService
     private static ScheduledExecutorService scheduledExecutorService;
 
@@ -255,7 +268,8 @@ public class ItReplicaLifecycleTest extends 
IgniteAbstractTest {
                 replicationConfiguration,
                 txConfiguration,
                 scheduledExecutorService,
-                invokeInterceptor
+                invokeInterceptor,
+                gcConfiguration
         );
 
         nodes.put(idx, node);
@@ -842,6 +856,97 @@ public class ItReplicaLifecycleTest extends 
IgniteAbstractTest {
         assertDoesNotThrow(tx::commit);
     }
 
+    @Test
+    public void testCatalogCompaction(TestInfo testInfo) throws Exception {
+        // How often we update the low water mark.
+        long lowWatermarkUpdateInterval = 500;
+        updateLowWatermarkConfiguration(lowWatermarkUpdateInterval * 2, 
lowWatermarkUpdateInterval);
+
+        // Prepare a single node cluster.
+        startNodes(testInfo, 1);
+        Node node = getNode(0);
+
+        List<Set<Assignment>> assignments = 
PartitionDistributionUtils.calculateAssignments(
+                nodes.values().stream().map(n -> n.name).collect(toList()), 1, 
1);
+
+        List<TokenizedAssignments> tokenizedAssignments = assignments.stream()
+                .map(a -> new TokenizedAssignmentsImpl(a, Integer.MIN_VALUE))
+                .collect(toList());
+
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+        placementDriver.setAssignments(tokenizedAssignments);
+
+        forceCheckpoint(node);
+
+        String zoneName = "test-zone";
+        createZone(node, zoneName, 1, 1);
+        int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
zoneName, node.hybridClock.nowLong());
+        prepareTableIdToZoneIdConverter(node, zoneId);
+
+        int catalogVersion1 = getLatestCatalogVersion(node);
+
+        String tableName1 = "test_table_1";
+        createTable(node, zoneName, tableName1);
+
+        String tableName2 = "test_table_2";
+        createTable(node, zoneName, tableName2);
+
+        int tableId = TableTestUtils.getTableId(node.catalogManager, 
tableName2, node.hybridClock.nowLong());
+        TableViewInternal tableViewInternal = node.tableManager.table(tableId);
+        KeyValueView<Long, Integer> tableView = 
tableViewInternal.keyValueView(Long.class, Integer.class);
+
+        // Write 2 rows to the table.
+        Map<Long, Integer> valuesToPut = Map.of(0L, 0, 1L, 1);
+        assertDoesNotThrow(() -> tableView.putAll(null, valuesToPut));
+
+        forceCheckpoint(node);
+
+        int catalogVersion2 = getLatestCatalogVersion(node);
+        assertThat("The catalog version did not changed [initial=" + 
catalogVersion1 + ", latest=" + catalogVersion2 + ']',
+                catalogVersion2, greaterThan(catalogVersion1));
+
+        expectEarliestCatalogVersion(node, catalogVersion2 - 1);
+    }
+
+    private static void expectEarliestCatalogVersion(Node node, int 
expectedVersion) throws Exception {
+        boolean result = waitForCondition(() -> 
getEarliestCatalogVersion(node) == expectedVersion, 10_000);
+
+        assertTrue(result,
+                "Failed to wait for the expected catalog version [expected=" + 
expectedVersion
+                        + ", earliest=" + getEarliestCatalogVersion(node)
+                        + ", latest=" + getLatestCatalogVersion(node) + ']');
+    }
+
+    private static int getLatestCatalogVersion(Node node) {
+        Catalog catalog = getLatestCatalog(node);
+
+        return catalog.version();
+    }
+
+    private static int getEarliestCatalogVersion(Node node) {
+        CatalogManager catalogManager = node.catalogManager;
+
+        int ver = catalogManager.earliestCatalogVersion();
+
+        Catalog catalog = catalogManager.catalog(ver);
+
+        Objects.requireNonNull(catalog);
+
+        return catalog.version();
+    }
+
+    private static Catalog getLatestCatalog(Node node) {
+        CatalogManager catalogManager = node.catalogManager;
+
+        int ver = 
catalogManager.activeCatalogVersion(node.hybridClock.nowLong());
+
+        Catalog catalog = catalogManager.catalog(ver);
+
+        Objects.requireNonNull(catalog);
+
+        return catalog;
+    }
+
     private static RemotelyTriggeredResource getVersionedStorageCursor(Node 
node, FullyQualifiedResourceId cursorId) {
         return node.resourcesRegistry.resources().get(cursorId);
     }
@@ -914,7 +1019,7 @@ public class ItReplicaLifecycleTest extends 
IgniteAbstractTest {
                 return false;
             }
 
-            Replica replica = replicaFut.get(1, TimeUnit.SECONDS);
+            Replica replica = replicaFut.get(1, SECONDS);
 
             return replica != null && (((ZonePartitionReplicaListener) 
replica.listener()).tableReplicaListeners().size() == count);
         } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
@@ -947,4 +1052,33 @@ public class ItReplicaLifecycleTest extends 
IgniteAbstractTest {
         verify(internalTable.txStateStorage(), 
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
                 .destroyTxStateStorage(partitionId);
     }
+
+    /**
+     * Update low water mark configuration.
+     *
+     * @param dataAvailabilityTime Data availability time.
+     * @param updateInterval Update interval.
+     */
+    private void updateLowWatermarkConfiguration(long dataAvailabilityTime, 
long updateInterval) {
+        CompletableFuture<?> updateFuture = 
gcConfiguration.lowWatermark().change(change -> {
+            change.changeDataAvailabilityTime(dataAvailabilityTime);
+            change.changeUpdateInterval(updateInterval);
+        });
+
+        assertThat(updateFuture, willSucceedFast());
+    }
+
+    /**
+     * Start the new checkpoint immediately on the provided node.
+     *
+     * @param node Node to start the checkpoint on.
+     */
+    private void forceCheckpoint(Node node) {
+        PersistentPageMemoryStorageEngine storageEngine = 
(PersistentPageMemoryStorageEngine) node
+                .dataStorageManager()
+                .engineByStorageProfile(DEFAULT_STORAGE_PROFILE);
+
+        
assertThat(storageEngine.checkpointManager().forceCheckpoint("test-reason").futureFor(FINISHED),
+                willSucceedIn(10, SECONDS));
+    }
 }
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
index de8c6c9d12b..ec43d53eea6 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurat
 import 
org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.configuration.GcConfiguration;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import org.apache.ignite.internal.table.InternalTable;
@@ -144,6 +145,9 @@ public class ItZoneDataReplicationTest extends 
IgniteAbstractTest {
     @InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + 
".engine = aipersist, test.engine=test}")
     private static StorageConfiguration storageConfiguration;
 
+    @InjectConfiguration
+    private static GcConfiguration gcConfiguration;
+
     @InjectExecutorService
     private static ScheduledExecutorService scheduledExecutorService;
 
@@ -234,7 +238,8 @@ public class ItZoneDataReplicationTest extends 
IgniteAbstractTest {
                 replicationConfiguration,
                 txConfiguration,
                 scheduledExecutorService,
-                null
+                null,
+                gcConfiguration
         );
     }
 
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index cdc63d197a4..2acb5599a5f 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -51,6 +51,7 @@ import java.util.function.LongSupplier;
 import org.apache.ignite.internal.app.ThreadPoolsManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -78,6 +79,7 @@ import 
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationSt
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
 import 
org.apache.ignite.internal.disaster.system.SystemDisasterRecoveryStorage;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceMinimumRequiredTimeProviderImpl;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.ClockServiceImpl;
@@ -90,6 +92,8 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
+import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
+import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -129,7 +133,6 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.schema.configuration.GcConfiguration;
-import 
org.apache.ignite.internal.schema.configuration.GcExtensionConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.GcExtensionConfigurationSchema;
 import 
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
 import 
org.apache.ignite.internal.schema.configuration.StorageUpdateExtensionConfiguration;
@@ -263,6 +266,8 @@ public class Node {
     @Nullable
     private volatile InvokeInterceptor invokeInterceptor;
 
+    private final CatalogCompactionRunner catalogCompactionRunner;
+
     /** Interceptor for {@link MetaStorageManager#invoke} calls. */
     @FunctionalInterface
     public interface InvokeInterceptor {
@@ -285,7 +290,8 @@ public class Node {
             ReplicationConfiguration replicationConfiguration,
             TransactionConfiguration transactionConfiguration,
             ScheduledExecutorService scheduledExecutorService,
-            @Nullable InvokeInterceptor invokeInterceptor
+            @Nullable InvokeInterceptor invokeInterceptor,
+            GcConfiguration gcConfiguration
     ) {
         this.invokeInterceptor = invokeInterceptor;
 
@@ -489,8 +495,6 @@ public class Node {
 
         var registry = new 
MetaStorageRevisionListenerRegistry(metaStorageManager);
 
-        GcConfiguration gcConfig = 
clusterConfigRegistry.getConfiguration(GcExtensionConfiguration.KEY).gc();
-
         DataStorageModules dataStorageModules = new DataStorageModules(List.of(
                 new PersistentPageMemoryDataStorageModule(),
                 new NonVolatileTestDataStorageModule()
@@ -514,7 +518,7 @@ public class Node {
 
         lowWatermark = new LowWatermarkImpl(
                 name,
-                gcConfig.lowWatermark(),
+                gcConfiguration.lowWatermark(),
                 clockService,
                 vaultManager,
                 failureManager,
@@ -575,6 +579,28 @@ public class Node {
 
         schemaSyncService = new 
SchemaSyncServiceImpl(metaStorageManager.clusterTime(), 
delayDurationMsSupplier);
 
+        MinimumRequiredTimeCollectorService minTimeCollectorService = new 
MinimumRequiredTimeCollectorServiceImpl();
+
+        catalogCompactionRunner = new CatalogCompactionRunner(
+                name,
+                (CatalogManagerImpl) catalogManager,
+                clusterService.messagingService(),
+                logicalTopologyService,
+                placementDriver,
+                replicaSvc,
+                clockService,
+                schemaSyncService,
+                clusterService.topologyService(),
+                threadPoolsManager.commonScheduler(),
+                clockService::nowLong,
+                minTimeCollectorService,
+                new 
RebalanceMinimumRequiredTimeProviderImpl(metaStorageManager, catalogManager));
+
+        ((MetaStorageManagerImpl) 
metaStorageManager).addElectionListener(catalogCompactionRunner::updateCoordinator);
+
+        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
+                params -> 
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
 params).newLowWatermark()));
+
         ScheduledExecutorService rebalanceScheduler = 
Executors.newScheduledThreadPool(
                 REBALANCE_SCHEDULER_POOL_SIZE,
                 NamedThreadFactory.create(name, "test-rebalance-scheduler", 
LOG)
@@ -625,12 +651,10 @@ public class Node {
         StorageUpdateConfiguration storageUpdateConfiguration = 
clusterConfigRegistry
                 
.getConfiguration(StorageUpdateExtensionConfiguration.KEY).storageUpdate();
 
-        MinimumRequiredTimeCollectorService minTimeCollectorService = new 
MinimumRequiredTimeCollectorServiceImpl();
-
         tableManager = new TableManager(
                 name,
                 registry,
-                gcConfig,
+                gcConfiguration,
                 transactionConfiguration,
                 storageUpdateConfiguration,
                 clusterService.messagingService(),
@@ -732,6 +756,7 @@ public class Node {
                 clusterCfgMgr,
                 clockWaiter,
                 catalogManager,
+                catalogCompactionRunner,
                 indexMetaStorage,
                 distributionZoneManager,
                 replicaManager,
@@ -823,4 +848,8 @@ public class Node {
     public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int 
partitionId) {
         return 
partitionReplicaLifecycleManager.txStatePartitionStorage(zoneId, partitionId);
     }
+
+    public DataStorageManager dataStorageManager() {
+        return dataStorageMgr;
+    }
 }
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
index 3ca208cd67d..a755622db34 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.fixtures;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
@@ -46,6 +47,9 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
 
     private volatile ReplicaMeta primary;
 
+    // Pre-calculated assignments for each partition.
+    private volatile List<TokenizedAssignments> tokenizedAssignments;
+
     /**
      * Set the primary replica.
      *
@@ -79,6 +83,15 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
         setPrimary(node, HybridTimestamp.MIN_VALUE);
     }
 
+    /**
+     * Set pre-calculated assignments for each partition.
+     *
+     * @param tokenizedAssignments Pre-calculated assignments.
+     */
+    public void setAssignments(List<TokenizedAssignments> 
tokenizedAssignments) {
+        this.tokenizedAssignments = tokenizedAssignments;
+    }
+
     @Override
     public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp, long 
timeout,
             TimeUnit unit) {
@@ -105,7 +118,13 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
             List<? extends ReplicationGroupId> replicationGroupIds,
             HybridTimestamp clusterTimeToAwait
     ) {
-        return failedFuture(new 
UnsupportedOperationException("getAssignments() is not supported in 
FakePlacementDriver yet."));
+        List<TokenizedAssignments> assignments = tokenizedAssignments;
+
+        if (assignments == null) {
+            return failedFuture(new AssertionError("Pre-calculated assignments 
are not defined in test PlacementDriver yet."));
+        } else {
+            return completedFuture(assignments);
+        }
     }
 
     private CompletableFuture<ReplicaMeta> 
getPrimaryReplicaMeta(ReplicationGroupId replicationGroupId) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 262e9531e0d..339b993a0ce 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -603,11 +603,11 @@ public class PartitionReplicaLifecycleManager extends
             );
 
             var raftGroupListener = new ZonePartitionRaftListener(
+                    zonePartitionId,
                     txStatePartitionStorage,
                     txManager,
                     safeTimeTracker,
                     storageIndexTracker,
-                    zonePartitionId,
                     outgoingSnapshotsManager
             );
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index fdec32b6444..a0903359294 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -26,8 +26,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaResult;
@@ -35,6 +40,7 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.replicator.message.TableAware;
@@ -42,6 +48,7 @@ import org.apache.ignite.internal.schema.SchemaSyncService;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.VisibleForTesting;
 
 /**
@@ -53,13 +60,20 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the 
table replica listener if needed.
     private final Map<TablePartitionId, ReplicaListener> replicas = new 
ConcurrentHashMap<>();
 
+    /** Raft client. */
     private final RaftCommandRunner raftClient;
 
+    private final ReplicationRaftCommandApplicator raftCommandApplicator;
+
+    // Replica request handlers.
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
+    private final MinimumActiveTxTimeReplicaRequestHandler 
minimumActiveTxTimeReplicaRequestHandler;
 
     /**
      * The constructor.
      *
+     * @param replicationGroupId Zone replication group identifier.
+     * @param clockService Clock service.
      * @param raftClient Raft client.
      */
     public ZonePartitionReplicaListener(
@@ -74,6 +88,9 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     ) {
         this.raftClient = raftClient;
 
+        this.raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftClient, replicationGroupId);
+
+        // Request handlers initialization.
         txFinishReplicaRequestHandler = new TxFinishReplicaRequestHandler(
                 txStatePartitionStorage,
                 clockService,
@@ -82,44 +99,113 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
-        );
+                replicationGroupId);
+
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator);
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        if (!(request instanceof TableAware)) {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 
implement ReplicaSafeTimeSyncRequest processing.
-            if (request instanceof TxFinishReplicaRequest) {
-                return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
-                        .thenApply(res -> new ReplicaResult(res, null));
-            } else {
-                if (request instanceof ReplicaSafeTimeSyncRequest) {
-                    LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
-                } else {
-                    LOG.warn("Non table request is not supported by the zone 
partition yet " + request);
-                }
-            }
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
+
+    private CompletableFuture<?> processRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
+        if (request instanceof TableAware) {
+            // This type of request propagates to the table processor directly.
+            return processTableAwareRequest(request, senderId);
+        }
 
-            return completedFuture(new ReplicaResult(null, null));
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement 
ReplicaSafeTimeSyncRequest processing.
+        if (request instanceof TxFinishReplicaRequest) {
+            return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
+                    .thenApply(res -> new ReplicaResult(res, null));
+        }
+
+        return processZoneReplicaRequest(request, isPrimary, senderId, 
leaseStartTime);
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24380
+        // Move PartitionReplicaListener#ensureReplicaIsPrimary to 
ZonePartitionReplicaListener.
+        return completedFuture(new IgniteBiTuple<>(null, null));
+    }
+
+    /**
+     * Processes {@link TableAware} request.
+     *
+     * @param request Request to be processed.
+     * @param senderId Node sender id.
+     * @return Future with the result of the request.
+     */
+    private CompletableFuture<ReplicaResult> 
processTableAwareRequest(ReplicaRequest request, UUID senderId) {
+        assert request instanceof TableAware : "Request should be TableAware 
[request=" + request.getClass().getSimpleName() + ']';
+
+        int partitionId;
+
+        ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine 
this code when the zone-based replication is done.
+        if (replicationGroupId instanceof  TablePartitionId) {
+            partitionId = ((TablePartitionId) 
replicationGroupId).partitionId();
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            partitionId = ((ZonePartitionId) replicationGroupId).partitionId();
+        } else {
+            throw new IllegalArgumentException("Requests with replication 
group type "
+                    + request.groupId().getClass() + " is not supported");
+        }
+
+        return replicas.get(new TablePartitionId(((TableAware) 
request).tableId(), partitionId))
+                .invoke(request, senderId);
+    }
+
+    /**
+     * Processes zone replica request.
+     *
+     * @param request Request to be processed.
+     * @param isPrimary {@code true} if the current node is the primary for 
the partition, {@code false} otherwise.
+     * @param senderId Node sender id.
+     * @param leaseStartTime Lease start time.
+     * @return Future with the result of the processing.
+     */
+    private CompletableFuture<?> processZoneReplicaRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24526
+        // Need to move the necessary part of 
PartitionReplicaListener#processRequest request processing here
+        if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
+            return 
minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest)
 request);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
         } else {
-            int partitionId;
-
-            ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
-
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine 
this code when the zone based replication will done.
-            if (replicationGroupId instanceof  TablePartitionId) {
-                partitionId = ((TablePartitionId) 
replicationGroupId).partitionId();
-            } else if (replicationGroupId instanceof ZonePartitionId) {
-                partitionId = ((ZonePartitionId) 
replicationGroupId).partitionId();
-            } else {
-                throw new IllegalArgumentException("Requests with replication 
group type "
-                        + request.groupId().getClass() + " is not supported");
-            }
-
-            return replicas.get(new TablePartitionId(((TableAware) 
request).tableId(), partitionId))
-                    .invoke(request, senderId);
+            LOG.warn("Non table request is not supported by the zone partition 
yet " + request);
         }
+        return completedFuture(new ReplicaResult(null, null));
     }
 
     @Override
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java
new file mode 100644
index 00000000000..8a3d35067f1
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.ClockService;
+import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
+import org.apache.ignite.internal.raft.Command;
+
+/**
+ * Handler for {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+ */
+public class MinimumActiveTxTimeReplicaRequestHandler {
+    /** Factory to create RAFT command messages. */
+    private static final PartitionReplicationMessagesFactory 
PARTITION_REPLICATION_MESSAGES_FACTORY =
+            new PartitionReplicationMessagesFactory();
+
+    /** Applicator that applies RAFT command that is created by this handler. 
*/
+    private final ReplicationRaftCommandApplicator commandApplicator;
+
+    /** Clock service. */
+    private final ClockService clockService;
+
+    /**
+     * Creates a new instance of MinimumActiveTxTimeReplicaRequestHandler.
+     *
+     * @param clockService Clock service.
+     * @param commandApplicator Applicator that applies RAFT command that is 
created by this handler.
+     */
+    public MinimumActiveTxTimeReplicaRequestHandler(
+            ClockService clockService,
+            ReplicationRaftCommandApplicator commandApplicator
+    ) {
+        this.clockService = clockService;
+        this.commandApplicator = commandApplicator;
+    }
+
+    /**
+     * Handles {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+     *
+     * @param request Request to handle.
+     * @return Future that will be completed when the request is handled.
+     */
+    public CompletableFuture<?> 
handle(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+        Command cmd = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+                .timestamp(request.timestamp())
+                .initiatorTime(clockService.now())
+                .build();
+
+        // The timestamp must increase monotonically, otherwise it will have 
to be
+        // stored on disk so that reordering does not occur after the node is 
restarted.
+        return commandApplicator.applyCmdWithExceptionHandling(cmd);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
similarity index 98%
rename from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java
rename to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index dcf5bf37c7e..264de7b9927 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator;
+package org.apache.ignite.internal.partition.replicator.handlers;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
@@ -39,6 +39,9 @@ import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
+import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
+import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
 import 
org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/package-info.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/package-info.java
new file mode 100644
index 00000000000..2cc8b2c87f2
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains replica request handlers that are used by
+ * {@link 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener} 
and table requests processor.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index 780efd0a4e5..018b109b55b 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -29,6 +29,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.TableAwareCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
+import 
org.apache.ignite.internal.partition.replicator.raft.handlers.FinishTxCommandHandler;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionSnapshots;
@@ -60,6 +62,7 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
 
     private final PendingComparableValuesTracker<Long, Void> 
storageIndexTracker;
 
+    /** Mapping table partition identifier to table request processor. */
     private final Map<TablePartitionId, RaftTableProcessor> tableProcessors = 
new ConcurrentHashMap<>();
 
     private final PartitionsSnapshots partitionsSnapshots;
@@ -75,31 +78,18 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
 
     private final Object commitedConfigurationLock = new Object();
 
-    private static class CommittedConfiguration {
-        final RaftGroupConfiguration configuration;
-
-        final long lastAppliedIndex;
-
-        final long lastAppliedTerm;
-
-        CommittedConfiguration(RaftGroupConfiguration configuration, long 
lastAppliedIndex, long lastAppliedTerm) {
-            this.configuration = configuration;
-            this.lastAppliedIndex = lastAppliedIndex;
-            this.lastAppliedTerm = lastAppliedTerm;
-        }
-    }
-
+    // Raft command handlers.
     private final FinishTxCommandHandler finishTxCommandHandler;
 
     private final OnSnapshotSaveHandler onSnapshotSaveHandler;
 
     /** Constructor. */
     public ZonePartitionRaftListener(
+            ZonePartitionId zonePartitionId,
             TxStatePartitionStorage txStatePartitionStorage,
             TxManager txManager,
             SafeTimeValuesTracker safeTimeTracker,
             PendingComparableValuesTracker<Long, Void> storageIndexTracker,
-            ZonePartitionId zonePartitionId,
             PartitionsSnapshots partitionsSnapshots
     ) {
         this.safeTimeTracker = safeTimeTracker;
@@ -107,6 +97,7 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
         this.partitionsSnapshots = partitionsSnapshots;
         this.partitionKey = new ZonePartitionKey(zonePartitionId.zoneId(), 
zonePartitionId.partitionId());
 
+        // RAFT command handlers initialization.
         finishTxCommandHandler = new FinishTxCommandHandler(
                 txStatePartitionStorage,
                 // TODO: IGNITE-24343 - use ZonePartitionId here.
@@ -179,6 +170,16 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
                 TablePartitionId tablePartitionId = ((TableAwareCommand) 
command).tablePartitionId().asTablePartitionId();
 
                 result = processTableAwareCommand(tablePartitionId, command, 
commandIndex, commandTerm, safeTimestamp);
+            } else if (command instanceof 
UpdateMinimumActiveTxBeginTimeCommand) {
+                result = new IgniteBiTuple<>(null, false);
+
+                tableProcessors.values().forEach(processor -> {
+                    IgniteBiTuple<Serializable, Boolean> r = 
processor.processCommand(command, commandIndex, commandTerm, safeTimestamp);
+                    // Need to adjust the safe time if any of the table 
processors successfully handled the command.
+                    if (Boolean.TRUE.equals(r.get2())) {
+                        result.set2(Boolean.TRUE);
+                    }
+                });
             } else {
                 LOG.info("Message type " + command.getClass() + " is not 
supported by the zone partition RAFT listener yet");
 
@@ -277,4 +278,18 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
     private PartitionSnapshots partitionSnapshots() {
         return partitionsSnapshots.partitionSnapshots(partitionKey);
     }
+
+    private static class CommittedConfiguration {
+        final RaftGroupConfiguration configuration;
+
+        final long lastAppliedIndex;
+
+        final long lastAppliedTerm;
+
+        CommittedConfiguration(RaftGroupConfiguration configuration, long 
lastAppliedIndex, long lastAppliedTerm) {
+            this.configuration = configuration;
+            this.lastAppliedIndex = lastAppliedIndex;
+            this.lastAppliedTerm = lastAppliedTerm;
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/FinishTxCommandHandler.java
similarity index 95%
rename from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java
rename to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/FinishTxCommandHandler.java
index 3c48fd36bd7..5014b01264e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/FinishTxCommandHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/FinishTxCommandHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.partition.replicator.raft;
+package org.apache.ignite.internal.partition.replicator.raft.handlers;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -30,6 +30,8 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
+import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
+import 
org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
new file mode 100644
index 00000000000..d0c81eaa4ff
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/handlers/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains RAFT command handlers that are used by
+ * {@link 
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener} 
and
+ * {@link 
org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor}.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.handlers;
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
index b91af3ed9a0..205eac57627 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListenerTest.java
@@ -68,11 +68,11 @@ class ZonePartitionRaftListenerTest extends 
BaseIgniteAbstractTest {
     @BeforeEach
     void setUp() {
         listener = new ZonePartitionRaftListener(
+                new ZonePartitionId(ZONE_ID, PARTITION_ID),
                 txStatePartitionStorage,
                 txManager,
                 new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE),
                 new PendingComparableValuesTracker<>(0L),
-                new ZonePartitionId(ZONE_ID, PARTITION_ID),
                 outgoingSnapshotsManager
         );
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index c7a99f60025..499fb2558e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -51,10 +51,10 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAll
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
-import 
org.apache.ignite.internal.partition.replicator.raft.FinishTxCommandHandler;
 import 
org.apache.ignite.internal.partition.replicator.raft.OnSnapshotSaveHandler;
 import org.apache.ignite.internal.partition.replicator.raft.RaftTableProcessor;
 import org.apache.ignite.internal.partition.replicator.raft.RaftTxFinishMarker;
+import 
org.apache.ignite.internal.partition.replicator.raft.handlers.FinishTxCommandHandler;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
@@ -78,6 +78,7 @@ import 
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexMeta;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import 
org.apache.ignite.internal.table.distributed.raft.handlers.MinimumActiveTxTimeCommandHandler;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.UpdateCommandResult;
@@ -125,13 +126,14 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
     // This variable is volatile, because it may be updated outside the Raft 
thread under the colocation feature.
     private volatile Set<String> currentGroupTopology;
 
-    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+    private final OnSnapshotSaveHandler onSnapshotSaveHandler;
 
+    // Raft command handlers.
     private final RaftTxFinishMarker txFinisher;
 
     private final FinishTxCommandHandler finishTxCommandHandler;
 
-    private final OnSnapshotSaveHandler onSnapshotSaveHandler;
+    private final MinimumActiveTxTimeCommandHandler 
minimumActiveTxTimeCommandHandler;
 
     /** Constructor. */
     public PartitionListener(
@@ -157,15 +159,21 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         this.schemaRegistry = schemaRegistry;
         this.indexMetaStorage = indexMetaStorage;
         this.localNodeId = localNodeId;
-        this.minTimeCollectorService = minTimeCollectorService;
 
+        onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+
+        // RAFT command handlers initialization.
+        TablePartitionId tablePartitionId = new 
TablePartitionId(storage.tableId(), storage.partitionId());
         txFinisher = new RaftTxFinishMarker(txManager);
         finishTxCommandHandler = new FinishTxCommandHandler(
                 txStatePartitionStorage,
-                new TablePartitionId(storage.tableId(), storage.partitionId()),
-                txManager
-        );
-        onSnapshotSaveHandler = new 
OnSnapshotSaveHandler(txStatePartitionStorage, storageIndexTracker);
+                tablePartitionId,
+                txManager);
+
+        minimumActiveTxTimeCommandHandler = new 
MinimumActiveTxTimeCommandHandler(
+                storage,
+                tablePartitionId,
+                minTimeCollectorService);
     }
 
     @Override
@@ -257,10 +265,9 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         } else if (command instanceof VacuumTxStatesCommand) {
             result = handleVacuumTxStatesCommand((VacuumTxStatesCommand) 
command, commandIndex, commandTerm);
         } else if (command instanceof UpdateMinimumActiveTxBeginTimeCommand) {
-            result = 
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) 
command, commandIndex,
-                    commandTerm);
+            result = 
minimumActiveTxTimeCommandHandler.handle((UpdateMinimumActiveTxBeginTimeCommand)
 command, commandIndex);
         } else {
-            throw new AssertionError("Unknown command type: " + 
command.toStringForLightLogging());
+            throw new AssertionError("Unknown command type [command=" + 
command.toStringForLightLogging() + ']');
         }
 
         if (Boolean.TRUE.equals(result.get2())) {
@@ -645,28 +652,6 @@ public class PartitionListener implements 
RaftGroupListener, RaftTableProcessor
         return new IgniteBiTuple<>(null, true);
     }
 
-    private IgniteBiTuple<Serializable, Boolean> 
handleUpdateMinimalActiveTxTimeCommand(
-            UpdateMinimumActiveTxBeginTimeCommand cmd,
-            long commandIndex,
-            long commandTerm
-    ) {
-        // Skips the write command because the storage has already executed it.
-        if (commandIndex <= storage.lastAppliedIndex()) {
-            return new IgniteBiTuple<>(null, false);
-        }
-
-        long timestamp = cmd.timestamp();
-
-        storage.flush(false).whenComplete((r, t) -> {
-            if (t == null) {
-                TablePartitionId partitionId = new 
TablePartitionId(storage.tableId(), storage.partitionId());
-                
minTimeCollectorService.recordMinActiveTxTimestamp(partitionId, timestamp);
-            }
-        });
-
-        return new IgniteBiTuple<>(null, true);
-    }
-
     private static <T extends Comparable<T>> void 
updateTrackerIgnoringTrackerClosedException(
             PendingComparableValuesTracker<T, Void> tracker,
             T newValue
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
new file mode 100644
index 00000000000..14f0f5c3bcf
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/MinimumActiveTxTimeCommandHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.handlers;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+
+/**
+ * RAFT command handler that processes {@link 
UpdateMinimumActiveTxBeginTimeCommand} commands.
+ */
+public class MinimumActiveTxTimeCommandHandler {
+    /** Data storage to which the command will be applied. */
+    private final PartitionDataStorage storage;
+
+    /**
+     * Table partition identifier.
+     * {@link TablePartitionId} is used here instead of {@link 
org.apache.ignite.internal.replicator.ZonePartitionId}
+     * intentionally, because the catalog compaction internals 
(see {@link MinimumRequiredTimeCollectorService}) are not going to be reworked.
+     */
+    private final TablePartitionId tablePartitionId;
+
+    /** Service that collects minimum required timestamp for each partition. */
+    private final MinimumRequiredTimeCollectorService minTimeCollectorService;
+
+    /**
+     * Creates a new instance of the command handler.
+     *
+     * @param storage Partition data storage.
+     * @param tablePartitionId Table partition identifier.
+     * @param minTimeCollectorService Minimum required time collector service.
+     */
+    public MinimumActiveTxTimeCommandHandler(
+            PartitionDataStorage storage,
+            TablePartitionId tablePartitionId,
+            MinimumRequiredTimeCollectorService minTimeCollectorService
+    ) {
+        this.storage = storage;
+        this.tablePartitionId = tablePartitionId;
+        this.minTimeCollectorService = minTimeCollectorService;
+    }
+
+    /**
+     * Handles {@link UpdateMinimumActiveTxBeginTimeCommand} command.
+     *
+     * @param cmd Command to be processed.
+     * @param commandIndex Command index.
+     * @return Tuple with the result of the command processing and a flag 
indicating whether the command was applied.
+     */
+    public IgniteBiTuple<Serializable, Boolean> 
handle(UpdateMinimumActiveTxBeginTimeCommand cmd, long commandIndex) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return new IgniteBiTuple<>(null, false);
+        }
+
+        long timestamp = cmd.timestamp();
+
+        storage.flush(false)
+                .whenComplete((r, t) -> {
+                    if (t == null) {
+                        
minTimeCollectorService.recordMinActiveTxTimestamp(tablePartitionId, timestamp);
+                    }
+                });
+
+        return new IgniteBiTuple<>(null, true);
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
new file mode 100644
index 00000000000..73bb636fa4c
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/handlers/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains RAFT command handlers that are used by
+ * {@link org.apache.ignite.internal.table.distributed.raft.PartitionListener} 
aka table raft processor.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.handlers;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index f6267c46619..b024c0f0561 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -109,7 +109,8 @@ import 
org.apache.ignite.internal.network.ClusterNodeResolver;
 import org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
 import org.apache.ignite.internal.partition.replicator.ReplicaTxFinishMarker;
 import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
-import 
org.apache.ignite.internal.partition.replicator.TxFinishReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
+import 
org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
@@ -359,7 +360,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private final ReplicationRaftCommandApplicator raftCommandApplicator;
     private final ReplicaTxFinishMarker replicaTxFinishMarker;
 
+    // Replica request handlers.
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
+    private final MinimumActiveTxTimeReplicaRequestHandler 
minimumActiveTxTimeReplicaRequestHandler;
 
     /**
      * The constructor.
@@ -454,6 +457,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 replicationGroupId
         );
 
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator);
+
         prepareIndexBuilderTxRwOperationTracker();
     }
 
@@ -519,6 +526,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private CompletableFuture<?> processRequest(ReplicaRequest request, 
@Nullable Boolean isPrimary, UUID senderId,
             @Nullable Long leaseStartTime) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24526
+        // Need to move the necessary part of request processing to 
ZonePartitionReplicaListener
+
         boolean hasSchemaVersion = request instanceof 
SchemaVersionAwareReplicaRequest;
 
         if (hasSchemaVersion) {
@@ -879,10 +889,13 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof VacuumTxStateReplicaRequest) {
             return 
processVacuumTxStateReplicaRequest((VacuumTxStateReplicaRequest) request);
         } else if (request instanceof 
UpdateMinimumActiveTxBeginTimeReplicaRequest) {
-            return 
processMinimumActiveTxTimeReplicaRequest((UpdateMinimumActiveTxBeginTimeReplicaRequest)
 request);
-        } else {
-            throw new UnsupportedReplicaRequestException(request.getClass());
+            if (!enabledColocationFeature) {
+                return 
minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest)
 request);
+            }
         }
+
+        // Unknown request.
+        throw new UnsupportedReplicaRequestException(request.getClass());
     }
 
     /**
@@ -3968,17 +3981,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return raftCommandRunner.run(cmd);
     }
 
-    private CompletableFuture<?> 
processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest
 request) {
-        Command cmd = 
PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
-                .timestamp(request.timestamp())
-                .initiatorTime(clockService.now())
-                .build();
-
-        // The timestamp must increase monotonically, otherwise it will have 
to be
-        // stored on disk so that reordering does not occur after the node is 
restarted.
-        return applyCmdWithExceptionHandling(cmd);
-    }
-
     /**
      * Operation unique identifier.
      */


Reply via email to