This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 51fa042e8a IGNITE-22641 Catalog compaction. Introduce 
UpdateMinimumActiveTxBeginTimeCommand RAFT command (#4137)
51fa042e8a is described below

commit 51fa042e8a2f359d9b99bac35caa8aa9510ccae2
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Aug 14 19:01:03 2024 +0300

    IGNITE-22641 Catalog compaction. Introduce 
UpdateMinimumActiveTxBeginTimeCommand RAFT command (#4137)
---
 modules/catalog-compaction/build.gradle            |  13 +
 .../compaction/ItCatalogCompactionTest.java        | 222 +++++++++++++
 .../compaction/CatalogCompactionRunner.java        | 345 +++++++++++++++------
 .../compaction/CatalogManagerCompactionFacade.java |  90 ++++++
 .../message/CatalogCompactionMessageGroup.java     |   8 +-
 ...a => CatalogCompactionMinimumTimesRequest.java} |   9 +-
 ... => CatalogCompactionMinimumTimesResponse.java} |  18 +-
 .../compaction/AbstractCatalogCompactionTest.java  |  64 ++++
 .../CatalogCompactionRunnerSelfTest.java           | 295 +++++++++++++-----
 .../CatalogManagerCompactionFacadeTest.java        | 113 +++++++
 .../internal/catalog/CatalogManagerImpl.java       |  10 +-
 .../catalog/CatalogManagerRecoveryTest.java        |   4 +-
 .../internal/catalog/CatalogManagerSelfTest.java   |  13 +-
 .../IndexNodeFinishedRwTransactionsChecker.java    |  17 +-
 .../network/PartitionReplicationMessageGroup.java  |  10 +
 .../UpdateMinimumActiveTxBeginTimeCommand.java}    |  16 +-
 ...ateMinimumActiveTxBeginTimeReplicaRequest.java} |  14 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  35 ++-
 .../table/distributed/raft/PartitionListener.java  |  40 +++
 .../replicator/PartitionReplicaListener.java       |  21 ++
 .../tx/ActiveLocalTxMinimumBeginTimeProvider.java} |  18 +-
 21 files changed, 1127 insertions(+), 248 deletions(-)

diff --git a/modules/catalog-compaction/build.gradle 
b/modules/catalog-compaction/build.gradle
index 5cca292065..b1dff7f364 100644
--- a/modules/catalog-compaction/build.gradle
+++ b/modules/catalog-compaction/build.gradle
@@ -23,6 +23,7 @@ apply from: 
"$rootDir/buildscripts/java-integration-test.gradle"
 
 dependencies {
     annotationProcessor project(':ignite-network-annotation-processor')
+    annotationProcessor libs.auto.service
 
     implementation project(':ignite-affinity')
     implementation project(':ignite-catalog')
@@ -30,9 +31,14 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-network-api')
     implementation project(':ignite-placement-driver-api')
+    implementation project(':ignite-replicator')
+    implementation project(':ignite-partition-replicator')
     implementation project(':ignite-system-view-api')
+    implementation project(':ignite-table')
+    implementation project(':ignite-transactions')
 
     implementation libs.jetbrains.annotations
+    implementation libs.fastutil.core
 
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-metastorage')))
@@ -41,7 +47,14 @@ dependencies {
     testImplementation libs.mockito.core
     testImplementation libs.hamcrest.core
 
+    integrationTestImplementation libs.fastutil.core
     integrationTestImplementation project(':ignite-api')
+    integrationTestImplementation project(':ignite-catalog')
+    integrationTestImplementation project(':ignite-transactions')
+    integrationTestImplementation project(':ignite-raft')
+    integrationTestImplementation project(':ignite-raft-api')
+    integrationTestImplementation project(':ignite-system-view-api')
+    integrationTestImplementation project(':ignite-table')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
 }
diff --git 
a/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
 
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
new file mode 100644
index 0000000000..29564fe70f
--- /dev/null
+++ 
b/modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import 
org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import 
org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests to verify catalog compaction.
+ */
+class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
+    private static final int CLUSTER_SIZE = 3;
+
+    @Override
+    protected int initialNodes() {
+        return CLUSTER_SIZE;
+    }
+
+    @Test
+    void testRaftGroupsUpdate() throws InterruptedException {
+        IgniteImpl ignite = CLUSTER.aliveNode();
+        CatalogManagerImpl catalogManager = (CatalogManagerImpl) 
ignite.catalogManager();
+        int partsCount = 16;
+
+        sql(format("create zone if not exists test with partitions={}, 
replicas={}, storage_profiles='default'",
+                partsCount, initialNodes()));
+        sql("alter zone test set default");
+
+        Map<Integer, Integer> expectedTablesWithPartitions = new HashMap<>();
+
+        // Case 1: the latest active catalog still contains all required tables.
+        {
+            sql("create table a(a int primary key)");
+
+            Catalog minRequiredCatalog = 
catalogManager.catalog(catalogManager.latestCatalogVersion());
+            assertNotNull(minRequiredCatalog);
+
+            sql("create table b(a int primary key)");
+
+            Catalog lastCatalog = catalogManager.catalog(
+                    
catalogManager.activeCatalogVersion(ignite.clock().nowLong()));
+            assertNotNull(lastCatalog);
+
+            Collection<CatalogTableDescriptor> tables = lastCatalog.tables();
+            assertThat(tables, hasSize(2));
+
+            tables.forEach(t -> expectedTablesWithPartitions.put(t.id(), 
partsCount));
+
+            HybridTimestamp expectedTime = 
HybridTimestamp.hybridTimestamp(minRequiredCatalog.time());
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue(), 
ignite.clusterNodes());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, 
expectedTablesWithPartitions);
+        }
+
+        // Case 2: the latest active catalog no longer contains the required tables.
+        // Replicas of dropped tables must also be updated.
+        long requiredTime = ignite.clockService().nowLong();
+
+        {
+            sql("drop table a");
+            sql("drop table b");
+
+            HybridTimestamp expectedTime = 
HybridTimestamp.hybridTimestamp(requiredTime);
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue(), 
ignite.clusterNodes());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, 
expectedTablesWithPartitions);
+        }
+    }
+
+    @Test
+    void testGlobalMinimumTxBeginTime() {
+        IgniteImpl node0 = CLUSTER.node(0);
+        IgniteImpl node1 = CLUSTER.node(1);
+        IgniteImpl node2 = CLUSTER.node(2);
+
+        List<CatalogCompactionRunner> runners = List.of(
+                node0.catalogCompactionRunner(),
+                node1.catalogCompactionRunner(),
+                node2.catalogCompactionRunner()
+        );
+
+        Collection<ClusterNode> clusterNodes = node0.clusterNodes();
+
+        InternalTransaction tx1 = (InternalTransaction) 
node0.transactions().begin();
+        InternalTransaction tx2 = (InternalTransaction) 
node1.transactions().begin();
+        InternalTransaction readonlyTx = (InternalTransaction) 
node1.transactions().begin(new TransactionOptions().readOnly(true));
+        InternalTransaction tx3 = (InternalTransaction) 
node2.transactions().begin();
+
+        for (CatalogCompactionRunner runner : runners) {
+            TimeHolder times = 
await(runner.determineGlobalMinimumRequiredTime(clusterNodes, 0L));
+            assertThat(times.minActiveTxBeginTime, 
is(tx1.startTimestamp().longValue()));
+        }
+
+        tx1.rollback();
+
+        for (CatalogCompactionRunner runner : runners) {
+            TimeHolder times = 
await(runner.determineGlobalMinimumRequiredTime(clusterNodes, 0L));
+            assertThat(times.minActiveTxBeginTime, 
is(tx2.startTimestamp().longValue()));
+        }
+
+        tx2.commit();
+
+        for (CatalogCompactionRunner runner : runners) {
+            TimeHolder times = 
await(runner.determineGlobalMinimumRequiredTime(clusterNodes, 0L));
+            assertThat(times.minActiveTxBeginTime, 
is(tx3.startTimestamp().longValue()));
+        }
+
+        tx3.rollback();
+
+        // With no active RW transactions left in the cluster, the reported 
minimum is min(now()) across all nodes.
+        for (CatalogCompactionRunner runner : runners) {
+            long lowerBound = Stream.of(node0, node1, node2).mapToLong(node -> 
node.clockService().nowLong()).min().orElseThrow();
+
+            TimeHolder times = 
await(runner.determineGlobalMinimumRequiredTime(clusterNodes, 0L));
+
+            long upperBound = Stream.of(node0, node1, node2).mapToLong(node -> 
node.clockService().nowLong()).min().orElseThrow();
+
+            // Read-only transactions are not counted.
+            assertThat(times.minActiveTxBeginTime, 
greaterThan(readonlyTx.startTimestamp().longValue()));
+
+            assertThat(times.minActiveTxBeginTime, 
greaterThanOrEqualTo(lowerBound));
+            assertThat(times.minActiveTxBeginTime, 
lessThanOrEqualTo(upperBound));
+        }
+
+        readonlyTx.rollback();
+    }
+
+    private static void ensureTimestampStoredInAllReplicas(
+            HybridTimestamp expectedTimestamp,
+            Map<Integer, Integer> expectedTablesWithPartitions
+    ) throws InterruptedException {
+        Loza loza = CLUSTER.aliveNode().raftManager();
+        JraftServerImpl server = (JraftServerImpl) loza.server();
+
+        for (Map.Entry<Integer, Integer> tableWithPartition : 
expectedTablesWithPartitions.entrySet()) {
+            int tableId = tableWithPartition.getKey();
+            int partitionsCount = tableWithPartition.getValue();
+
+            for (int p = 0; p < partitionsCount; p++) {
+                TablePartitionId groupId = new TablePartitionId(tableId, p);
+                List<Peer> peers = server.localPeers(groupId);
+
+                assertThat(peers, is(not(empty())));
+
+                Peer serverPeer = peers.get(0);
+                RaftGroupService grp = server.raftGroupService(new 
RaftNodeId(groupId, serverPeer));
+                DelegatingStateMachine fsm = (DelegatingStateMachine) 
grp.getRaftNode().getOptions().getFsm();
+                PartitionListener listener = (PartitionListener) 
fsm.getListener();
+
+                // A completed `Invoke` future only guarantees that the leader 
has been updated.
+                // The remaining replicas can be updated later, so wait for the 
value before asserting.
+                IgniteTestUtils.waitForCondition(
+                        () -> 
Long.valueOf(expectedTimestamp.longValue()).equals(listener.minimumActiveTxBeginTime()),
+                        5_000
+                );
+
+                assertThat(grp.getGroupId(), 
listener.minimumActiveTxBeginTime(), equalTo(expectedTimestamp.longValue()));
+            }
+        }
+    }
+}
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 0b1337ddd8..2c4be73c28 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -24,19 +24,20 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.TokenizedAssignments;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import 
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMessageGroup;
 import 
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMessagesFactory;
-import 
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeRequest;
-import 
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeResponse;
+import 
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMinimumTimesRequest;
+import 
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMinimumTimesResponse;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -49,10 +50,16 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.MessagingService;
-import org.apache.ignite.internal.network.NetworkMessage;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageUtils;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.ActiveLocalTxMinimumBeginTimeProvider;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
@@ -70,7 +77,7 @@ import org.jetbrains.annotations.TestOnly;
  *     <li>Routine is triggered after receiving local notification that the 
low watermark
  *     has been updated on catalog compaction coordinator (metastorage group 
leader).</li>
  *     <li>Coordinator calculates the minimum required time in the cluster
- *     by sending {@link CatalogMinimumRequiredTimeRequest} to all cluster 
members.</li>
+ *     by sending {@link CatalogCompactionMinimumTimesRequest} to all cluster 
members.</li>
  *     <li>If it is considered safe to trim the history up to calculated 
catalog version
  *     (at least, all partition owners are present in the logical topology), 
then the catalog is compacted.</li>
  * </ol>
@@ -78,11 +85,15 @@ import org.jetbrains.annotations.TestOnly;
 public class CatalogCompactionRunner implements IgniteComponent {
     private static final IgniteLogger LOG = 
Loggers.forClass(CatalogCompactionRunner.class);
 
-    private static final CatalogCompactionMessagesFactory MESSAGES_FACTORY = 
new CatalogCompactionMessagesFactory();
+    private static final CatalogCompactionMessagesFactory 
COMPACTION_MESSAGES_FACTORY = new CatalogCompactionMessagesFactory();
+
+    private static final PartitionReplicationMessagesFactory 
REPLICATION_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory();
+
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
     private static final long ANSWER_TIMEOUT = 5_000;
 
-    private final CatalogManagerImpl catalogManager;
+    private final CatalogManagerCompactionFacade catalogManagerFacade;
 
     private final MessagingService messagingService;
 
@@ -100,6 +111,14 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
     private final String localNodeName;
 
+    private final ActiveLocalTxMinimumBeginTimeProvider 
activeLocalTxMinimumBeginTimeProvider;
+
+    private final ReplicaService replicaService;
+
+    private final SchemaSyncService schemaSyncService;
+
+    private CompletableFuture<Void> lastRunFuture = 
CompletableFutures.nullCompletedFuture();
+
     /**
      * Node that is considered to be a coordinator of compaction process.
      *
@@ -107,8 +126,6 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
      */
     private volatile @Nullable String compactionCoordinatorNodeName;
 
-    private volatile CompletableFuture<Boolean> lastRunFuture = 
CompletableFutures.nullCompletedFuture();
-
     private volatile HybridTimestamp lowWatermark;
 
     /**
@@ -120,10 +137,14 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
             MessagingService messagingService,
             LogicalTopologyService logicalTopologyService,
             PlacementDriver placementDriver,
+            ReplicaService replicaService,
             ClockService clockService,
-            Executor executor
+            SchemaSyncService schemaSyncService,
+            Executor executor,
+            ActiveLocalTxMinimumBeginTimeProvider 
activeLocalTxMinimumBeginTimeProvider
     ) {
-        this(localNodeName, catalogManager, messagingService, 
logicalTopologyService, placementDriver, clockService, executor, null);
+        this(localNodeName, catalogManager, messagingService, 
logicalTopologyService, placementDriver, replicaService, clockService,
+                schemaSyncService, executor, 
activeLocalTxMinimumBeginTimeProvider, null);
     }
 
     /**
@@ -135,17 +156,23 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
             MessagingService messagingService,
             LogicalTopologyService logicalTopologyService,
             PlacementDriver placementDriver,
+            ReplicaService replicaService,
             ClockService clockService,
+            SchemaSyncService schemaSyncService,
             Executor executor,
+            ActiveLocalTxMinimumBeginTimeProvider 
activeLocalTxMinimumBeginTimeProvider,
             @Nullable CatalogCompactionRunner.MinimumRequiredTimeProvider 
localMinTimeProvider
     ) {
-        this.messagingService = messagingService;
         this.localNodeName = localNodeName;
+        this.messagingService = messagingService;
         this.logicalTopologyService = logicalTopologyService;
-        this.catalogManager = catalogManager;
+        this.catalogManagerFacade = new 
CatalogManagerCompactionFacade(catalogManager);
         this.clockService = clockService;
+        this.schemaSyncService = schemaSyncService;
         this.placementDriver = placementDriver;
+        this.replicaService = replicaService;
         this.executor = executor;
+        this.activeLocalTxMinimumBeginTimeProvider = 
activeLocalTxMinimumBeginTimeProvider;
         this.localMinTimeProvider = localMinTimeProvider == null ? 
this::determineLocalMinimumRequiredTime : localMinTimeProvider;
     }
 
@@ -153,14 +180,21 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         
messagingService.addMessageHandler(CatalogCompactionMessageGroup.class, 
(message, sender, correlationId) -> {
             assert message.groupType() == 
CatalogCompactionMessageGroup.GROUP_TYPE : message.groupType();
-            assert message.messageType() == 
CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_REQUEST : 
message.messageType();
-            assert correlationId != null;
 
-            CatalogMinimumRequiredTimeResponse response = 
MESSAGES_FACTORY.catalogMinimumRequiredTimeResponse()
-                    .timestamp(localMinTimeProvider.time())
-                    .build();
+            if (message.messageType() == 
CatalogCompactionMessageGroup.MINIMUM_TIMES_REQUEST) {
+                assert correlationId != null;
+
+                CatalogCompactionMinimumTimesResponse response = 
COMPACTION_MESSAGES_FACTORY.catalogCompactionMinimumTimesResponse()
+                        .minimumRequiredTime(localMinTimeProvider.time())
+                        
.minimumActiveTxTime(activeLocalTxMinimumBeginTimeProvider.minimumBeginTime().longValue())
+                        .build();
 
-            messagingService.respond(sender, response, correlationId);
+                messagingService.respond(sender, response, correlationId);
+
+                return;
+            }
+
+            throw new UnsupportedOperationException("Not supported message 
type: " + message.messageType());
         });
 
         return CompletableFutures.nullCompletedFuture();
@@ -196,115 +230,118 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
     }
 
     @TestOnly
-    CompletableFuture<Boolean> lastRunFuture() {
+    synchronized CompletableFuture<Void> lastRunFuture() {
         return lastRunFuture;
     }
 
     /** Starts the catalog compaction routine. */
-    CompletableFuture<Boolean> triggerCompaction(@Nullable HybridTimestamp 
lwm) {
+    void triggerCompaction(@Nullable HybridTimestamp lwm) {
         if (lwm == null || 
!localNodeName.equals(compactionCoordinatorNodeName)) {
-            return CompletableFutures.falseCompletedFuture();
+            return;
         }
 
-        return inBusyLock(busyLock, () -> {
-            CompletableFuture<Boolean> fut = lastRunFuture;
+        inBusyLock(busyLock, () -> {
+            synchronized (this) {
+                CompletableFuture<Void> fut = lastRunFuture;
 
-            if (!fut.isDone()) {
-                LOG.info("Catalog compaction is already in progress, skipping 
(timestamp={})", lwm.longValue());
+                if (!fut.isDone()) {
+                    LOG.info("Catalog compaction is already in progress, 
skipping (timestamp={})", lwm.longValue());
 
-                return CompletableFutures.falseCompletedFuture();
-            }
+                    return;
+                }
 
-            fut = 
startCompaction(logicalTopologyService.localLogicalTopology())
-                    .whenComplete((res, ex) -> {
-                        if (ex != null) {
-                            LOG.warn("Catalog compaction has failed 
(timestamp={})", ex, lwm.longValue());
-                        } else if (LOG.isDebugEnabled()) {
-                            if (res) {
-                                LOG.debug("Catalog compaction completed 
successfully (timestamp={})", lwm.longValue());
-                            } else {
-                                LOG.debug("Catalog compaction skipped 
(timestamp={})", lwm.longValue());
-                            }
-                        }
-                    });
-
-            lastRunFuture = fut;
-
-            return fut;
+                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+            }
         });
     }
 
-    private CompletableFuture<Boolean> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
+    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
         long localMinimum = localMinTimeProvider.time();
 
-        if (catalogByTsNullable(localMinimum) == null) {
-            return CompletableFutures.falseCompletedFuture();
+        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
+            LOG.info("Catalog compaction skipped, nothing to compact (ts={})", 
localMinimum);
+
+            return CompletableFutures.nullCompletedFuture();
         }
 
         return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
-                .thenComposeAsync(ts -> {
-                    Catalog catalog = catalogByTsNullable(ts);
+                .thenComposeAsync(timeHolder -> {
+                    long minRequiredTime = timeHolder.minRequiredTime;
+                    long minActiveTxBeginTime = 
timeHolder.minActiveTxBeginTime;
+                    Catalog catalog = 
catalogManagerFacade.catalogByTsNullable(minRequiredTime);
 
-                    if (catalog == null) {
-                        return CompletableFutures.falseCompletedFuture();
-                    }
+                    CompletableFuture<Boolean> catalogCompactionFut;
 
-                    return requiredNodes(catalog)
-                            .thenCompose(requiredNodes -> {
-                                List<String> missingNodes = 
missingNodes(requiredNodes, topologySnapshot.nodes());
-
-                                if (!missingNodes.isEmpty()) {
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("Catalog compaction aborted 
due to missing cluster members (nodes={})", missingNodes);
-                                    }
+                    if (catalog == null) {
+                        LOG.info("Catalog compaction skipped, nothing to 
compact (ts={})", minRequiredTime);
 
-                                    return 
CompletableFutures.falseCompletedFuture();
+                        catalogCompactionFut = 
CompletableFutures.falseCompletedFuture();
+                    } else {
+                        catalogCompactionFut = tryCompactCatalog(catalog, 
topologySnapshot).whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                LOG.warn("Catalog compaction has failed 
(timestamp={})", ex, minRequiredTime);
+                            } else {
+                                if (res) {
+                                    LOG.info("Catalog compaction completed 
successfully (timestamp={})", minRequiredTime);
+                                } else {
+                                    LOG.info("Catalog compaction skipped 
(timestamp={})", minRequiredTime);
                                 }
+                            }
+                        });
+                    }
 
-                                return 
catalogManager.compactCatalog(catalog.time());
-                            });
+                    CompletableFuture<Void> propagateToReplicasFut =
+                            propagateTimeToReplicas(minActiveTxBeginTime, 
topologySnapshot.nodes())
+                                    .whenComplete((res, ex) -> {
+                                        if (ex != null) {
+                                            LOG.warn("Failed to propagate 
minimum active tx begin time to replicas", ex);
+                                        }
+                                    });
+
+                    return CompletableFuture.allOf(
+                            catalogCompactionFut,
+                            propagateToReplicasFut
+                    );
                 }, executor);
     }
 
-    private @Nullable Catalog catalogByTsNullable(long ts) {
-        try {
-            int catalogVer = catalogManager.activeCatalogVersion(ts);
-            return catalogManager.catalog(catalogVer - 1);
-        } catch (IllegalStateException e) {
-            return null;
-        }
-    }
-
-    private CompletableFuture<Long> 
determineGlobalMinimumRequiredTime(Set<LogicalNode> logicalTopologyNodes, long 
localMinimumTs) {
-        CatalogMinimumRequiredTimeRequest request = 
MESSAGES_FACTORY.catalogMinimumRequiredTimeRequest().build();
-        List<CompletableFuture<?>> ackFutures = new 
ArrayList<>(logicalTopologyNodes.size());
-        AtomicLong minimumRequiredTimeHolder = new AtomicLong(Long.MAX_VALUE);
+    CompletableFuture<TimeHolder> determineGlobalMinimumRequiredTime(
+            Collection<? extends ClusterNode> nodes,
+            long localMinimumRequiredTime
+    ) {
+        CatalogCompactionMinimumTimesRequest request = 
COMPACTION_MESSAGES_FACTORY.catalogCompactionMinimumTimesRequest().build();
+        List<CompletableFuture<CatalogCompactionMinimumTimesResponse>> 
responseFutures = new ArrayList<>(nodes.size());
 
-        for (LogicalNode node : logicalTopologyNodes) {
+        for (ClusterNode node : nodes) {
             if (localNodeName.equals(node.name())) {
                 continue;
             }
 
-            CompletableFuture<NetworkMessage> fut = 
messagingService.invoke(node, request, ANSWER_TIMEOUT)
-                    .whenComplete((msg, ex) -> {
-                        if (ex != null) {
-                            return;
-                        }
+            CompletableFuture<CatalogCompactionMinimumTimesResponse> fut = 
messagingService.invoke(node, request, ANSWER_TIMEOUT)
+                    
.thenApply(CatalogCompactionMinimumTimesResponse.class::cast);
 
-                        long time = ((CatalogMinimumRequiredTimeResponse) 
msg).timestamp();
-                        long prevTime;
+            responseFutures.add(fut);
+        }
 
-                        // Update minimum timestamp.
-                        do {
-                            prevTime = minimumRequiredTimeHolder.get();
-                        } while (time < prevTime && 
!minimumRequiredTimeHolder.compareAndSet(prevTime, time));
-                    });
+        return CompletableFuture.allOf(responseFutures.toArray(new 
CompletableFuture[0]))
+                .thenApply(ignore -> {
+                    long globalMinimumRequiredTime = localMinimumRequiredTime;
+                    long globalMinimumActiveTxTime = 
activeLocalTxMinimumBeginTimeProvider.minimumBeginTime().longValue();
 
-            ackFutures.add(fut);
-        }
+                    for 
(CompletableFuture<CatalogCompactionMinimumTimesResponse> fut : 
responseFutures) {
+                        CatalogCompactionMinimumTimesResponse response = 
fut.join();
+
+                        if (response.minimumRequiredTime() < 
globalMinimumRequiredTime) {
+                            globalMinimumRequiredTime = 
response.minimumRequiredTime();
+                        }
 
-        return CompletableFutures.allOf(ackFutures)
-                .thenApply(ignore -> Math.min(minimumRequiredTimeHolder.get(), 
localMinimumTs));
+                        if (response.minimumActiveTxTime() < 
globalMinimumActiveTxTime) {
+                            globalMinimumActiveTxTime = 
response.minimumActiveTxTime();
+                        }
+                    }
+
+                    return new TimeHolder(globalMinimumRequiredTime, 
globalMinimumActiveTxTime);
+                });
     }
 
     private long determineLocalMinimumRequiredTime() {
@@ -334,7 +371,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
 
         int partitions = zone.partitions();
 
-        List<ReplicationGroupId> replicationGroupIds = new 
ArrayList<>(partitions);
+        List<TablePartitionId> replicationGroupIds = new 
ArrayList<>(partitions);
 
         for (int p = 0; p < partitions; p++) {
             replicationGroupIds.add(new TablePartitionId(table.id(), p));
@@ -347,8 +384,7 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
                     for (int p = 0; p < partitions; p++) {
                         TokenizedAssignments assignment = 
tokenizedAssignments.get(p);
                         if (assignment == null) {
-                            throw new IllegalStateException("Cannot get 
assignments for table " + table.name()
-                                    + " (replication group=" + 
replicationGroupIds.get(p) + ").");
+                            
throwAssignmentsNotReadyException(replicationGroupIds.get(p));
                         }
 
                         assignment.nodes().forEach(a -> 
required.add(a.consistentId()));
@@ -365,10 +401,123 @@ public class CatalogCompactionRunner implements 
IgniteComponent {
         return 
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    /** Minimum required time supplier. */
+    CompletableFuture<Void> propagateTimeToReplicas(long timestamp, 
Collection<? extends ClusterNode> topologyNodes) {
+        HybridTimestamp nowTs = clockService.now();
+
+        return schemaSyncService.waitForMetadataCompleteness(nowTs)
+                .thenComposeAsync(ignore -> {
+                    Map<Integer, Integer> tablesWithPartitions =
+                            
catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp, 
nowTs.longValue());
+
+                    Set<String> topologyNodeNames = topologyNodes.stream()
+                            .map(ClusterNode::name)
+                            .collect(Collectors.toSet());
+
+                    // TODO https://issues.apache.org/jira/browse/IGNITE-22951 
Minimize the number of network requests
+                    return 
CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
+                            .map(e -> invokeOnReplicas(e.getKey(), 
e.getValue(), timestamp, nowTs, topologyNodeNames))
+                            .collect(Collectors.toList())
+                    );
+                }, executor);
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            int tableId,
+            int partitions,
+            long txBeginTime,
+            HybridTimestamp nowTs,
+            Set<String> logicalTopologyNodes
+    ) {
+        List<TablePartitionId> replicationGroupIds = new 
ArrayList<>(partitions);
+
+        for (int p = 0; p < partitions; p++) {
+            replicationGroupIds.add(new TablePartitionId(tableId, p));
+        }
+
+        return placementDriver.getAssignments(replicationGroupIds, nowTs)
+                .thenComposeAsync(tokenizedAssignments -> {
+                    assert tokenizedAssignments.size() == 
replicationGroupIds.size();
+
+                    List<CompletableFuture<?>> replicaInvokeFutures = new 
ArrayList<>(partitions);
+
+                    for (int p = 0; p < partitions; p++) {
+                        TablePartitionId replicationGroupId = 
replicationGroupIds.get(p);
+                        TokenizedAssignments tokenizedAssignment = 
tokenizedAssignments.get(p);
+
+                        if (tokenizedAssignment == null) {
+                            
throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Set<String> assignments = 
tokenizedAssignment.nodes().stream()
+                                
.map(Assignment::consistentId).collect(Collectors.toSet());
+
+                        String targetNodeName;
+
+                        if (assignments.contains(localNodeName)) {
+                            targetNodeName = localNodeName;
+                        } else {
+                            targetNodeName = assignments.stream()
+                                    .filter(logicalTopologyNodes::contains)
+                                    .findAny()
+                                    .orElseThrow(() -> new 
IllegalStateException("Current topology doesn't include assignment nodes "
+                                            + "(assignments=" + 
tokenizedAssignment.nodes()
+                                            + ", topology=" + 
logicalTopologyNodes
+                                            + ", replication group=" + 
replicationGroupId + ").")
+                                    );
+                        }
+
+                        TablePartitionIdMessage partIdMessage = 
ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = 
REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        
replicaInvokeFutures.add(replicaService.invoke(targetNodeName, msg));
+                    }
+
+                    return CompletableFutures.allOf(replicaInvokeFutures);
+                }, executor);
+    }
+
+    private CompletableFuture<Boolean> tryCompactCatalog(Catalog catalog, 
LogicalTopologySnapshot topologySnapshot) {
+        return requiredNodes(catalog)
+                .thenCompose(requiredNodes -> {
+                    List<String> missingNodes = missingNodes(requiredNodes, 
topologySnapshot.nodes());
+
+                    if (!missingNodes.isEmpty()) {
+                        LOG.info("Catalog compaction aborted due to missing 
cluster members (nodes={})", missingNodes);
+
+                        return CompletableFutures.falseCompletedFuture();
+                    }
+
+                    return 
catalogManagerFacade.compactCatalog(catalog.version());
+                });
+    }
+
+    private static void throwAssignmentsNotReadyException(TablePartitionId 
replicationGroupId) {
+        throw new IllegalStateException("Cannot get assignments for table "
+                + "(replication group=" + replicationGroupId + ").");
+    }
+
+    /** Minimum required time provider. */
     @FunctionalInterface
     interface MinimumRequiredTimeProvider {
         /** Returns minimum required timestamp. */
         long time();
     }
+
+    static class TimeHolder {
+        final long minRequiredTime;
+        final long minActiveTxBeginTime;
+
+        private TimeHolder(long minRequiredTime, long minActiveTxBeginTime) {
+            this.minRequiredTime = minRequiredTime;
+            this.minActiveTxBeginTime = minActiveTxBeginTime;
+        }
+    }
 }
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
new file mode 100644
index 0000000000..613004d433
--- /dev/null
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacade.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility methods for interacting with the catalog manager.
+ * These methods are only needed by the catalog compaction routine.
+ */
+class CatalogManagerCompactionFacade {
+    private final CatalogManagerImpl catalogManager;
+
+    CatalogManagerCompactionFacade(CatalogManagerImpl catalogManager) {
+        this.catalogManager = catalogManager;
+    }
+
+    /**
+     * Scans catalog versions in a given time interval (including interval 
boundaries).
+     * Extracts all tables contained in these catalog versions and creates a 
mapping
+     * tableId -> number of partitions in this table.
+     *
+     * @param minTsInclusive Lower timestamp (inclusive).
+     * @param maxTsInclusive Upper timestamp (inclusive).
+     * @return Mapping tableId to number of partitions in this table.
+     */
+    Int2IntMap collectTablesWithPartitionsBetween(long minTsInclusive, long 
maxTsInclusive) {
+        Int2IntMap tablesWithPartitions = new Int2IntOpenHashMap();
+        int curVer = catalogManager.activeCatalogVersion(minTsInclusive);
+        int lastVer = catalogManager.activeCatalogVersion(maxTsInclusive);
+
+        do {
+            Catalog catalog = catalogManager.catalog(curVer);
+
+            assert catalog != null : "ver=" + curVer + ", last=" + lastVer;
+
+            for (CatalogTableDescriptor table : catalog.tables()) {
+                CatalogZoneDescriptor zone = catalog.zone(table.zoneId());
+
+                assert zone != null : table.zoneId();
+
+                tablesWithPartitions.put(table.id(), zone.partitions());
+            }
+        } while (++curVer <= lastVer);
+
+        return tablesWithPartitions;
+    }
+
+    /**
+     * Returns the catalog that precedes the catalog version active at the given timestamp.
+     *
+     * @param timestamp Timestamp.
+     * @return Catalog or {@code null} if such version of the catalog doesn't 
exist.
+     */
+    @Nullable Catalog catalogByTsNullable(long timestamp) {
+        try {
+            int catalogVer = catalogManager.activeCatalogVersion(timestamp);
+
+            return catalogManager.catalog(catalogVer - 1);
+        } catch (IllegalStateException ignore) {
+            return null;
+        }
+    }
+
+    CompletableFuture<Boolean> compactCatalog(int version) {
+        return catalogManager.compactCatalog(version);
+    }
+}
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
index b1c74a5b51..6e828e712d 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java
@@ -26,9 +26,9 @@ import 
org.apache.ignite.internal.network.annotations.MessageGroup;
 public class CatalogCompactionMessageGroup {
     public static final short GROUP_TYPE = 14;
 
-    /** See {@link CatalogMinimumRequiredTimeRequest} for the details. */
-    public static final short MINIMUM_REQUIRED_TIME_REQUEST = 0;
+    /** See {@link CatalogCompactionMinimumTimesRequest} for the details. */
+    public static final short MINIMUM_TIMES_REQUEST = 0;
 
-    /** See {@link CatalogMinimumRequiredTimeResponse} for the details. */
-    public static final short MINIMUM_REQUIRED_TIME_RESPONSE = 1;
+    /** See {@link CatalogCompactionMinimumTimesResponse} for the details. */
+    public static final short MINIMUM_TIMES_RESPONSE = 1;
 }
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesRequest.java
similarity index 77%
copy from 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
copy to 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesRequest.java
index d1156a3736..3177d614cd 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesRequest.java
@@ -21,10 +21,11 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 
 /**
- * Request to obtain the minimum timestamp to which, from the local
- * node's perspective, the catalog history can be safely truncated.
+ * Request to obtain timestamps required to safely perform catalog compaction.
+ *
+ * @see CatalogCompactionMinimumTimesResponse
  */
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_REQUEST)
-public interface CatalogMinimumRequiredTimeRequest extends NetworkMessage {
+@Transferable(CatalogCompactionMessageGroup.MINIMUM_TIMES_REQUEST)
+public interface CatalogCompactionMinimumTimesRequest extends NetworkMessage {
 
 }
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesResponse.java
similarity index 60%
copy from 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
copy to 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesResponse.java
index b3760e9d15..a3c1eee692 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
+++ 
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMinimumTimesResponse.java
@@ -21,11 +21,19 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 
 /**
- * Response message containing the low watermark required for the local node.
- * This watermark is used to safely truncate catalog history.
+ * Response message containing the responding node's timestamps required to 
safely perform catalog compaction.
+ *
+ * <p>This includes the following:
+ * <ol>
+ *     <li>the minimum timestamp to which, from the local node's perspective, 
the catalog history can be safely truncated</li>
+ *     <li>the minimum starting time among locally started active RW 
transactions</li>
+ * </ol>
  */
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_RESPONSE)
-public interface CatalogMinimumRequiredTimeResponse extends NetworkMessage {
+@Transferable(CatalogCompactionMessageGroup.MINIMUM_TIMES_RESPONSE)
+public interface CatalogCompactionMinimumTimesResponse extends NetworkMessage {
     /** Returns node's minimum required time. */
-    long timestamp();
+    long minimumRequiredTime();
+
+    /** Returns node's minimum starting time among locally started active RW 
transactions. */
+    long minimumActiveTxTime();
 }
diff --git 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
new file mode 100644
index 0000000000..13cc7bfdd4
--- /dev/null
+++ 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/AbstractCatalogCompactionTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.manager.ComponentContext;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+
+/** Base class for catalog compaction unit testing. */
+abstract class AbstractCatalogCompactionTest extends BaseIgniteAbstractTest {
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClockWaiter clockWaiter = new ClockWaiter("test-node", 
clock);
+
+    final ClockService clockService = new TestClockService(clock, clockWaiter);
+
+    CatalogManagerImpl catalogManager;
+
+    @BeforeEach
+    void setUp() {
+        catalogManager = createCatalogManager("test-node");
+    }
+
+    /** Creates catalog manager. */
+    private CatalogManagerImpl createCatalogManager(String nodeName) {
+        StandaloneMetaStorageManager metastore = 
StandaloneMetaStorageManager.create(new 
SimpleInMemoryKeyValueStorage(nodeName));
+        CatalogManagerImpl manager = new CatalogManagerImpl(new 
UpdateLogImpl(metastore), clockService);
+
+        assertThat(startAsync(new ComponentContext(), metastore, clockWaiter, 
manager), willCompleteSuccessfully());
+        assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
+        awaitDefaultZoneCreation(manager);
+
+        return manager;
+    }
+}
diff --git 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
index bb56812acf..49910f3259 100644
--- 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
+++ 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
@@ -17,14 +17,13 @@
 
 package org.apache.ignite.internal.catalog.compaction;
 
-import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.awaitDefaultZoneCreation;
 import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
 import static org.apache.ignite.sql.ColumnType.INT32;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -35,67 +34,67 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.affinity.TokenizedAssignmentsImpl;
 import org.apache.ignite.internal.catalog.Catalog;
-import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.CatalogTestUtils.TestCommand;
 import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
 import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
 import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
 import 
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMessagesFactory;
-import 
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeRequest;
-import 
org.apache.ignite.internal.catalog.compaction.message.CatalogMinimumRequiredTimeResponse;
-import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import 
org.apache.ignite.internal.catalog.compaction.message.CatalogCompactionMinimumTimesRequest;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.hlc.ClockService;
-import org.apache.ignite.internal.hlc.ClockWaiter;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.manager.ComponentContext;
-import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
-import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.UnresolvableConsistentIdException;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
  * Tests for class {@link CatalogCompactionRunner}.
  */
-public class CatalogCompactionRunnerSelfTest extends BaseIgniteAbstractTest {
+public class CatalogCompactionRunnerSelfTest extends 
AbstractCatalogCompactionTest {
     private static final LogicalNode NODE1 = new LogicalNode("1", "node1", new 
NetworkAddress("localhost", 123));
 
     private static final LogicalNode NODE2 = new LogicalNode("2", "node2", new 
NetworkAddress("localhost", 123));
@@ -104,9 +103,7 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
 
     private static final List<LogicalNode> logicalNodes = List.of(NODE1, 
NODE2, NODE3);
 
-    private final ClockService clockService = new TestClockService(new 
HybridClockImpl());
-
-    private CatalogManagerImpl catalogManager;
+    private final AtomicReference<ClusterNode> coordinatorNodeHolder = new 
AtomicReference<>();
 
     private LogicalTopologyService logicalTopologyService;
 
@@ -114,12 +111,7 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
 
     private PlacementDriver placementDriver;
 
-    @BeforeEach
-    void setup() {
-        HybridClock clock = new HybridClockImpl();
-
-        catalogManager = createCatalogManager(clock);
-    }
+    private ReplicaService replicaService;
 
     @Test
     public void routineSucceedOnCoordinator() throws InterruptedException {
@@ -147,7 +139,7 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
         CatalogCompactionRunner compactionRunner = createRunner(NODE1, NODE1, 
nodeToTime::get);
 
         assertThat(compactionRunner.onLowWatermarkChanged(clockService.now()), 
willBe(false));
-        assertThat(compactionRunner.lastRunFuture(), willBe(true));
+        assertThat(compactionRunner.lastRunFuture(), 
willCompleteSuccessfully());
 
         int expectedEarliestCatalogVersion = catalog1.version() - 1;
 
@@ -156,25 +148,30 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
         verify(messagingService, times(logicalNodes.size() - 
1)).invoke(any(ClusterNode.class), any(NetworkMessage.class), anyLong());
 
         // Nothing should be changed if catalog already compacted for previous 
timestamp.
-        assertThat(compactionRunner.triggerCompaction(clockService.now()), 
willBe(false));
+        compactionRunner.triggerCompaction(clockService.now());
+        assertThat(compactionRunner.lastRunFuture(), 
willCompleteSuccessfully());
+        assertEquals(expectedEarliestCatalogVersion, 
catalogManager.earliestCatalogVersion());
 
         // Nothing should be changed if previous catalog doesn't exists.
         Catalog earliestCatalog = 
Objects.requireNonNull(catalogManager.catalog(catalogManager.earliestCatalogVersion()));
         compactionRunner = createRunner(NODE1, NODE1, (n) -> 
earliestCatalog.time());
-        assertThat(compactionRunner.triggerCompaction(clockService.now()), 
willBe(false));
+        compactionRunner.triggerCompaction(clockService.now());
+        assertThat(compactionRunner.lastRunFuture(), 
willCompleteSuccessfully());
         verify(messagingService, times(0)).invoke(any(ClusterNode.class), 
any(NetworkMessage.class), anyLong());
     }
 
     @Test
     public void mustNotStartOnNonCoordinator() {
-        CatalogCompactionRunner compactor = createRunner(NODE1, NODE3, ignore 
-> 0L);
+        assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
+        CatalogCompactionRunner compactor = createRunner(NODE1, NODE3, ignore 
-> clockService.nowLong());
 
-        CompletableFuture<Boolean> lastRunFuture = compactor.lastRunFuture();
+        CompletableFuture<Void> lastRunFuture = compactor.lastRunFuture();
 
         assertThat(compactor.onLowWatermarkChanged(clockService.now()), 
willBe(false));
         assertThat(compactor.lastRunFuture(), is(lastRunFuture));
 
         // Changing the coordinator should trigger compaction.
+        coordinatorNodeHolder.set(NODE1);
         compactor.updateCoordinator(NODE1);
         assertThat(compactor.lastRunFuture(), is(not(lastRunFuture)));
     }
@@ -187,27 +184,13 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
         CatalogCompactionRunner compactor =
                 createRunner(NODE1, NODE1, (n) -> earliestCatalog.time() - 1, 
logicalNodes, logicalNodes);
 
-        assertThat(compactor.triggerCompaction(clockService.now()), 
willBe(false));
+        compactor.triggerCompaction(clockService.now());
+        assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
     }
 
     @Test
     public void mustNotPerformWhenAssignmentNodeIsMissing() throws 
InterruptedException {
-        CreateTableCommandBuilder tabBuilder = CreateTableCommand.builder()
-                .tableName("test")
-                .schemaName("PUBLIC")
-                .columns(List.of(columnParams("key1", INT32), 
columnParams("key2", INT32), columnParams("val", INT32, true)))
-                
.primaryKey(TableHashPrimaryKey.builder().columns(List.of("key1", 
"key2")).build())
-                .colocationColumns(List.of("key2"));
-
-        assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
-        
assertThat(catalogManager.execute(tabBuilder.tableName("test1").build()), 
willCompleteSuccessfully());
-        
assertThat(catalogManager.execute(tabBuilder.tableName("test2").build()), 
willCompleteSuccessfully());
-        
assertThat(catalogManager.execute(tabBuilder.tableName("test3").build()), 
willCompleteSuccessfully());
-        assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
-
-        Catalog catalog = 
catalogManager.catalog(catalogManager.activeCatalogVersion(clockService.nowLong()));
-
-        assertNotNull(catalog);
+        Catalog catalog = prepareCatalogWithTables();
 
         // Node NODE3 from the assignment is missing in logical topology.
         {
@@ -219,7 +202,9 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
                     List.of(NODE1, NODE2, NODE3)
             );
 
-            assertThat(compactor.triggerCompaction(clockService.now()), 
willBe(false));
+            compactor.triggerCompaction(clockService.now());
+            assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
+            assertThat(catalogManager.earliestCatalogVersion(), is(0));
         }
 
         // Node NODE3 from the assignment is missing in logical topology, but 
topology changes during messaging.
@@ -245,8 +230,12 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
                     logicalNodes
             );
 
-            CompletableFuture<CompletableFuture<Boolean>> fut = 
IgniteTestUtils.runAsync(
-                    () -> compactor.triggerCompaction(clockService.now()));
+            CompletableFuture<CompletableFuture<Void>> fut = 
IgniteTestUtils.runAsync(
+                    () -> {
+                        compactor.triggerCompaction(clockService.now());
+
+                        return compactor.lastRunFuture();
+                    });
 
             assertTrue(messageBlockLatch.await(5, TimeUnit.SECONDS));
 
@@ -263,7 +252,7 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
             // Since we do not know the minimum required time by NODE3, 
despite the fact
             // that all the necessary nodes are in the logical topology at the 
time
             // assignments are collected, we cannot perform catalog compaction.
-            assertThat(fut.join(), willBe(false));
+            assertThat(catalogManager.earliestCatalogVersion(), is(0));
         }
 
         // All nodes from the assignments are present in logical topology.
@@ -276,7 +265,11 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
                     logicalNodes
             );
 
-            assertThat(compactor.triggerCompaction(clockService.now()), 
willBe(true));
+            compactor.triggerCompaction(clockService.now());
+            assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
+            waitForCondition(() -> catalogManager.earliestCatalogVersion() != 
0, 1_000);
+
+            assertThat(catalogManager.earliestCatalogVersion(), 
is(catalog.version() - 1));
         }
     }
 
@@ -292,9 +285,10 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
         };
 
         CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, 
timeSupplier);
+        compactor.triggerCompaction(clockService.now());
 
         ExecutionException ex = 
Assertions.assertThrows(ExecutionException.class,
-                () -> compactor.triggerCompaction(clockService.now()).get());
+                () -> compactor.lastRunFuture().get());
 
         assertThat(ex.getCause(), instanceOf(expected.getClass()));
         assertThat(ex.getCause().getMessage(), equalTo(expected.getMessage()));
@@ -303,7 +297,7 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
 
     @Test
     public void compactionAbortedIfAssignmentsNotAvailableForTable() {
-        CreateTableCommandBuilder tabBuilder = CreateTableCommand.builder()
+        CreateTableCommandBuilder tableCmdBuilder = 
CreateTableCommand.builder()
                 .tableName("test")
                 .schemaName("PUBLIC")
                 .columns(List.of(columnParams("key1", INT32), 
columnParams("key2", INT32), columnParams("val", INT32, true)))
@@ -311,7 +305,7 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
                 .colocationColumns(List.of("key2"));
 
         assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
-        
assertThat(catalogManager.execute(tabBuilder.tableName("test1").build()), 
willCompleteSuccessfully());
+        assertThat(catalogManager.execute(tableCmdBuilder.build()), 
willCompleteSuccessfully());
         assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
 
         Catalog catalog = 
catalogManager.catalog(catalogManager.activeCatalogVersion(clockService.nowLong()));
@@ -326,12 +320,133 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
         );
 
         when(placementDriver.getAssignments(any(List.class), 
any())).thenReturn(CompletableFuture.failedFuture(new ArithmeticException()));
-        assertThat(compactor.triggerCompaction(clockService.now()), 
willThrow(ArithmeticException.class));
+        compactor.triggerCompaction(clockService.now());
+        assertThat(compactor.lastRunFuture(), 
willThrow(ArithmeticException.class));
 
         List<?> assignments = IntStream.range(0, 
CatalogUtils.DEFAULT_PARTITION_COUNT).mapToObj(i -> 
null).collect(Collectors.toList());
 
         when(placementDriver.getAssignments(any(List.class), 
any())).thenReturn(CompletableFuture.completedFuture(assignments));
-        assertThat(compactor.triggerCompaction(clockService.now()), 
willThrow(IllegalStateException.class));
+        compactor.triggerCompaction(clockService.now());
+        assertThat(compactor.lastRunFuture(), 
willThrow(IllegalStateException.class));
+    }
+
+    @Test
+    public void shouldNotStartIfAlreadyInProgress() throws 
InterruptedException {
+        // A second trigger issued while a run is still in flight must not start a new run:
+        // lastRunFuture() has to keep returning the very same future.
+        assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
+
+        CountDownLatch messageBlockLatch = new CountDownLatch(1);
+        CountDownLatch topologyChangeLatch = new CountDownLatch(1);
+
+        CatalogCompactionRunner compactor = createRunner(
+                NODE1,
+                NODE1,
+                (node) -> {
+                    // The coordinator's own time is answered immediately.
+                    if (NODE1.name().equals(node)) {
+                        return clockService.nowLong();
+                    }
+
+                    try {
+                        // Signal that a request to another node is in flight, then hold its response until released.
+                        messageBlockLatch.countDown();
+
+                        topologyChangeLatch.await();
+
+                        return Long.MIN_VALUE;
+                    } catch (InterruptedException e) {
+                        // Keep the interrupt status visible to the owner of this thread.
+                        Thread.currentThread().interrupt();
+
+                        throw new RuntimeException(e);
+                    }
+                }
+        );
+
+        compactor.triggerCompaction(clockService.now());
+
+        // Bounded wait: if the run never reaches the blocked request, fail instead of hanging the test run.
+        assertTrue(messageBlockLatch.await(5, TimeUnit.SECONDS));
+
+        CompletableFuture<Void> lastFut = compactor.lastRunFuture();
+
+        compactor.triggerCompaction(clockService.now());
+
+        assertSame(lastFut, compactor.lastRunFuture());
+
+        // Release the parked response so that the first run can finish.
+        topologyChangeLatch.countDown();
+
+        assertThat(compactor.lastRunFuture(), willCompleteSuccessfully());
+    }
+
+    @Test
+    public void minTxTimePropagation() {
+        Catalog catalog = prepareCatalogWithTables();
+
+        List<LogicalNode> logicalTopology = List.of(NODE2, NODE3, NODE1);
+        List<LogicalNode> assignments = List.of(NODE3, NODE2, NODE1);
+        CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n) -> 
catalog.time(), logicalTopology, assignments);
+
+        assertThat(compactor.propagateTimeToReplicas(catalog.time(), 
logicalTopology), willCompleteSuccessfully());
+
+        // One request per partition of each of the three tables created by prepareCatalogWithTables().
+        // The partition count is the same constant the fixture uses to build the assignments.
+        int expectedInvocations = /* tables */ 3 * CatalogUtils.DEFAULT_PARTITION_COUNT;
+
+        // All invocations must be made locally, since coordinator is present 
in assignments for all tables.
+        verify(replicaService, times(0)).invoke(eq(NODE2.name()), 
any(ReplicaRequest.class));
+        verify(replicaService, times(0)).invoke(eq(NODE3.name()), 
any(ReplicaRequest.class));
+        verify(replicaService, times(expectedInvocations)).invoke(eq(NODE1.name()), any(ReplicaRequest.class));
+    }
+
+    @Test
+    public void minTxTimePropagationSucceedWhenSomeAssignmentIsMissing() {
+        Catalog catalog = prepareCatalogWithTables();
+
+        // One request per partition of each of the three tables created by prepareCatalogWithTables().
+        // The partition count is the same constant the fixture uses to build the assignments.
+        int expectedInvocations = /* tables */ 3 * CatalogUtils.DEFAULT_PARTITION_COUNT;
+
+        // NODE3 is assigned but absent from the topology; NODE1 (the coordinator) is assigned as well,
+        // so every request is expected to be sent to NODE1.
+        {
+            List<LogicalNode> logicalTopology = List.of(NODE2, NODE1);
+            List<LogicalNode> assignments = List.of(NODE3, NODE2, NODE1);
+            CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n) 
-> catalog.time(), logicalTopology, assignments);
+
+            assertThat(compactor.propagateTimeToReplicas(catalog.time(), 
logicalTopology), willCompleteSuccessfully());
+            verify(replicaService, times(0)).invoke(eq(NODE2.name()), 
any(ReplicaRequest.class));
+            verify(replicaService, times(0)).invoke(eq(NODE3.name()), 
any(ReplicaRequest.class));
+            verify(replicaService, times(expectedInvocations)).invoke(eq(NODE1.name()), any(ReplicaRequest.class));
+            clearInvocations(replicaService);
+        }
+
+        // NODE3 is absent from the topology and NODE1 is not assigned, so every request is expected to be sent to NODE2.
+        {
+            List<LogicalNode> logicalTopology = List.of(NODE2, NODE1);
+            List<LogicalNode> assignments = List.of(NODE3, NODE2);
+            CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n) 
-> catalog.time(), logicalTopology, assignments);
+
+            assertThat(compactor.propagateTimeToReplicas(catalog.time(), 
logicalTopology), willCompleteSuccessfully());
+            verify(replicaService, times(0)).invoke(eq(NODE1.name()), 
any(ReplicaRequest.class));
+            verify(replicaService, times(0)).invoke(eq(NODE3.name()), 
any(ReplicaRequest.class));
+            verify(replicaService, times(expectedInvocations)).invoke(eq(NODE2.name()), any(ReplicaRequest.class));
+        }
+    }
+
+    @Test
+    public void minTxTimePropagationAbortedIfNoAssignmentsPresentInTopology() {
+        Catalog catalog = prepareCatalogWithTables();
+
+        // The only assigned node (NODE3) is not part of the logical topology passed to the runner.
+        List<LogicalNode> logicalTopology = List.of(NODE1, NODE2);
+        List<LogicalNode> assignments = List.of(NODE3);
+
+        CatalogCompactionRunner compactor = createRunner(NODE1, NODE1, (n) -> 
catalog.time(), logicalTopology, assignments);
+
+        CompletableFuture<Void> fut = 
compactor.propagateTimeToReplicas(catalog.time(), logicalTopology);
+
+        // Awaiting the returned future is expected to raise IllegalStateException.
+        //noinspection ThrowableNotThrown
+        assertThrows(IllegalStateException.class, () -> await(fut), "Current 
topology doesn't include assignment nodes");
+    }
+
+    /**
+     * Executes {@code TestCommand.ok()}, creates tables {@code test1}, {@code test2} and {@code test3} in schema
+     * {@code PUBLIC}, executes {@code TestCommand.ok()} once more and looks up the catalog that is active at the
+     * current clock time.
+     *
+     * @return Catalog active at {@code clockService.nowLong()}, guaranteed non-null by {@code Objects.requireNonNull}.
+     */
+    private Catalog prepareCatalogWithTables() {
+        // The builder is reused for all three tables; only the table name differs between them.
+        CreateTableCommandBuilder tableCmdBuilder = 
CreateTableCommand.builder()
+                .schemaName("PUBLIC")
+                .columns(List.of(columnParams("key1", INT32), 
columnParams("key2", INT32), columnParams("val", INT32, true)))
+                
.primaryKey(TableHashPrimaryKey.builder().columns(List.of("key1", 
"key2")).build())
+                .colocationColumns(List.of("key2"));
+
+        assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test1").build()), 
willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test2").build()), 
willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test3").build()), 
willCompleteSuccessfully());
+        assertThat(catalogManager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
+
+        Catalog catalog = 
catalogManager.catalog(catalogManager.activeCatalogVersion(clockService.nowLong()));
+
+        return Objects.requireNonNull(catalog);
     }
 
     private CatalogCompactionRunner createRunner(
@@ -349,33 +464,40 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
             List<LogicalNode> topology,
             List<LogicalNode> assignmentNodes
     ) {
+        coordinatorNodeHolder.set(coordinator);
         messagingService = mock(MessagingService.class);
         logicalTopologyService = mock(LogicalTopologyService.class);
         placementDriver = mock(PlacementDriver.class);
+        replicaService = mock(ReplicaService.class);
+        SchemaSyncService schemaSyncService = mock(SchemaSyncService.class);
+
         CatalogCompactionMessagesFactory messagesFactory = new 
CatalogCompactionMessagesFactory();
 
-        when(messagingService.invoke(any(ClusterNode.class), 
any(CatalogMinimumRequiredTimeRequest.class), anyLong()))
+        when(messagingService.invoke(any(ClusterNode.class), 
any(CatalogCompactionMinimumTimesRequest.class), anyLong()))
                 .thenAnswer(invocation -> {
-                    String nodeName = ((ClusterNode) 
invocation.getArgument(0)).name();
+                    return CompletableFuture.supplyAsync(() -> {
+                        String nodeName = ((ClusterNode) 
invocation.getArgument(0)).name();
 
-                    assertThat("Coordinator shouldn't send messages to 
himself", nodeName, not(Matchers.equalTo(coordinator.name())));
+                        assertThat("Coordinator shouldn't send messages to 
himself",
+                                nodeName, 
not(Matchers.equalTo(coordinatorNodeHolder.get().name())));
 
-                    Object obj = timeSupplier.apply(nodeName);
-
-                    // Simulate an exception when exchanging messages.
-                    if (obj instanceof Exception) {
-                        return CompletableFuture.failedFuture((Exception) obj);
-                    }
+                        Object obj = timeSupplier.apply(nodeName);
 
-                    CatalogMinimumRequiredTimeResponse msg = 
messagesFactory.catalogMinimumRequiredTimeResponse()
-                            .timestamp(((Long) obj)).build();
+                        // Simulate an exception when exchanging messages.
+                        if (obj instanceof Exception) {
+                            throw new CompletionException((Exception) obj);
+                        }
 
-                    return CompletableFuture.completedFuture(msg);
+                        return 
messagesFactory.catalogCompactionMinimumTimesResponse()
+                                .minimumRequiredTime(((Long) obj))
+                                .minimumActiveTxTime(clockService.nowLong())
+                                .build();
+                    });
                 });
 
         Set<Assignment> assignments = assignmentNodes.stream()
                 .map(node -> Assignment.forPeer(node.name()))
-                .collect(Collectors.toSet());
+                .collect(Collectors.toCollection(LinkedHashSet::new));
 
         List<?> tableAssignments = IntStream.range(0, 
CatalogUtils.DEFAULT_PARTITION_COUNT)
                 .mapToObj(i -> new TokenizedAssignmentsImpl(assignments, 
Long.MAX_VALUE))
@@ -387,14 +509,33 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
 
         
when(logicalTopologyService.localLogicalTopology()).thenReturn(logicalTop);
 
+        Set<String> logicalNodeNames = 
topology.stream().map(ClusterNodeImpl::name).collect(Collectors.toSet());
+
+        when(replicaService.invoke(any(String.class), 
any(ReplicaRequest.class)))
+                .thenAnswer(invocation ->
+                        CompletableFuture.supplyAsync(() -> {
+                            String nodeName = invocation.getArgument(0);
+
+                            if (!logicalNodeNames.contains(nodeName)) {
+                                throw new 
UnresolvableConsistentIdException(nodeName);
+                            }
+
+                            return null;
+                        }));
+
+        
when(schemaSyncService.waitForMetadataCompleteness(any())).thenReturn(CompletableFutures.nullCompletedFuture());
+
         CatalogCompactionRunner runner = new CatalogCompactionRunner(
                 localNode.name(),
                 catalogManager,
                 messagingService,
                 logicalTopologyService,
                 placementDriver,
+                replicaService,
                 clockService,
+                schemaSyncService,
                 ForkJoinPool.commonPool(),
+                clockService::now,
                 () -> (Long) timeSupplier.apply(coordinator.name())
         );
 
@@ -404,18 +545,4 @@ public class CatalogCompactionRunnerSelfTest extends 
BaseIgniteAbstractTest {
 
         return runner;
     }
-
-    private static CatalogManagerImpl createCatalogManager(HybridClock clock) {
-        StandaloneMetaStorageManager metastore = 
StandaloneMetaStorageManager.create(new 
SimpleInMemoryKeyValueStorage(NODE1.name()));
-        ClockWaiter clockWaiter = new ClockWaiter(NODE1.name(), clock);
-        TestClockService clockService = new TestClockService(clock, 
clockWaiter);
-
-        CatalogManagerImpl manager = new CatalogManagerImpl(new 
UpdateLogImpl(metastore), clockService);
-
-        assertThat(startAsync(new ComponentContext(), metastore, clockWaiter, 
manager), willCompleteSuccessfully());
-        assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
-        awaitDefaultZoneCreation(manager);
-
-        return manager;
-    }
 }
diff --git 
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
new file mode 100644
index 0000000000..5072e0200d
--- /dev/null
+++ 
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionFacadeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.commands.DropTableCommandBuilder;
+import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link CatalogManagerCompactionFacade}.
+ */
+class CatalogManagerCompactionFacadeTest extends AbstractCatalogCompactionTest 
{
+    private CatalogManagerCompactionFacade catalogManagerFacade;
+
+    @BeforeEach
+    void setupHelper() {
+        // A fresh facade over the inherited catalogManager is created before every test.
+        catalogManagerFacade = new 
CatalogManagerCompactionFacade(catalogManager);
+    }
+
+    @Test
+    void testCollectTablesWithPartitionsBetween() {
+        CreateTableCommandBuilder tableCmdBuilder = 
CreateTableCommand.builder()
+                .schemaName("PUBLIC")
+                .columns(List.of(columnParams("key1", INT32), 
columnParams("key2", INT32), columnParams("val", INT32, true)))
+                
.primaryKey(TableHashPrimaryKey.builder().columns(List.of("key1", 
"key2")).build())
+                .colocationColumns(List.of("key2"));
+
+        DropTableCommandBuilder dropTableCommandBuilder = 
DropTableCommand.builder()
+                        .schemaName("PUBLIC");
+
+        // Each table is created and dropped right away; a timestamp is captured before every create/drop pair.
+        long from1 = clockService.nowLong();
+
+        
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test1").build()), 
willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test1").build()),
 willCompleteSuccessfully());
+
+        long from2 = clockService.nowLong();
+
+        
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test2").build()), 
willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test2").build()),
 willCompleteSuccessfully());
+
+        long from3 = clockService.nowLong();
+        
assertThat(catalogManager.execute(tableCmdBuilder.tableName("test3").build()), 
willCompleteSuccessfully());
+        
assertThat(catalogManager.execute(dropTableCommandBuilder.tableName("test3").build()),
 willCompleteSuccessfully());
+
+        // Range starting before test1 was created: three tables are expected, although all of them are dropped by now.
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(from1,
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(3));
+        }
+
+        // Range starting after test1 was dropped: two tables are expected.
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(from2,
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(2));
+        }
+
+        // Range starting after test2 was dropped: one table is expected.
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(from3,
+                    clockService.nowLong());
+
+            assertThat(tablesWithParts.keySet(), hasSize(1));
+        }
+
+        // Range starting after every table was dropped: no tables are expected.
+        {
+            Int2IntMap tablesWithParts = 
catalogManagerFacade.collectTablesWithPartitionsBetween(
+                    clockService.nowLong(),
+                    clockService.nowLong()
+            );
+
+            assertThat(tablesWithParts.keySet(), hasSize(0));
+        }
+    }
+
+    @Test
+    void testCatalogByTsNullable() {
+        Catalog earliestCatalog = 
catalogManager.catalog(catalogManager.earliestCatalogVersion());
+        assertNotNull(earliestCatalog);
+
+        // A timestamp one unit before the earliest catalog's time is expected to yield null.
+        
assertNull(catalogManagerFacade.catalogByTsNullable(earliestCatalog.time() - 
1));
+    }
+}
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 70c086c5b7..5865e5f00e 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -345,12 +345,16 @@ public class CatalogManagerImpl extends 
AbstractEventProducer<CatalogEvent, Cata
     /**
      * Cleanup outdated catalog versions, which can't be observed after given 
timestamp (inclusively), and compact underlying update log.
      *
-     * @param timestamp Earliest observable timestamp.
+     * @param version Earliest observable version.
      * @return Operation future, which is completing with {@code true} if a 
new snapshot has been successfully written, {@code false}
      *         otherwise if a snapshot with the same or greater version 
already exists.
      */
-    public CompletableFuture<Boolean> compactCatalog(long timestamp) {
-        Catalog catalog = catalogAt(timestamp);
+    public CompletableFuture<Boolean> compactCatalog(int version) {
+        Catalog catalog = catalog(version);
+
+        if (catalog == null) {
+            throw new IllegalArgumentException("Catalog version not found: " + 
version);
+        }
 
         return updateLog.saveSnapshot(new SnapshotEntry(catalog));
     }
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
index efb5574a85..1ae5d4d8f3 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
@@ -151,7 +151,6 @@ public class CatalogManagerRecoveryTest extends 
BaseIgniteAbstractTest {
         // Save version, which will be earliest version after snapshot.
         int earliestVersion = catalogManager.latestCatalogVersion();
         long earliestVersionActivationTime = 
catalogManager.catalog(earliestVersion).time();
-        long snapshotTime = clock.nowLong();
 
         assertThat(catalogManager.execute(simpleTable(TABLE_NAME_3)), 
willCompleteSuccessfully());
 
@@ -162,7 +161,7 @@ public class CatalogManagerRecoveryTest extends 
BaseIgniteAbstractTest {
         clearInvocations(interceptor);
         interceptor.dropSnapshotEvents();
 
-        assertThat(((CatalogManagerImpl) 
catalogManager).compactCatalog(snapshotTime), willBe(true));
+        assertThat(((CatalogManagerImpl) 
catalogManager).compactCatalog(earliestVersion), willBe(true));
 
         verify(interceptor, timeout(2_000)).handle(any(SnapshotEntry.class), 
any(), anyLong());
         assertThat(catalogManager.earliestCatalogVersion(), equalTo(0));
@@ -181,7 +180,6 @@ public class CatalogManagerRecoveryTest extends 
BaseIgniteAbstractTest {
         assertThrows(IllegalStateException.class, () -> 
catalogManager.activeCatalogVersion(0));
         assertThrows(IllegalStateException.class, () -> 
catalogManager.activeCatalogVersion(earliestVersionActivationTime - 1));
         
assertThat(catalogManager.activeCatalogVersion(earliestVersionActivationTime), 
equalTo(earliestVersion));
-        assertThat(catalogManager.activeCatalogVersion(snapshotTime), 
equalTo(earliestVersion));
         
assertThat(catalogManager.activeCatalogVersion(latestVersionActivationTime), 
equalTo(latestVersion));
     }
 
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 1b45d27895..ea038e7d79 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -427,14 +427,14 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertThat(manager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
         assertThat(manager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
 
-        long timestamp = clock.nowLong();
+        int compactToVer = manager.latestCatalogVersion();
         Catalog catalog = 
manager.catalog(manager.activeCatalogVersion(clock.nowLong()));
 
         // Add more updates
         assertThat(manager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
         assertThat(manager.execute(TestCommand.ok()), 
willCompleteSuccessfully());
 
-        assertThat(manager.compactCatalog(timestamp), willBe(Boolean.TRUE));
+        assertThat(manager.compactCatalog(compactToVer), willBe(Boolean.TRUE));
         assertTrue(waitForCondition(() -> catalog.version() == 
manager.earliestCatalogVersion(), 3_000));
 
         assertNull(manager.catalog(0));
@@ -444,9 +444,9 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertThrows(IllegalStateException.class, () -> 
manager.activeCatalogVersion(0));
         assertThrows(IllegalStateException.class, () -> 
manager.activeCatalogVersion(catalog.time() - 1));
         assertSame(catalog.version(), 
manager.activeCatalogVersion(catalog.time()));
-        assertSame(catalog.version(), manager.activeCatalogVersion(timestamp));
+        assertSame(catalog.version(), compactToVer);
 
-        assertThat(manager.compactCatalog(timestamp), willBe(false));
+        assertThat(manager.compactCatalog(compactToVer), willBe(false));
         assertEquals(catalog.version(), manager.earliestCatalogVersion());
     }
 
@@ -454,9 +454,7 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
     public void testEmptyCatalogCompaction() {
         assertEquals(1, manager.latestCatalogVersion());
 
-        long timestamp = clock.nowLong();
-
-        assertThat(manager.compactCatalog(timestamp), willBe(false));
+        assertThat(manager.compactCatalog(1), willBe(false));
 
         assertEquals(0, manager.earliestCatalogVersion());
         assertEquals(1, manager.latestCatalogVersion());
@@ -464,6 +462,5 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
         assertNotNull(manager.catalog(1));
 
         assertEquals(0, manager.activeCatalogVersion(0));
-        assertEquals(1, manager.activeCatalogVersion(timestamp));
     }
 }
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
index 9a3c59c6c6..301a0f0f48 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexNodeFinishedRwTransactionsChecker.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
+import org.apache.ignite.internal.tx.ActiveLocalTxMinimumBeginTimeProvider;
 import org.apache.ignite.internal.tx.LocalRwTxCounter;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
@@ -49,7 +50,7 @@ import org.jetbrains.annotations.Nullable;
  * Local node RW transaction completion checker for indexes. Main task is to 
handle the
  * {@link IsNodeFinishedRwTransactionsStartedBeforeRequest}.
  */
-public class IndexNodeFinishedRwTransactionsChecker implements 
LocalRwTxCounter, IgniteComponent {
+public class IndexNodeFinishedRwTransactionsChecker implements 
LocalRwTxCounter, ActiveLocalTxMinimumBeginTimeProvider, IgniteComponent {
     private static final IndexMessagesFactory FACTORY = new 
IndexMessagesFactory();
 
     private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock();
@@ -146,6 +147,20 @@ public class IndexNodeFinishedRwTransactionsChecker 
implements LocalRwTxCounter,
         });
     }
 
+    @Override
+    public HybridTimestamp minimumBeginTime() {
+        // Returns the smallest key of txCatalogVersionByBeginTxTs, or the current clock time when the map is empty.
+        // NOTE(review): the exclusive lock is taken although the map is only read here - presumably updates
+        // happen under the read lock; confirm against the rest of the class.
+        readWriteLock.writeLock().lock();
+
+        try {
+            // TODO https://issues.apache.org/jira/browse/IGNITE-22975 Improve 
minimum begin time determination
+            return txCatalogVersionByBeginTxTs.keySet().stream()
+                    .min(HybridTimestamp::compareTo)
+                    // Lazy fallback: the clock is consulted only when there is no key to return.
+                    .orElseGet(clock::now);
+        } finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
     /**
      * Handles {@link IsNodeFinishedRwTransactionsStartedBeforeRequest} of 
{@link IndexMessageGroup}.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 40e4987556..52e815f2a8 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxC
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
@@ -54,6 +55,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadW
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
 
 /**
  * Message group for the table module.
@@ -191,6 +193,11 @@ public interface PartitionReplicationMessageGroup {
      */
     short GET_ESTIMATED_SIZE_MESSAGE = 25;
 
+    /**
+     * Message type for {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+     */
+    short UPDATE_MINIMUM_ACTIVE_TX_TIME_REPLICA_REQUEST = 26;
+
     /**
      * Message types for partition replicator module RAFT commands.
      *
@@ -211,6 +218,9 @@ public interface PartitionReplicationMessageGroup {
 
         /** Message type for {@link BuildIndexCommand}. */
         short BUILD_INDEX = 44;
+
+        /** Message type for {@link UpdateMinimumActiveTxBeginTimeCommand}. */
+        short UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND = 45;
     }
 
     /**
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateMinimumActiveTxBeginTimeCommand.java
similarity index 57%
copy from 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateMinimumActiveTxBeginTimeCommand.java
index b3760e9d15..0cd7bde81e 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateMinimumActiveTxBeginTimeCommand.java
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.catalog.compaction.message;
+package org.apache.ignite.internal.partition.replicator.network.command;
+
+import static 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands.UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND;
 
-import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
 
 /**
- * Response message containing the low watermark required for the local node.
- * This watermark is used to safely truncate catalog history.
+ * Command to store the minimum starting time among all active RW transactions
+ * into transient state of each replication group.
  */
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_RESPONSE)
-public interface CatalogMinimumRequiredTimeResponse extends NetworkMessage {
-    /** Returns node's minimum required time. */
+@Transferable(UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND)
+public interface UpdateMinimumActiveTxBeginTimeCommand extends SafeTimePropagatingCommand {
+    /**
+     * Returns the minimum starting time among all active RW transactions.
+     * NOTE(review): presumably the {@code long} representation of a hybrid timestamp - confirm against the sender.
+     */
     long timestamp();
 }
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/UpdateMinimumActiveTxBeginTimeReplicaRequest.java
similarity index 60%
rename from 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
rename to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/UpdateMinimumActiveTxBeginTimeReplicaRequest.java
index b3760e9d15..1738db8791 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeResponse.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/replication/UpdateMinimumActiveTxBeginTimeReplicaRequest.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.catalog.compaction.message;
+package org.apache.ignite.internal.partition.replicator.network.replication;
 
-import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 
 /**
- * Response message containing the low watermark required for the local node.
- * This watermark is used to safely truncate catalog history.
+ * Request to update the minimum starting time among all active RW 
transactions.
  */
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_RESPONSE)
-public interface CatalogMinimumRequiredTimeResponse extends NetworkMessage {
-    /** Returns node's minimum required time. */
+@Transferable(PartitionReplicationMessageGroup.UPDATE_MINIMUM_ACTIVE_TX_TIME_REPLICA_REQUEST)
+public interface UpdateMinimumActiveTxBeginTimeReplicaRequest extends ReplicaRequest {
+    /**
+     * Returns the minimum starting time among all active RW transactions.
+     * NOTE(review): presumably the {@code long} representation of a hybrid timestamp - confirm against the sender.
+     */
     long timestamp();
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 805c8fd585..e7bebf3218 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -762,25 +762,12 @@ public class IgniteImpl implements Ignite {
                 partitionIdleSafeTimePropagationPeriodMsSupplier
         );
 
-        CatalogCompactionRunner catalogCompactionRunner = new 
CatalogCompactionRunner(
-                name,
-                catalogManager,
-                clusterSvc.messagingService(),
-                logicalTopologyService,
-                placementDriverMgr.placementDriver(),
-                clockService,
-                threadPoolsManager.commonScheduler()
-        );
-
-        
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
-
         systemViewManager = new SystemViewManagerImpl(name, catalogManager);
         nodeAttributesCollector.register(systemViewManager);
         logicalTopology.addEventListener(systemViewManager);
         systemViewManager.register(catalogManager);
 
         this.catalogManager = catalogManager;
-        this.catalogCompactionRunner = catalogCompactionRunner;
 
         lowWatermark = new LowWatermarkImpl(
                 name,
@@ -791,9 +778,6 @@ public class IgniteImpl implements Ignite {
                 clusterSvc.messagingService()
         );
 
-        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
-                params -> 
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
 params).newLowWatermark()));
-
         this.indexMetaStorage = new IndexMetaStorage(catalogManager, 
lowWatermark, metaStorageMgr);
 
         raftMgr.appendEntriesRequestInterceptor(new 
CheckCatalogVersionOnAppendEntries(catalogManager));
@@ -834,6 +818,25 @@ public class IgniteImpl implements Ignite {
                 clock
         );
 
+        CatalogCompactionRunner catalogCompactionRunner = new 
CatalogCompactionRunner(
+                name,
+                catalogManager,
+                clusterSvc.messagingService(),
+                logicalTopologyService,
+                placementDriverMgr.placementDriver(),
+                replicaSvc,
+                clockService,
+                schemaSyncService,
+                threadPoolsManager.commonScheduler(),
+                indexNodeFinishedRwTransactionsChecker
+        );
+
+        
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
+        this.catalogCompactionRunner = catalogCompactionRunner;
+
+        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
+                params -> 
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
 params).newLowWatermark()));
+
         resourcesRegistry = new RemotelyTriggeredResourceRegistry();
 
         var transactionInflights = new 
TransactionInflights(placementDriverMgr.placementDriver(), clockService);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 195fe3e936..11fd10dc3e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -50,6 +50,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.BuildInde
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ReadCommand;
@@ -95,6 +96,9 @@ public class PartitionListener implements RaftGroupListener, 
BeforeApplyHandler
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(PartitionListener.class);
 
+    /** Sentinel value of {@link #minActiveTxBeginTime} meaning that no timestamp has been recorded yet. */
+    private static final long UNDEFINED_MIN_TX_TIME = 0L;
+
     /** Transaction manager. */
     private final TxManager txManager;
 
@@ -127,6 +131,12 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
 
     private final IndexMetaStorage indexMetaStorage;
 
+    /**
+     * Timestamp with minimum starting time among all active RW transactions in the cluster.
+     * Holds {@link #UNDEFINED_MIN_TX_TIME} until an {@link UpdateMinimumActiveTxBeginTimeCommand} is applied.
+     *
+     * <p>This timestamp is used to prevent dropping the catalog that may still be needed when applying raft commands.
+     */
+    private volatile long minActiveTxBeginTime = UNDEFINED_MIN_TX_TIME;
+
     /** Constructor. */
     public PartitionListener(
             TxManager txManager,
@@ -221,6 +231,8 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                     
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, 
commandIndex, commandTerm);
                 } else if (command instanceof VacuumTxStatesCommand) {
                     handleVacuumTxStatesCommand((VacuumTxStatesCommand) 
command, commandIndex, commandTerm);
+                } else if (command instanceof 
UpdateMinimumActiveTxBeginTimeCommand) {
+                    
handleUpdateMinimalActiveTxTimeCommand((UpdateMinimumActiveTxBeginTimeCommand) 
command, commandIndex, commandTerm);
                 } else {
                     assert false : "Command was not found [cmd=" + command + 
']';
                 }
@@ -574,6 +586,21 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         return storage.getStorage();
     }
 
+    /**
+     * Gets the minimum starting time among all active RW transactions in the cluster.
+     *
+     * @return The recorded timestamp, or {@code null} if the value has not yet been set.
+     */
+    @TestOnly
+    public @Nullable Long minimumActiveTxBeginTime() {
+        // Read the volatile field once so that the check and the returned value agree.
+        long observed = minActiveTxBeginTime;
+
+        if (observed != UNDEFINED_MIN_TX_TIME) {
+            return observed;
+        }
+
+        return null;
+    }
+
     /**
      * Handler for the {@link BuildIndexCommand}.
      *
@@ -666,6 +693,19 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
         txStateStorage.removeAll(cmd.txIds(), commandIndex, commandTerm);
     }
 
+    /**
+     * Handler for the {@link UpdateMinimumActiveTxBeginTimeCommand}.
+     *
+     * <p>Records the timestamp carried by the command in {@link #minActiveTxBeginTime}. The value is kept in memory only.
+     *
+     * @param cmd Command.
+     * @param commandIndex Index of the RAFT command.
+     * @param commandTerm Term of the RAFT command.
+     */
+    private void handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTimeCommand cmd, long commandIndex, long commandTerm) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
+        long currentMinTime = minActiveTxBeginTime;
+        long cmdTime = cmd.timestamp();
+
+        // The recorded time is expected to never move backwards.
+        assert currentMinTime <= cmdTime : "minTime=" + currentMinTime + ", cmdTime=" + cmdTime;
+
+        minActiveTxBeginTime = cmdTime;
+    }
+
     private static void onTxStateStorageCasFail(UUID txId, TxMeta 
txMetaBeforeCas, TxMeta txMetaToSet) {
         String errorMsg = format("Failed to update tx state in the storage, 
transaction txId = {} because of inconsistent state,"
                         + " expected state = {}, state to set = {}",
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ee0b0b0e01..ccc49bf2f6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -127,6 +127,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadW
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSwapRowReplicaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ScanCloseReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
@@ -813,6 +814,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return 
processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request);
         } else if (request instanceof VacuumTxStateReplicaRequest) {
             return 
processVacuumTxStateReplicaRequest((VacuumTxStateReplicaRequest) request);
+        } else if (request instanceof 
UpdateMinimumActiveTxBeginTimeReplicaRequest) {
+            return 
processMinimumActiveTxTimeReplicaRequest((UpdateMinimumActiveTxBeginTimeReplicaRequest)
 request);
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -4136,6 +4139,24 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return raftClient.run(cmd);
     }
 
+    /**
+     * Processes {@link UpdateMinimumActiveTxBeginTimeReplicaRequest} by wrapping the received timestamp
+     * into an {@code UpdateMinimumActiveTxBeginTimeCommand} and submitting it for application.
+     *
+     * @param request Request carrying the minimum starting time among all active RW transactions.
+     * @return Future handed to the command application routine as its result future.
+     */
+    private CompletableFuture<?> processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+        CompletableFuture<Object> applyFuture = new CompletableFuture<>();
+
+        Command updateCommand = PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+                .timestamp(request.timestamp())
+                .safeTime(clockService.now())
+                .build();
+
+        // The timestamp must increase monotonically, otherwise it will have to be
+        // stored on disk so that reordering does not occur after the node is restarted.
+        applyCmdWithRetryOnSafeTimeReorderException(updateCommand, applyFuture);
+
+        return applyFuture;
+    }
+
     /**
      * Operation unique identifier.
      */
diff --git 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveLocalTxMinimumBeginTimeProvider.java
similarity index 61%
rename from 
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
rename to 
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveLocalTxMinimumBeginTimeProvider.java
index d1156a3736..90f3f537dd 100644
--- 
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogMinimumRequiredTimeRequest.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveLocalTxMinimumBeginTimeProvider.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.catalog.compaction.message;
+package org.apache.ignite.internal.tx;
 
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 
 /**
- * Request to obtain the minimum timestamp to which, from the local
- * node's perspective, the catalog history can be safely truncated.
+ * Provides the minimum begin time among all active RW transactions started 
locally.
  */
-@Transferable(CatalogCompactionMessageGroup.MINIMUM_REQUIRED_TIME_REQUEST)
-public interface CatalogMinimumRequiredTimeRequest extends NetworkMessage {
-
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveLocalTxMinimumBeginTimeProvider {
+    /**
+     * Returns the minimum begin time among all active RW transactions started locally,
+     * or the current time if there are no active RW transactions.
+     *
+     * @return Minimum begin time of locally started active RW transactions, or the current time if there are none.
+     */
+    HybridTimestamp minimumBeginTime();
 }

Reply via email to