This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f5bda1cb8 IGNITE-19209 Implement installing table schema updates 
(#2228)
7f5bda1cb8 is described below

commit 7f5bda1cb88e5ed854343a8534d917f3207318cf
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jun 23 16:38:12 2023 +0400

    IGNITE-19209 Implement installing table schema updates (#2228)
---
 modules/api/build.gradle                           |   1 +
 .../testframework/IntegrationTestBase.java         |   2 +-
 .../testframework/TestIgnitionManager.java         |  39 ++++
 .../internal/catalog/CatalogServiceImpl.java       |  97 +++++----
 .../ignite/internal/catalog/ClockWaiter.java       | 171 ++++++++++++++++
 .../SchemaSynchronizationConfigurationSchema.java  |   3 +-
 .../ignite/internal/catalog/storage/UpdateLog.java |   4 +-
 .../internal/catalog/storage/UpdateLogImpl.java    |  31 ++-
 .../internal/catalog/storage/VersionedUpdate.java  |  15 +-
 .../internal/catalog/CatalogServiceSelfTest.java   | 216 ++++++++++++---------
 .../ignite/internal/catalog/ClockWaiterTest.java   |  97 +++++++++
 .../catalog/storage/UpdateLogImplTest.java         |  24 +--
 .../internal/cli/commands/ItConfigCommandTest.java |   2 +-
 .../internal/rest/ItGeneratedRestClientTest.java   |   2 +-
 .../{HybridClock.java => ClockUpdateListener.java} |  27 +--
 .../apache/ignite/internal/hlc/HybridClock.java    |  14 ++
 .../ignite/internal/hlc/HybridClockImpl.java       |  23 ++-
 .../ignite/internal/hlc/HybridTimestamp.java       |   8 +
 .../apache/ignite/internal/HybridClockTest.java    |  60 ++++++
 .../apache/ignite/internal/TestHybridClock.java    |  24 ++-
 .../apache/ignite/jdbc/AbstractJdbcSelfTest.java   |   2 +-
 .../internal/metastorage/MetaStorageManager.java   |  20 ++
 .../ignite/internal/metastorage/WatchEvent.java    |  22 ++-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  26 +++
 .../metastorage/server/KeyValueStorage.java        |   8 +
 .../metastorage/server/WatchProcessor.java         |   8 +-
 .../server/persistence/RocksDbKeyValueStorage.java |   3 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |   8 +
 modules/rest-api/openapi/openapi.yaml              |   2 +-
 .../java/org/apache/ignite/internal/Cluster.java   |   4 +-
 .../benchmark/AbstractOneNodeBenchmark.java        |   2 +-
 .../cluster/management/ItClusterInitTest.java      |   4 +-
 .../component/ItRestAddressReportTest.java         |   3 +-
 .../storage/ItRebalanceDistributedTest.java        |  19 +-
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |   2 +-
 .../runner/app/AbstractSchemaChangeTest.java       |   2 +-
 .../internal/runner/app/ItDataSchemaSyncTest.java  |   2 +-
 .../app/ItIgniteInMemoryNodeRestartTest.java       |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  13 +-
 .../internal/runner/app/ItTableCreationTest.java   |   2 +-
 .../internal/runner/app/ItTablesApiTest.java       |   2 +-
 .../runner/app/PlatformTestNodeRunner.java         |   2 +-
 .../app/client/ItAbstractThinClientTest.java       |   2 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |   2 +-
 .../ignite/internal/sqllogic/ItSqlLogicTest.java   |   2 +-
 .../org/apache/ignite/internal/ssl/ItSslTest.java  |   4 +-
 .../ignite/internal/table/ItRoReadsTest.java       |   2 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  17 +-
 .../DistributedConfigurationCatchUpTest.java       |   3 +-
 49 files changed, 823 insertions(+), 227 deletions(-)

diff --git a/modules/api/build.gradle b/modules/api/build.gradle
index 0155eb0ba7..c53932e48f 100644
--- a/modules/api/build.gradle
+++ b/modules/api/build.gradle
@@ -34,6 +34,7 @@ dependencies {
     testFixturesAnnotationProcessor libs.micronaut.inject.annotation.processor
     testFixturesImplementation project(":ignite-core")
     testFixturesImplementation testFixtures(project(":ignite-core"))
+    testFixturesImplementation libs.typesafe.config
     testFixturesImplementation libs.hamcrest.core
     testFixturesImplementation libs.micronaut.junit5
     testFixturesImplementation libs.jetbrains.annotations
diff --git 
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
 
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
index 860e07b637..0aa46d654e 100644
--- 
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
+++ 
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
@@ -129,7 +129,7 @@ public class IntegrationTestBase extends 
BaseIgniteAbstractTest {
 
         configureInitParameters(builder);
 
-        IgnitionManager.init(builder.build());
+        TestIgnitionManager.init(builder.build());
 
         awaitClusterInitialized();
     }
diff --git 
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
 
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
index 06244f15b9..dc4d66f463 100644
--- 
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
+++ 
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.testframework;
 
+import com.typesafe.config.parser.ConfigDocument;
+import com.typesafe.config.parser.ConfigDocumentFactory;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -24,6 +26,8 @@ import java.nio.file.StandardOpenOption;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
 
@@ -77,4 +81,39 @@ public class TestIgnitionManager {
             throw new IgniteException("Couldn't write node config.", e);
         }
     }
+
+    /**
+     * Initializes a cluster using test defaults for cluster configuration 
values that are not
+     * specified explicitly.
+     *
+     * @param parameters Init parameters.
+     * @see IgnitionManager#init(InitParameters)
+     */
+    public static void init(InitParameters parameters) {
+        IgnitionManager.init(applyTestDefaultsToClusterConfig(parameters));
+    }
+
+    private static InitParameters 
applyTestDefaultsToClusterConfig(InitParameters params) {
+        InitParametersBuilder builder = new InitParametersBuilder()
+                .clusterName(params.clusterName())
+                .destinationNodeName(params.nodeName())
+                .metaStorageNodeNames(params.metaStorageNodeNames())
+                .cmgNodeNames(params.cmgNodeNames());
+
+        if (params.clusterConfiguration() == null) {
+            builder.clusterConfiguration("{ schemaSync.delayDuration: 0 }");
+        } else {
+            ConfigDocument configDocument = 
ConfigDocumentFactory.parseString(params.clusterConfiguration());
+
+            String delayDurationPath = "schemaSync.delayDuration";
+
+            if (!configDocument.hasPath(delayDurationPath)) {
+                ConfigDocument updatedDocument = 
configDocument.withValueText(delayDurationPath, "0");
+
+                builder.clusterConfiguration(updatedDocument.render());
+            }
+        }
+
+        return builder.build();
+    }
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 66ea991585..ace40aa938 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -33,6 +33,7 @@ import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.catalog.commands.AlterColumnParams;
 import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
@@ -72,7 +73,7 @@ import org.apache.ignite.internal.catalog.storage.UpdateEntry;
 import org.apache.ignite.internal.catalog.storage.UpdateLog;
 import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
 import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.Producer;
@@ -118,24 +119,31 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     private final PendingComparableValuesTracker<Integer, Void> versionTracker 
= new PendingComparableValuesTracker<>(0);
 
-    private final HybridClock clock;
+    private final ClockWaiter clockWaiter;
 
-    private final long delayDurationMs;
+    private final LongSupplier delayDurationMsSupplier;
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
-        this(updateLog, clock, DEFAULT_DELAY_DURATION);
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+        this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
     }
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long 
delayDurationMs) {
+    CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long 
delayDurationMs) {
+        this(updateLog, clockWaiter, () -> delayDurationMs);
+    }
+
+    /**
+     * Constructor.
+     */
+    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, 
LongSupplier delayDurationMsSupplier) {
         this.updateLog = updateLog;
-        this.clock = clock;
-        this.delayDurationMs = delayDurationMs;
+        this.clockWaiter = clockWaiter;
+        this.delayDurationMsSupplier = delayDurationMsSupplier;
     }
 
     @Override
@@ -256,7 +264,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> createTable(CreateTableParams params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             if (schema.table(params.tableName()) != null) {
@@ -278,7 +286,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> dropTable(DropTableParams params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             CatalogTableDescriptor table = getTable(schema, 
params.tableName());
@@ -301,7 +309,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
             return completedFuture(null);
         }
 
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             CatalogTableDescriptor table = getTable(schema, 
params.tableName());
@@ -328,7 +336,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
             return completedFuture(null);
         }
 
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             CatalogTableDescriptor table = getTable(schema, 
params.tableName());
@@ -343,7 +351,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> alterColumn(AlterColumnParams params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             CatalogTableDescriptor table = getTable(schema, 
params.tableName());
@@ -369,7 +377,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> createIndex(CreateHashIndexParams params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             if (schema.index(params.indexName()) != null) {
@@ -391,7 +399,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> createIndex(CreateSortedIndexParams params) 
{
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             if (schema.index(params.indexName()) != null) {
@@ -413,7 +421,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> dropIndex(DropIndexParams params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogSchemaDescriptor schema = getSchema(catalog, 
params.schemaName());
 
             CatalogIndexDescriptor index = schema.index(params.indexName());
@@ -430,7 +438,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> createDistributionZone(CreateZoneParams 
params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             validateCreateZoneParams(params);
 
             String zoneName = Objects.requireNonNull(params.zoneName(), 
"zone");
@@ -450,7 +458,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> dropDistributionZone(DropZoneParams params) 
{
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogZoneDescriptor zone = getZone(catalog, params.zoneName());
 
             if (zone.name().equals(DEFAULT_ZONE_NAME)) {
@@ -476,7 +484,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> renameDistributionZone(RenameZoneParams 
params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogZoneDescriptor zone = getZone(catalog, params.zoneName());
 
             if (catalog.zone(params.newZoneName()) != null) {
@@ -507,7 +515,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     @Override
     public CompletableFuture<Void> alterDistributionZone(AlterZoneParams 
params) {
-        return saveUpdate(catalog -> {
+        return saveUpdateAndWaitForActivation(catalog -> {
             CatalogZoneDescriptor zone = getZone(catalog, params.zoneName());
 
             Integer dataNodesAutoAdjust = params.dataNodesAutoAdjust();
@@ -552,11 +560,29 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
         catalogByTs.put(newCatalog.time(), newCatalog);
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
-        return saveUpdate(updateProducer, 0);
+    private CompletableFuture<Void> 
saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+        return saveUpdate(updateProducer, 0)
+                .thenCompose(newVersion -> {
+                    Catalog catalog = catalogByVer.get(newVersion);
+
+                    HybridTimestamp activationTs = 
HybridTimestamp.hybridTimestamp(catalog.time());
+                    HybridTimestamp clusterWideEnsuredActivationTs = 
activationTs.addPhysicalTime(
+                            HybridTimestamp.maxClockSkew()
+                    );
+
+                    return clockWaiter.waitFor(clusterWideEnsuredActivationTs);
+                });
     }
 
-    private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer, 
int attemptNo) {
+    /**
+     * Attempts to save a versioned update using a CAS-like logic. If the 
attempt fails, makes more attempts
+     * until the max retry count is reached.
+     *
+     * @param updateProducer Supplies simple updates to include into a 
versioned update to install.
+     * @param attemptNo Ordinal number of an attempt.
+     * @return Future that completes with the new Catalog version (if update 
was saved successfully) or an exception, otherwise.
+     */
+    private CompletableFuture<Integer> saveUpdate(UpdateProducer 
updateProducer, int attemptNo) {
         if (attemptNo >= MAX_RETRY_COUNT) {
             return failedFuture(new 
IgniteInternalException(Common.INTERNAL_ERR, "Max retry limit exceeded: " + 
attemptNo));
         }
@@ -571,18 +597,16 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
         }
 
         if (updates.isEmpty()) {
-            return completedFuture(null);
+            return completedFuture(catalog.version());
         }
 
         int newVersion = catalog.version() + 1;
-        //TODO https://issues.apache.org/jira/browse/IGNITE-19209 Make 
activation time in the MS entry strictly equal to MS entry ts+DD
-        long activationTimestamp = activationTimestamp();
 
-        return updateLog.append(new VersionedUpdate(newVersion, 
activationTimestamp, updates))
+        return updateLog.append(new VersionedUpdate(newVersion, 
delayDurationMsSupplier.getAsLong(), updates))
                 .thenCompose(result -> 
versionTracker.waitFor(newVersion).thenApply(none -> result))
                 .thenCompose(result -> {
                     if (result) {
-                        return completedFuture(null);
+                        return completedFuture(newVersion);
                     }
 
                     return saveUpdate(updateProducer, attemptNo + 1);
@@ -591,7 +615,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     class OnUpdateHandlerImpl implements OnUpdateHandler {
         @Override
-        public void handle(VersionedUpdate update) {
+        public void handle(VersionedUpdate update, HybridTimestamp 
metaStorageUpdateTimestamp) {
             int version = update.version();
             Catalog catalog = catalogByVer.get(version - 1);
 
@@ -601,7 +625,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
                 catalog = entry.applyUpdate(catalog);
             }
 
-            catalog = applyUpdateFinal(catalog, update);
+            catalog = applyUpdateFinal(catalog, update, 
metaStorageUpdateTimestamp);
 
             registerCatalog(catalog);
 
@@ -630,13 +654,6 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
         }
     }
 
-    /**
-     * Calculate catalog activation timestamp.
-     */
-    private long activationTimestamp() {
-        return clock.now().addPhysicalTime(delayDurationMs).longValue();
-    }
-
     private static void throwUnsupportedDdl(String msg, Object... params) {
         throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, msg, params);
     }
@@ -646,10 +663,12 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
         List<UpdateEntry> get(Catalog catalog);
     }
 
-    private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate 
update) {
+    private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate 
update, HybridTimestamp metaStorageUpdateTimestamp) {
+        long activationTimestamp = 
metaStorageUpdateTimestamp.addPhysicalTime(update.delayDurationMs()).longValue();
+
         return new Catalog(
                 update.version(),
-                update.activationTimestamp(),
+                activationTimestamp,
                 catalog.objectIdGenState(),
                 catalog.zones(),
                 catalog.schemas()
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
new file mode 100644
index 0000000000..8c5c65df22
--- /dev/null
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timestamp. It 
only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ClockWaiter.class);
+
+    private final String nodeName;
+    private final HybridClock clock;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+    private final PendingComparableValuesTracker<Long, Void> nowTracker = new 
PendingComparableValuesTracker<>(
+            HybridTimestamp.MIN_VALUE.longValue()
+    );
+
+    private final ClockUpdateListener updateListener = this::onUpdate;
+
+    private volatile ScheduledExecutorService scheduler;
+
+    public ClockWaiter(String nodeName, HybridClock clock) {
+        this.nodeName = nodeName;
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        clock.addUpdateListener(updateListener);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+    }
+
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        clock.removeUpdateListener(updateListener);
+
+        nowTracker.close();
+
+        // We do shutdownNow() right away without doing usual 
shutdown()+awaitTermination() because
+        // this would make us wait till all the scheduled tasks get executed 
(which might take a lot
+        // of time). An alternative would be to track all ScheduledFutures 
(which are not the same as the
+        // user-facing futures we return from the tracker), but we don't need 
them for anything else,
+        // so it's simpler to just use shutdownNow().
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private void onUpdate(long newTs) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            nowTracker.update(newTs, null);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Wait for the clock to reach the given timestamp.
+     *
+     * @param targetTimestamp Timestamp to wait for.
+     * @return A future that completes when the timestamp is reached by the 
clock's time.
+     */
+    public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return doWaitFor(targetTimestamp);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp) 
{
+        CompletableFuture<Void> future = 
nowTracker.waitFor(targetTimestamp.longValue());
+
+        ScheduledFuture<?> scheduledFuture;
+
+        if (!future.isDone()) {
+            // This triggers a clock update.
+            HybridTimestamp now = clock.now();
+
+            if (targetTimestamp.compareTo(now) <= 0) {
+                assert future.isDone();
+
+                scheduledFuture = null;
+            } else {
+                // Adding 1 to account for a possible non-null logical part of 
the targetTimestamp.
+                long millisToWait = targetTimestamp.getPhysical() - 
now.getPhysical() + 1;
+
+                scheduledFuture = scheduler.schedule(this::triggerClockUpdate, 
millisToWait, TimeUnit.MILLISECONDS);
+            }
+        } else {
+            scheduledFuture = null;
+        }
+
+        return future.handle((res, ex) -> {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(true);
+            }
+
+            if (ex != null) {
+                // Let's replace a TrackerClosedException with a 
CancellationException as the latter makes more sense for the clients.
+                if (ex instanceof TrackerClosedException) {
+                    throw new CancellationException();
+                } else {
+                    throw new CompletionException(ex);
+                }
+            }
+
+            return res;
+        });
+    }
+
+    private void triggerClockUpdate() {
+        clock.now();
+    }
+}
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
index f5be7596b6..0cbbda6444 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.Immutable;
 import org.apache.ignite.configuration.validation.Range;
 
 /**
@@ -33,7 +32,7 @@ import org.apache.ignite.configuration.validation.Range;
 public class SchemaSynchronizationConfigurationSchema {
     /** Delay Duration (ms), see the spec for details. */
     @Value(hasDefault = true)
-    @Immutable
     @Range(min = 0)
+    // TODO: IGNITE-19792 - make @Immutable when it gets handled 
properly for distributed config.
     public long delayDuration = TimeUnit.SECONDS.toMillis(1);
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index 8c5e1a886b..94124dae55 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.catalog.storage;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.lang.IgniteInternalException;
 
@@ -67,7 +68,8 @@ public interface UpdateLog extends IgniteComponent {
          * An actual handler that will be invoked when new update is appended 
to the log.
          *
          * @param update A new update.
+         * @param metaStorageUpdateTimestamp Timestamp assigned to the update 
by the Metastorage.
          */
-        void handle(VersionedUpdate update);
+        void handle(VersionedUpdate update, HybridTimestamp 
metaStorageUpdateTimestamp);
     }
 }
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 3652a89fa6..5427173ef8 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -27,8 +27,10 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -39,8 +41,6 @@ import 
org.apache.ignite.internal.metastorage.dsl.StatementResult;
 import org.apache.ignite.internal.metastorage.dsl.Update;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.apache.ignite.lang.IgniteException;
@@ -56,7 +56,6 @@ public class UpdateLogImpl implements UpdateLog {
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     private final MetaStorageManager metastore;
-    private final VaultManager vault;
 
     private volatile OnUpdateHandler onUpdateHandler;
     private volatile @Nullable UpdateListener listener = null;
@@ -65,13 +64,8 @@ public class UpdateLogImpl implements UpdateLog {
      * Creates the object.
      *
      * @param metastore A metastore is used to store and distribute updates 
across the cluster.
-     * @param vault A vault is used to recover state and replay updates on 
start.
      */
-    public UpdateLogImpl(
-            MetaStorageManager metastore,
-            VaultManager vault
-    ) {
-        this.vault = vault;
+    public UpdateLogImpl(MetaStorageManager metastore) {
         this.metastore = metastore;
     }
 
@@ -94,7 +88,7 @@ public class UpdateLogImpl implements UpdateLog {
 
             restoreStateFromVault(handler);
 
-            UpdateListener listener = new UpdateListener(handler);
+            UpdateListener listener = new UpdateListener(onUpdateHandler);
             this.listener = listener;
 
             metastore.registerPrefixWatch(CatalogKey.updatePrefix(), listener);
@@ -156,19 +150,22 @@ public class UpdateLogImpl implements UpdateLog {
     }
 
     private void restoreStateFromVault(OnUpdateHandler handler) {
+        long appliedRevision = metastore.appliedRevision();
+
         int ver = 1;
 
         // TODO: IGNITE-19790 Read range from metastore
         while (true) {
-            VaultEntry entry = vault.get(CatalogKey.update(ver++)).join();
+            ByteArray key = CatalogKey.update(ver++);
+            Entry entry = metastore.getLocally(key.bytes(), appliedRevision);
 
-            if (entry == null) {
+            if (entry.empty() || entry.tombstone()) {
                 break;
             }
 
-            VersionedUpdate update = fromBytes(entry.value());
+            VersionedUpdate update = 
fromBytes(Objects.requireNonNull(entry.value()));
 
-            handler.handle(update);
+            handler.handle(update, 
metastore.timestampByRevision(entry.revision()));
         }
     }
 
@@ -193,11 +190,10 @@ public class UpdateLogImpl implements UpdateLog {
     private static class UpdateListener implements WatchListener {
         private final OnUpdateHandler onUpdateHandler;
 
-        UpdateListener(OnUpdateHandler onUpdateHandler) {
+        private UpdateListener(OnUpdateHandler onUpdateHandler) {
             this.onUpdateHandler = onUpdateHandler;
         }
 
-        /** {@inheritDoc} */
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
             for (EntryEvent eventEntry : event.entryEvents()) {
@@ -210,13 +206,12 @@ public class UpdateLogImpl implements UpdateLog {
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update);
+                onUpdateHandler.handle(update, event.timestamp());
             }
 
             return CompletableFuture.completedFuture(null);
         }
 
-        /** {@inheritDoc} */
         @Override
         public void onError(Throwable e) {
             assert false;
diff --git 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
index 437534ccf9..8ad4da749b 100644
--- 
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
+++ 
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
@@ -31,7 +31,7 @@ public class VersionedUpdate implements Serializable {
 
     private final int version;
 
-    private final long activationTimestamp;
+    private final long delayDurationMs;
 
     @IgniteToStringInclude
     private final List<UpdateEntry> entries;
@@ -40,12 +40,13 @@ public class VersionedUpdate implements Serializable {
      * Constructs the object.
      *
      * @param version A version the changes relate to.
-     * @param activationTimestamp Timestamp given changes become active at.
+     * @param delayDurationMs Delay duration in milliseconds; adding it to the update's 
entry timestamp assigned by the MetaStorage
+     *     produces the activation timestamp.
      * @param entries A list of changes.
      */
-    public VersionedUpdate(int version, long activationTimestamp, 
List<UpdateEntry> entries) {
+    public VersionedUpdate(int version, long delayDurationMs, 
List<UpdateEntry> entries) {
         this.version = version;
-        this.activationTimestamp = activationTimestamp;
+        this.delayDurationMs = delayDurationMs;
         this.entries = List.copyOf(
                 Objects.requireNonNull(entries, "entries")
         );
@@ -56,9 +57,9 @@ public class VersionedUpdate implements Serializable {
         return version;
     }
 
-    /** Returns activation timestamp. */
-    public long activationTimestamp() {
-        return activationTimestamp;
+    /** Returns the Delay Duration for this update (in milliseconds). */
+    public long delayDurationMs() {
+        return delayDurationMs;
     }
 
     /** Returns list of changes. */
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 2c72ace9b4..cb4b7ad325 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -26,6 +26,9 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -37,6 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
@@ -48,6 +53,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.commands.AlterColumnParams;
@@ -90,6 +96,7 @@ import 
org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
@@ -117,7 +124,6 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.EnumSource.Mode;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
 
 /**
  * Catalog service self test.
@@ -135,23 +141,28 @@ public class CatalogServiceSelfTest {
 
     private VaultManager vault;
 
+    private UpdateLog updateLog;
+
     private CatalogServiceImpl service;
 
     private HybridClock clock;
 
+    private ClockWaiter clockWaiter;
+
     @BeforeEach
     void setUp() {
         vault = new VaultManager(new InMemoryVaultService());
 
-        metastore = StandaloneMetaStorageManager.create(
-                vault, new SimpleInMemoryKeyValueStorage("test")
-        );
+        metastore = StandaloneMetaStorageManager.create(vault, new 
SimpleInMemoryKeyValueStorage("test"));
 
         clock = new HybridClockImpl();
-        service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault), 
clock, 0L);
+        clockWaiter = spy(new ClockWaiter("test", clock));
+        updateLog = spy(new UpdateLogImpl(metastore));
+        service = new CatalogServiceImpl(updateLog, clockWaiter);
 
         vault.start();
         metastore.start();
+        clockWaiter.start();
         service.start();
 
         assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
@@ -160,6 +171,7 @@ public class CatalogServiceSelfTest {
     @AfterEach
     public void tearDown() throws Exception {
         service.stop();
+        clockWaiter.stop();
         metastore.stop();
         vault.stop();
     }
@@ -211,7 +223,7 @@ public class CatalogServiceSelfTest {
 
         CompletableFuture<Void> fut = service.createTable(params);
 
-        assertThat(fut, willBe((Object) null));
+        assertThat(fut, willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = service.schema(0);
@@ -245,7 +257,7 @@ public class CatalogServiceSelfTest {
         assertEquals(1L, table.zoneId());
 
         // Validate another table creation.
-        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
 
         // Validate actual catalog has both tables.
         schema = service.schema(2);
@@ -290,14 +302,14 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testDropTable() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
-        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
 
         long beforeDropTimestamp = clock.nowLong();
 
         DropTableParams dropTableParams = 
DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
 
-        assertThat(service.dropTable(dropTableParams), willBe((Object) null));
+        assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = service.schema(2);
@@ -337,7 +349,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testAddColumn() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         AlterTableAddColumnParams params = AlterTableAddColumnParams.builder()
                 .schemaName(SCHEMA_NAME)
@@ -353,7 +365,7 @@ public class CatalogServiceSelfTest {
 
         long beforeAddedTimestamp = clock.nowLong();
 
-        assertThat(service.addColumn(params), willBe((Object) null));
+        assertThat(service.addColumn(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = 
service.activeSchema(beforeAddedTimestamp);
@@ -384,7 +396,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testDropColumn() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         // Validate dropping column
         AlterTableDropColumnParams params = 
AlterTableDropColumnParams.builder()
@@ -395,7 +407,7 @@ public class CatalogServiceSelfTest {
 
         long beforeAddedTimestamp = clock.nowLong();
 
-        assertThat(service.dropColumn(params), willBe((Object) null));
+        assertThat(service.dropColumn(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = 
service.activeSchema(beforeAddedTimestamp);
@@ -437,8 +449,8 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testDropIndexedColumn() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
-        assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), 
willBe(nullValue()));
 
         // Try to drop indexed column
         AlterTableDropColumnParams params = 
AlterTableDropColumnParams.builder()
@@ -470,7 +482,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testAddDropMultipleColumns() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         // Add duplicate column.
         AlterTableAddColumnParams addColumnParams = 
AlterTableAddColumnParams.builder()
@@ -499,7 +511,7 @@ public class CatalogServiceSelfTest {
                 ))
                 .build();
 
-        assertThat(service.addColumn(addColumnParams), willBe((Object) null));
+        assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
 
         // Validate both columns added.
         schema = service.activeSchema(clock.nowLong());
@@ -514,7 +526,7 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of(NEW_COLUMN_NAME, NEW_COLUMN_NAME_2))
                 .build();
 
-        assertThat(service.dropColumn(dropColumnParams), willBe((Object) 
null));
+        assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
 
         // Validate both columns dropped.
         schema = service.activeSchema(clock.nowLong());
@@ -544,7 +556,7 @@ public class CatalogServiceSelfTest {
      */
     @Test
     public void testAlterColumnDefault() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -552,27 +564,27 @@ public class CatalogServiceSelfTest {
 
         // NULL-> NULL : No-op.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(null)),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // NULL -> 1 : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(1)),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNotNull(service.schema(++schemaVer));
 
         // 1 -> 1 : No-op.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(1)),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // 1 -> 2 : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(2)),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNotNull(service.schema(++schemaVer));
 
         // 2 -> NULL : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(null)),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNotNull(service.schema(++schemaVer));
     }
 
@@ -586,7 +598,7 @@ public class CatalogServiceSelfTest {
      */
     @Test
     public void testAlterColumnNotNull() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -594,12 +606,12 @@ public class CatalogServiceSelfTest {
 
         // NULLABLE -> NULLABLE : No-op.
         // NOT NULL -> NOT NULL : No-op.
-        assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null), 
willBe((Object) null));
-        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null), 
willBe((Object) null));
+        assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null), 
willBe(nullValue()));
+        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null), 
willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // NOT NULL -> NULlABLE : Ok.
-        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false, 
null), willBe((Object) null));
+        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false, 
null), willBe(nullValue()));
         assertNotNull(service.schema(++schemaVer));
 
         // DROP NOT NULL for PK : Error.
@@ -629,7 +641,7 @@ public class CatalogServiceSelfTest {
         ColumnParams pkCol = 
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
         ColumnParams col = ColumnParams.builder().name("COL_" + 
type).type(type).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -637,20 +649,20 @@ public class CatalogServiceSelfTest {
 
         // ANY-> UNDEFINED PRECISION : No-op.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type()), null, null),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // UNDEFINED PRECISION -> 10 : Ok.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), 10, null, null), null, null),
-                willBe((Object) null)
+                willBe(nullValue())
         );
         assertNotNull(service.schema(++schemaVer));
 
         // 10 -> 11 : Ok.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), 11, null, null), null, null),
-                willBe((Object) null)
+                willBe(nullValue())
         );
 
         CatalogSchemaDescriptor schema = service.schema(++schemaVer);
@@ -676,7 +688,7 @@ public class CatalogServiceSelfTest {
         ColumnParams col = 
ColumnParams.builder().name("COL").type(type).build();
         ColumnParams colWithPrecision = 
ColumnParams.builder().name("COL_PRECISION").type(type).precision(10).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithPrecision))), willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithPrecision))), willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -686,7 +698,7 @@ public class CatalogServiceSelfTest {
                 willThrowFast(SqlException.class, "Cannot change precision for 
column '" + col.name() + "'"));
 
         assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), new 
TestColumnTypeParams(type, 10, null, null), null, null),
-                willBe((Object) null));
+                willBe(nullValue()));
 
         assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), new 
TestColumnTypeParams(type, 9, null, null), null, null),
                 willThrowFast(SqlException.class, "Cannot change precision for 
column '" + colWithPrecision.name() + "'"));
@@ -711,7 +723,7 @@ public class CatalogServiceSelfTest {
         ColumnParams pkCol = 
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
         ColumnParams col = ColumnParams.builder().name("COL_" + 
type).type(type).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -719,20 +731,20 @@ public class CatalogServiceSelfTest {
 
         // ANY-> UNDEFINED LENGTH : No-op.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type()), null, null),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // UNDEFINED LENGTH -> 10 : Ok.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, 10, null), null, null),
-                willBe((Object) null)
+                willBe(nullValue())
         );
         assertNotNull(service.schema(++schemaVer));
 
         // 10 -> 11 : Ok.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, 11, null), null, null),
-                willBe((Object) null)
+                willBe(nullValue())
         );
 
         CatalogSchemaDescriptor schema = service.schema(++schemaVer);
@@ -758,7 +770,7 @@ public class CatalogServiceSelfTest {
         ColumnParams col = 
ColumnParams.builder().name("COL").type(type).build();
         ColumnParams colWithLength = 
ColumnParams.builder().name("COL_PRECISION").type(type).length(10).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithLength))), willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithLength))), willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -768,7 +780,7 @@ public class CatalogServiceSelfTest {
                 willThrowFast(SqlException.class, "Cannot change length for 
column '" + col.name() + "'"));
 
         assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new 
TestColumnTypeParams(type, null, 10, null), null, null),
-                willBe((Object) null));
+                willBe(nullValue()));
 
         assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new 
TestColumnTypeParams(type, null, 9, null), null, null),
                 willThrowFast(SqlException.class, "Cannot change length for 
column '" + colWithLength.name() + "'"));
@@ -784,7 +796,7 @@ public class CatalogServiceSelfTest {
     public void testAlterColumnTypeScaleIsRejected(ColumnType type) {
         ColumnParams pkCol = 
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
         ColumnParams col = ColumnParams.builder().name("COL_" + 
type).type(type).scale(3).build();
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -792,12 +804,12 @@ public class CatalogServiceSelfTest {
 
         // ANY-> UNDEFINED SCALE : No-op.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type()), null, null),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // 3 -> 3 : No-op.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, null, 3), null, null),
-                willBe((Object) null));
+                willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
 
         // 3 -> 4 : Error.
@@ -836,7 +848,7 @@ public class CatalogServiceSelfTest {
 
         CreateTableParams createTableParams = simpleTable(TABLE_NAME, 
tableColumns);
 
-        assertThat(service.createTable(createTableParams), willBe((Object) 
null));
+        assertThat(service.createTable(createTableParams), 
willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -847,7 +859,7 @@ public class CatalogServiceSelfTest {
             boolean sameType = col.type() == target;
 
             if (sameType || 
CatalogUtils.isSupportedColumnTypeChange(col.type(), target)) {
-                matcher = willBe((Object) null);
+                matcher = willBe(nullValue());
                 schemaVer += sameType ? 0 : 1;
             } else {
                 matcher = willThrowFast(SqlException.class,
@@ -864,7 +876,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testAlterColumnTypeRejectedForPrimaryKey() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         assertThat(changeColumn(TABLE_NAME, "ID", new 
TestColumnTypeParams(ColumnType.INT64), null, null),
                 willThrowFast(SqlException.class, "Cannot change data type for 
primary key column 'ID'."));
@@ -876,7 +888,7 @@ public class CatalogServiceSelfTest {
      */
     @Test
     public void testAlterColumnMultipleChanges() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         int schemaVer = 1;
         assertNotNull(service.schema(schemaVer));
@@ -887,7 +899,7 @@ public class CatalogServiceSelfTest {
         TestColumnTypeParams typeParams = new 
TestColumnTypeParams(ColumnType.INT64);
 
         // Ensures that 3 different actions applied.
-        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe((Object) null));
+        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe(nullValue()));
 
         CatalogSchemaDescriptor schema = service.schema(++schemaVer);
         assertNotNull(schema);
@@ -899,14 +911,14 @@ public class CatalogServiceSelfTest {
 
         // Ensures that only one of three actions applied.
         dflt = () -> DefaultValue.constant(2);
-        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe((Object) null));
+        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe(nullValue()));
 
         schema = service.schema(++schemaVer);
         assertNotNull(schema);
         assertEquals(DefaultValue.constant(2), 
schema.table(TABLE_NAME).column("VAL_NOT_NULL").defaultValue());
 
         // Ensures that no action will be applied.
-        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe((Object) null));
+        assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe(nullValue()));
         assertNull(service.schema(schemaVer + 1));
     }
 
@@ -929,14 +941,14 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL"))
                 .build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
-        assertThat(service.createIndex(params), willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(service.createIndex(params), willBe(nullValue()));
 
         long beforeDropTimestamp = clock.nowLong();
 
         DropTableParams dropTableParams = 
DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build();
 
-        assertThat(service.dropTable(dropTableParams), willBe((Object) null));
+        assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = service.schema(2);
@@ -971,7 +983,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateHashIndex() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .indexName(INDEX_NAME)
@@ -979,7 +991,7 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL", "ID"))
                 .build();
 
-        assertThat(service.createIndex(params), willBe((Object) null));
+        assertThat(service.createIndex(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = service.schema(1);
@@ -1010,7 +1022,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateSortedIndex() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateSortedIndexParams params = CreateSortedIndexParams.builder()
                 .indexName(INDEX_NAME)
@@ -1020,7 +1032,7 @@ public class CatalogServiceSelfTest {
                 .collations(List.of(CatalogColumnCollation.DESC_NULLS_FIRST, 
CatalogColumnCollation.ASC_NULLS_LAST))
                 .build();
 
-        assertThat(service.createIndex(params), willBe((Object) null));
+        assertThat(service.createIndex(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
         CatalogSchemaDescriptor schema = service.schema(1);
@@ -1054,7 +1066,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateIndexWithSameName() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .indexName(INDEX_NAME)
@@ -1062,13 +1074,13 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL"))
                 .build();
 
-        assertThat(service.createIndex(params), willBe((Object) null));
+        assertThat(service.createIndex(params), willBe(nullValue()));
         assertThat(service.createIndex(params), 
willThrow(IndexAlreadyExistsException.class));
     }
 
     @Test
     public void testCreateIndexOnDuplicateColumns() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .indexName(INDEX_NAME)
@@ -1088,7 +1100,7 @@ public class CatalogServiceSelfTest {
 
         
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
clock);
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
clockWaiter);
         service.start();
 
         when(updateLogMock.append(any())).thenAnswer(invocation -> {
@@ -1099,11 +1111,11 @@ public class CatalogServiceSelfTest {
 
             VersionedUpdate update = new VersionedUpdate(
                     updateFromInvocation.version(),
-                    updateFromInvocation.activationTimestamp(),
+                    updateFromInvocation.delayDurationMs(),
                     List.of(new ObjectIdGenUpdateEntry(1))
             );
 
-            updateHandlerCapture.getValue().handle(update);
+            updateHandlerCapture.getValue().handle(update, clock.now());
 
             return completedFuture(false);
         });
@@ -1118,20 +1130,12 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void catalogActivationTime() throws Exception {
-        final int delayDuration = 3_000;
+        final long delayDuration = TimeUnit.DAYS.toMillis(365);
 
-        InMemoryVaultService vaultService = new InMemoryVaultService();
-        VaultManager vault = new VaultManager(vaultService);
-        StandaloneMetaStorageManager metaStorageManager = 
StandaloneMetaStorageManager.create(vault);
-        UpdateLog updateLogMock = Mockito.spy(new 
UpdateLogImpl(metaStorageManager, vault));
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
clock, delayDuration);
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLog, 
clockWaiter, delayDuration);
 
-        vault.start();
-        metaStorageManager.start();
         service.start();
 
-        assertThat("Watches were not deployed", 
metaStorageManager.deployWatches(), willCompleteSuccessfully());
-
         try {
             CreateTableParams params = CreateTableParams.builder()
                     .schemaName(SCHEMA_NAME)
@@ -1143,11 +1147,13 @@ public class CatalogServiceSelfTest {
                     .primaryKeyColumns(List.of("key"))
                     .build();
 
-            CompletableFuture<Void> fut = service.createTable(params);
+            service.createTable(params);
+
+            verify(updateLog).append(any());
+            // TODO IGNITE-19400: recheck createTable future completion 
guarantees
 
-            verify(updateLogMock).append(any());
-            // TODO IGNITE-19400: recheck future completion guarantees
-            assertThat(fut, willBe((Object) null));
+            // This waits till the new Catalog version lands in the internal 
structures.
+            verify(clockWaiter, timeout(10_000)).waitFor(any());
 
             assertSame(service.schema(0), 
service.activeSchema(clock.nowLong()));
             assertNull(service.table(TABLE_NAME, clock.nowLong()));
@@ -1158,8 +1164,6 @@ public class CatalogServiceSelfTest {
             assertNotNull(service.table(TABLE_NAME, clock.nowLong()));
         } finally {
             service.stop();
-            metaStorageManager.stop();
-            vault.stop();
         }
     }
 
@@ -1167,7 +1171,7 @@ public class CatalogServiceSelfTest {
     public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
         UpdateLog updateLogMock = mock(UpdateLog.class);
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
mock(HybridClock.class));
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
clockWaiter);
 
         service.start();
 
@@ -1201,10 +1205,10 @@ public class CatalogServiceSelfTest {
         service.listen(CatalogEvent.TABLE_CREATE, eventListener);
         service.listen(CatalogEvent.TABLE_DROP, eventListener);
 
-        assertThat(service.createTable(createTableParams), willBe((Object) 
null));
+        assertThat(service.createTable(createTableParams), 
willBe(nullValue()));
         verify(eventListener).notify(any(CreateTableEventParameters.class), 
ArgumentMatchers.isNull());
 
-        assertThat(service.dropTable(dropTableparams), willBe((Object) null));
+        assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
         verify(eventListener).notify(any(DropTableEventParameters.class), 
ArgumentMatchers.isNull());
 
         verifyNoMoreInteractions(eventListener);
@@ -1558,14 +1562,14 @@ public class CatalogServiceSelfTest {
         verifyNoInteractions(eventListener);
 
         // Create table.
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         // Add column.
-        assertThat(service.addColumn(addColumnParams), willBe((Object) null));
+        assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
         verify(eventListener).notify(any(AddColumnEventParameters.class), 
ArgumentMatchers.isNull());
 
         // Drop column.
-        assertThat(service.dropColumn(dropColumnParams), willBe((Object) 
null));
+        assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
         verify(eventListener).notify(any(DropColumnEventParameters.class), 
ArgumentMatchers.isNull());
 
         // Try drop column once again.
@@ -1574,6 +1578,44 @@ public class CatalogServiceSelfTest {
         verifyNoMoreInteractions(eventListener);
     }
 
+    @Test
+    public void userFutureCompletesAfterClusterWideActivationHappens() throws 
Exception {
+        final long delayDuration = TimeUnit.DAYS.toMillis(365);
+
+        HybridTimestamp startTs = clock.now();
+
+        CatalogServiceImpl service = new CatalogServiceImpl(updateLog, 
clockWaiter, delayDuration);
+
+        service.start();
+
+        try {
+            CreateTableParams params = CreateTableParams.builder()
+                    .schemaName(SCHEMA_NAME)
+                    .tableName(TABLE_NAME)
+                    .columns(List.of(
+                            
ColumnParams.builder().name("key").type(ColumnType.INT32).build(),
+                            
ColumnParams.builder().name("val").type(ColumnType.INT32).nullable(true).build()
+                    ))
+                    .primaryKeyColumns(List.of("key"))
+                    .build();
+
+            CompletableFuture<Void> future = service.createTable(params);
+
+            assertThat(future.isDone(), is(false));
+
+            ArgumentCaptor<HybridTimestamp> tsCaptor = 
ArgumentCaptor.forClass(HybridTimestamp.class);
+
+            verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture());
+            HybridTimestamp userWaitTs = tsCaptor.getValue();
+            assertThat(
+                    userWaitTs.getPhysical() - startTs.getPhysical(),
+                    greaterThanOrEqualTo(delayDuration + 
HybridTimestamp.maxClockSkew())
+            );
+        } finally {
+            service.stop();
+        }
+    }
+
     @Test
     void testGetCatalogEntityInCatalogEvent() {
         CompletableFuture<Void> result = new CompletableFuture<>();
@@ -1592,7 +1634,7 @@ public class CatalogServiceSelfTest {
             }
         });
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe((Object) null));
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
         assertThat(result, willCompleteSuccessfully());
     }
 
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
new file mode 100644
index 0000000000..e970e4027c
--- /dev/null
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ClockWaiterTest {
+    private ClockWaiter waiter;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @BeforeEach
+    void createWaiter() {
+        waiter = new ClockWaiter("test", clock);
+
+        waiter.start();
+    }
+
+    @AfterEach
+    void cleanup() throws Exception {
+        if (waiter != null) {
+            waiter.stop();
+        }
+    }
+
+    @Test
+    void futureCompletesImmediatelyOnPassedMoment() {
+        CompletableFuture<Void> future = waiter.waitFor(clock.now());
+
+        assertThat(future.isDone(), is(true));
+    }
+
+    @Test
+    void futureCompletesWhenClockGetsUpdatedToSufficientTimestamp() {
+        HybridTimestamp oneYearAhead = getOneYearAhead();
+
+        CompletableFuture<Void> future = waiter.waitFor(oneYearAhead);
+
+        assertThat(future.isDone(), is(false));
+
+        clock.update(oneYearAhead);
+
+        assertThat(future.isDone(), is(true));
+    }
+
+    private HybridTimestamp getOneYearAhead() {
+        return clock.now().addPhysicalTime(TimeUnit.DAYS.toMillis(365));
+    }
+
+    @Test
+    void futureCompletesWithoutClockUpdates() {
+        HybridTimestamp littleAhead = clock.now().addPhysicalTime(200);
+
+        CompletableFuture<Void> future = waiter.waitFor(littleAhead);
+
+        assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
+    }
+
+    @Test
+    void futureGetsCancelledOnStop() throws Exception {
+        HybridTimestamp oneYearAhead = getOneYearAhead();
+
+        CompletableFuture<Void> future = waiter.waitFor(oneYearAhead);
+
+        waiter.stop();
+
+        assertThat(future, willThrow(CancellationException.class));
+    }
+}
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 706ca47bb3..b7e6af23c2 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -57,9 +57,7 @@ class UpdateLogImplTest {
     void setUp() {
         vault = new VaultManager(new InMemoryVaultService());
 
-        metastore = StandaloneMetaStorageManager.create(
-                vault, new SimpleInMemoryKeyValueStorage("test")
-        );
+        metastore = StandaloneMetaStorageManager.create(vault, new 
SimpleInMemoryKeyValueStorage("test"));
 
         vault.start();
         metastore.start();
@@ -74,11 +72,11 @@ class UpdateLogImplTest {
     @Test
     public void logReplayedOnStart() throws Exception {
         // first, let's append a few entries to the log
-        UpdateLogImpl updateLog = new UpdateLogImpl(metastore, vault);
+        UpdateLogImpl updateLog = createUpdateLogImpl();
 
         long revisionBefore = metastore.appliedRevision();
 
-        updateLog.registerUpdateHandler(update -> {/* no-op */});
+        updateLog.registerUpdateHandler((update, ts) -> {/* no-op */});
         updateLog.start();
 
         assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
@@ -104,18 +102,22 @@ class UpdateLogImplTest {
 
         // now let's create new component over a stuffed vault/metastore
         // and check if log is replayed on start
-        updateLog = new UpdateLogImpl(metastore, vault);
+        updateLog = createUpdateLogImpl();
 
         List<VersionedUpdate> actualLog = new ArrayList<>();
-        updateLog.registerUpdateHandler(actualLog::add);
+        updateLog.registerUpdateHandler((update, ts) -> actualLog.add(update));
         updateLog.start();
 
         assertEquals(expectedLog, actualLog);
     }
 
+    private UpdateLogImpl createUpdateLogImpl() {
+        return new UpdateLogImpl(metastore);
+    }
+
     @Test
     public void exceptionIsThrownOnStartIfHandlerHasNotBeenRegistered() {
-        UpdateLogImpl updateLog = new UpdateLogImpl(metastore, vault);
+        UpdateLogImpl updateLog = createUpdateLogImpl();
 
         IgniteInternalException ex = assertThrows(
                 IgniteInternalException.class,
@@ -131,10 +133,10 @@ class UpdateLogImplTest {
     @ParameterizedTest
     @ValueSource(ints = {1, 2, 4, 8})
     public void appendAcceptsUpdatesInOrder(int startVersion) throws Exception 
{
-        UpdateLogImpl updateLog = new UpdateLogImpl(metastore, vault);
+        UpdateLogImpl updateLog = createUpdateLogImpl();
 
         List<Integer> appliedVersions = new ArrayList<>();
-        updateLog.registerUpdateHandler(update -> 
appliedVersions.add(update.version()));
+        updateLog.registerUpdateHandler((update, ts) -> 
appliedVersions.add(update.version()));
 
         updateLog.start();
 
@@ -174,7 +176,7 @@ class UpdateLogImplTest {
     }
 
     private static VersionedUpdate singleEntryUpdateOfVersion(int version) {
-        return new VersionedUpdate(version, version, List.of(new 
TestUpdateEntry("foo_" + version)));
+        return new VersionedUpdate(version, 1, List.of(new 
TestUpdateEntry("foo_" + version)));
     }
 
     static class TestUpdateEntry implements UpdateEntry {
diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
index 4e32ac25da..b575c93415 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
@@ -62,7 +62,7 @@ public class ItConfigCommandTest extends AbstractCliTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         assertThat(future, willCompleteSuccessfully());
 
diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
index e10b3471c7..5862014837 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
@@ -155,7 +155,7 @@ public class ItGeneratedRestClientTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
similarity index 63%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
index 3dc56f4d52..e550a9b486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
@@ -18,28 +18,15 @@
 package org.apache.ignite.internal.hlc;
 
 /**
- * A Hybrid Logical Clock.
+ * Used to track updates of a {@link HybridClock}: it gets notified each time 
the clock 'ticks', including
+ * adjustments caused by external events.
  */
-public interface HybridClock {
+@FunctionalInterface
+public interface ClockUpdateListener {
     /**
-     * Creates a timestamp for new event.
+     * Called when the clock's current time advances.
      *
-     * @return The hybrid timestamp.
+     * @param newTs New timestamp on the clock (represented as a long value, 
see {@link HybridTimestamp#longValue()}).
      */
-    long nowLong();
-
-    /**
-     * Creates a timestamp for new event.
-     *
-     * @return The hybrid timestamp.
-     */
-    HybridTimestamp now();
-
-    /**
-     * Creates a timestamp for a received event.
-     *
-     * @param requestTime Timestamp from request.
-     * @return The hybrid timestamp.
-     */
-    HybridTimestamp update(HybridTimestamp requestTime);
+    void onUpdate(long newTs);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
index 3dc56f4d52..b451c1219e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
@@ -42,4 +42,18 @@ public interface HybridClock {
      * @return The hybrid timestamp.
      */
     HybridTimestamp update(HybridTimestamp requestTime);
+
+    /**
+     * Adds an update listener to self.
+     *
+     * @param listener Update listener to add.
+     */
+    void addUpdateListener(ClockUpdateListener listener);
+
+    /**
+     * Removes an update listener from self.
+     *
+     * @param listener Listener to remove.
+     */
+    void removeUpdateListener(ClockUpdateListener listener);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 32d5839dbe..4209d31674 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -24,6 +24,8 @@ import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -45,6 +47,8 @@ public class HybridClockImpl implements HybridClock {
 
     private volatile long latestTime;
 
+    private final List<ClockUpdateListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
     /**
      * The constructor which initializes the latest time to current time by 
system clock.
      */
@@ -67,11 +71,17 @@ public class HybridClockImpl implements HybridClock {
             long newLatestTime = max(oldLatestTime + 1, now);
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
+                notifyUpdateListeners(newLatestTime);
+
                 return newLatestTime;
             }
         }
     }
 
+    private void notifyUpdateListeners(long newTs) {
+        updateListeners.forEach(listener -> listener.onUpdate(newTs));
+    }
+
     @Override
     public HybridTimestamp now() {
         return hybridTimestamp(nowLong());
@@ -94,12 +104,23 @@ public class HybridClockImpl implements HybridClock {
             long newLatestTime = max(requestTime.longValue() + 1, max(now, 
oldLatestTime + 1));
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
+                notifyUpdateListeners(newLatestTime);
+
                 return hybridTimestamp(newLatestTime);
             }
         }
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public void addUpdateListener(ClockUpdateListener listener) {
+        updateListeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(ClockUpdateListener listener) {
+        updateListeners.remove(listener);
+    }
+
     @Override
     public String toString() {
         return S.toString(HybridClock.class, this);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 047c536cc9..a40f0e44fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -243,4 +243,12 @@ public final class HybridTimestamp implements 
Comparable<HybridTimestamp>, Seria
 
         return new HybridTimestamp(time + (mills << LOGICAL_TIME_BITS_SIZE));
     }
+
+    /**
+     * Returns max clock skew for the cluster (in millis).
+     */
+    // TODO: IGNITE-19809 - Convert this to a cluster-wide config property.
+    public static long maxClockSkew() {
+        return CLOCK_SKEW;
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
index c5a5633e81..41fd6174ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
@@ -19,26 +19,38 @@ package org.apache.ignite.internal;
 
 import static 
org.apache.ignite.internal.hlc.HybridClockTestUtils.mockToEpochMilli;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 import java.time.Clock;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
 import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * Tests of a Hybrid Logical Clock implementation.
  * {@link HybridClock}
  */
+@ExtendWith(MockitoExtension.class)
 class HybridClockTest {
     /**
      * Mock of a system clock.
      */
     private static MockedStatic<Clock> clockMock;
 
+    @Mock
+    private ClockUpdateListener updateListener;
+
     @AfterEach
     public void afterEach() {
         closeClockMock();
@@ -106,4 +118,52 @@ class HybridClockTest {
             clockMock.close();
         }
     }
+
+    @Test
+    void updateListenerGetsNotifiedOnUpdateCausedByNowCall() {
+        HybridClock clock = new HybridClockImpl();
+
+        clock.addUpdateListener(updateListener);
+
+        HybridTimestamp ts = clock.now();
+
+        verify(updateListener).onUpdate(ts.longValue());
+    }
+
+    @Test
+    void updateListenerGetsNotifiedOnUpdateCausedByNowLongCall() {
+        HybridClock clock = new HybridClockImpl();
+
+        clock.addUpdateListener(updateListener);
+
+        long ts = clock.nowLong();
+
+        verify(updateListener).onUpdate(ts);
+    }
+
+    @Test
+    void updateListenerGetsNotifiedOnExternalUpdate() {
+        HybridClock clock = new HybridClockImpl();
+
+        clock.addUpdateListener(updateListener);
+
+        HybridTimestamp ts = 
clock.now().addPhysicalTime(TimeUnit.DAYS.toMillis(365));
+
+        HybridTimestamp afterUpdate = clock.update(ts);
+
+        verify(updateListener).onUpdate(afterUpdate.longValue());
+    }
+
+    @Test
+    void updateListenerIsNotUpdatedAfterRemoval() {
+        HybridClock clock = new HybridClockImpl();
+
+        clock.addUpdateListener(updateListener);
+
+        clock.removeUpdateListener(updateListener);
+
+        clock.now();
+
+        verify(updateListener, never()).onUpdate(anyLong());
+    }
 }
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
index 4dd2a0f860..aa2d5cec37 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
@@ -23,7 +23,10 @@ import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.LongSupplier;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.tostring.S;
@@ -38,6 +41,8 @@ public class TestHybridClock implements HybridClock {
     /** Latest time. */
     private volatile long latestTime;
 
+    private final List<ClockUpdateListener> updateListeners = new 
CopyOnWriteArrayList<>();
+
     /**
      * Var handle for {@link #latestTime}.
      */
@@ -71,11 +76,17 @@ public class TestHybridClock implements HybridClock {
             long newLatestTime = max(oldLatestTime + 1, now);
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
+                notifyUpdateListeners(newLatestTime);
+
                 return newLatestTime;
             }
         }
     }
 
+    private void notifyUpdateListeners(long newLatestTime) {
+        updateListeners.forEach(listener -> listener.onUpdate(newLatestTime));
+    }
+
     @Override
     public HybridTimestamp now() {
         return hybridTimestamp(nowLong());
@@ -98,12 +109,23 @@ public class TestHybridClock implements HybridClock {
             long newLatestTime = max(requestTime.longValue() + 1, max(now, 
oldLatestTime + 1));
 
             if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime)) 
{
+                notifyUpdateListeners(newLatestTime);
+
                 return hybridTimestamp(newLatestTime);
             }
         }
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public void addUpdateListener(ClockUpdateListener listener) {
+        updateListeners.add(listener);
+    }
+
+    @Override
+    public void removeUpdateListener(ClockUpdateListener listener) {
+        updateListeners.remove(listener);
+    }
+
     @Override
     public String toString() {
         return S.toString(HybridClock.class, this);
diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
index 4cbc43fb3b..6b54e77f20 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
@@ -87,7 +87,7 @@ public class AbstractJdbcSelfTest extends 
BaseIgniteAbstractTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         assertThat(future, willCompleteSuccessfully());
 
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index c2850211d0..a82f322611 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
@@ -75,6 +76,25 @@ public interface MetaStorageManager extends IgniteComponent {
     @Deprecated
     List<Entry> getLocally(byte[] key, long revLowerBound, long revUpperBound);
 
+    /**
+     * Returns an entry by the given key and bounded by the given revision. 
The entry is obtained
+     * from the local storage.
+     *
+     * @param key The key.
+     * @param revUpperBound The upper bound of revision.
+     * @return Entry corresponding to the given key.
+     */
+    Entry getLocally(byte[] key, long revUpperBound);
+
+    /**
+     * Looks up a timestamp by a revision. This should only be invoked if it 
is guaranteed that the
+     * revision is available in the local storage. This method always operates 
locally.
+     *
+     * @param revision Revision by which to do a lookup.
+     * @return Timestamp corresponding to the revision.
+     */
+    HybridTimestamp timestampByRevision(long revision);
+
     /**
      * Retrieves entries for given keys.
      */
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
index 33130cb77f..9c8a9aea2f 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.metastorage;
 
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Watch event contains all entry updates done under one revision. Each 
particular entry update in this revision is represented by {@link
@@ -33,15 +35,20 @@ public class WatchEvent {
 
     private final long revision;
 
+    /** Timestamp assigned by the MetaStorage to the event's revision. */
+    private final HybridTimestamp timestamp;
+
     /**
      * Constructs an watch event with given entry events collection.
      *
      * @param entryEvts Events for entries corresponding to an update under 
one revision.
      * @param revision Revision of the updated entries.
+     * @param timestamp Timestamp assigned by the MetaStorage to the event's 
revision.
      */
-    public WatchEvent(Collection<EntryEvent> entryEvts, long revision) {
+    public WatchEvent(Collection<EntryEvent> entryEvts, long revision, 
HybridTimestamp timestamp) {
         this.entryEvts = List.copyOf(entryEvts);
         this.revision = revision;
+        this.timestamp = timestamp;
     }
 
     /**
@@ -49,8 +56,10 @@ public class WatchEvent {
      *
      * @param entryEvt Entry event.
      */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19820 - 
remove/rework.
+    @TestOnly
     public WatchEvent(EntryEvent entryEvt) {
-        this(List.of(entryEvt), entryEvt.newEntry().revision());
+        this(List.of(entryEvt), entryEvt.newEntry().revision(), 
HybridTimestamp.MAX_VALUE);
     }
 
     /**
@@ -91,6 +100,15 @@ public class WatchEvent {
         return revision;
     }
 
+    /**
+     * Returns the timestamp assigned by the MetaStorage to this event's 
revision.
+     *
+     * @return Timestamp assigned by the MetaStorage to this event's revision.
+     */
+    public HybridTimestamp timestamp() {
+        return timestamp;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 8daf22341e..e0e7afde88 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -334,6 +334,32 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         }
     }
 
+    @Override
+    public Entry getLocally(byte[] key, long revUpperBound) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return storage.get(key, revUpperBound);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    @Override
+    public HybridTimestamp timestampByRevision(long revision) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(new NodeStoppingException());
+        }
+
+        try {
+            return storage.timestampByRevision(revision);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     @Override
     public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> 
keys) {
         if (!busyLock.enterBusy()) {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index cc2edef81b..c5350e0e63 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -284,4 +284,12 @@ public interface KeyValueStorage extends ManuallyCloseable 
{
      * present in the storage.
      */
     byte @Nullable [] nextKey(byte[] key);
+
+    /**
+     * Looks up a timestamp by a revision.
+     *
+     * @param revision Revision by which to do a lookup.
+     * @return Timestamp corresponding to the revision.
+     */
+    HybridTimestamp timestampByRevision(long revision);
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 234a14dd08..859040a9b5 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -130,7 +130,7 @@ public class WatchProcessor implements ManuallyCloseable {
 
                     // Notify all watches in parallel, then aggregate the 
entries that they have processed.
                     CompletableFuture<List<EntryEvent>>[] notificationFutures 
= watches.stream()
-                            .map(watch -> notifyWatch(watch, updatedEntries, 
newRevision))
+                            .map(watch -> notifyWatch(watch, updatedEntries, 
newRevision, time))
                             .toArray(CompletableFuture[]::new);
 
                     return allOf(notificationFutures)
@@ -138,7 +138,7 @@ public class WatchProcessor implements ManuallyCloseable {
                 }, watchExecutor);
     }
 
-    private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch, 
List<Entry> updatedEntries, long revision) {
+    private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch, 
List<Entry> updatedEntries, long revision, HybridTimestamp time) {
         CompletableFuture<List<EntryEvent>> eventFuture = supplyAsync(() -> {
             List<EntryEvent> entryEvents = List.of();
 
@@ -165,7 +165,7 @@ public class WatchProcessor implements ManuallyCloseable {
                 .thenCompose(entryEvents -> {
                     CompletableFuture<Void> eventNotificationFuture = 
entryEvents.isEmpty()
                             ? watch.onRevisionUpdated(revision)
-                            : watch.onUpdate(new WatchEvent(entryEvents, 
revision));
+                            : watch.onUpdate(new WatchEvent(entryEvents, 
revision, time));
 
                     return eventNotificationFuture.thenApply(v -> entryEvents);
                 })
@@ -199,7 +199,7 @@ public class WatchProcessor implements ManuallyCloseable {
                 acceptedEntries.addAll(future.join());
             }
 
-            var event = new WatchEvent(acceptedEntries, revision);
+            var event = new WatchEvent(acceptedEntries, revision, time);
 
             return revisionCallback.onRevisionApplied(event, time)
                     .whenComplete((ignored, e) -> {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 9458da0828..8a5858f3ea 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1550,7 +1550,8 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         finishReplay();
     }
 
-    private HybridTimestamp timestampByRevision(long revision) {
+    @Override
+    public HybridTimestamp timestampByRevision(long revision) {
         try {
             byte[] tsBytes = revisionToTs.get(longToBytes(revision));
 
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 98656f1133..85b241cb25 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -409,6 +410,13 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
         return incrementPrefix(key);
     }
 
+    @Override
+    public HybridTimestamp timestampByRevision(long revision) {
+        synchronized (mux) {
+            return Objects.requireNonNull(revToTsMap.get(revision), "Revision 
" + revision + " not found");
+        }
+    }
+
     @Override
     public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev, 
WatchListener listener) {
         assert keyFrom != null : "keyFrom couldn't be null.";
diff --git a/modules/rest-api/openapi/openapi.yaml 
b/modules/rest-api/openapi/openapi.yaml
index 358a88f66a..f7c1ac06d5 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -775,7 +775,7 @@ components:
       description: Unique tag that identifies the cluster.
     DeploymentStatus:
       type: string
-      description: REST presentation of the deployment statuses.
+      description: Status of deployment process.
       enum:
       - UPLOADING
       - DEPLOYED
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 509c95045d..4aea1bca3d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -173,7 +173,7 @@ public class Cluster {
      *     with this call.
      * @param initParametersConfigurator Configure {@link InitParameters} 
before initializing the cluster.
      */
-    public void startAndInit(
+    private void startAndInit(
             int nodeCount,
             int[] cmgNodes,
             String nodeBootstrapConfigTemplate,
@@ -196,7 +196,7 @@ public class Cluster {
 
         initParametersConfigurator.accept(builder);
 
-        IgnitionManager.init(builder.build());
+        TestIgnitionManager.init(builder.build());
 
         for (CompletableFuture<IgniteImpl> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
index f578776261..88f177aa8b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
@@ -78,7 +78,7 @@ public class AbstractOneNodeBenchmark {
 
         var fut =  TestIgnitionManager.start(NODE_NAME, config, 
workDir.resolve(NODE_NAME));
 
-        IgnitionManager.init(new InitParametersBuilder()
+        TestIgnitionManager.init(new InitParametersBuilder()
                 .clusterName("cluster")
                 .destinationNodeName(NODE_NAME)
                 .cmgNodeNames(List.of(NODE_NAME))
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
index accda25e4d..0c5eac07c2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
@@ -67,12 +67,12 @@ public class ItClusterInitTest extends IgniteAbstractTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         
assertThat(allOf(nodesByName.values().toArray(CompletableFuture[]::new)), 
willCompleteSuccessfully());
 
         // init is idempotent
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         InitParameters initParametersWithWrongNodesList1 = 
InitParameters.builder()
                 .destinationNodeName(nodeName)
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
index db649c9bde..23f07deb2d 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.InitParameters;
 import org.apache.ignite.app.IgniteRunner;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.runner.app.IgniteRunnerTest;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.DisplayName;
@@ -76,7 +77,7 @@ public class ItRestAddressReportTest extends 
IgniteIntegrationTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         // Then node is started
         assertThat(ign, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a51e9ac216..5483bc84e6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.ClockWaiter;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -595,6 +597,8 @@ public class ItRebalanceDistributedTest {
 
         private final CatalogManager catalogManager;
 
+        private final ClockWaiter clockWaiter;
+
         private List<IgniteComponent> nodeComponents;
 
         private final ConfigurationTreeGenerator nodeCfgGenerator;
@@ -678,15 +682,16 @@ public class ItRebalanceDistributedTest {
 
             LogicalTopologyServiceImpl logicalTopologyService = new 
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
 
+            KeyValueStorage keyValueStorage = 
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
+                    ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, 
"metaStorage"))
+                    : new SimpleInMemoryKeyValueStorage(nodeName);
             metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
                     logicalTopologyService,
                     raftManager,
-                    
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
-                            ? new RocksDbKeyValueStorage(nodeName, 
resolveDir(dir, "metaStorage"))
-                            : new SimpleInMemoryKeyValueStorage(nodeName),
+                    keyValueStorage,
                     hybridClock
             );
 
@@ -757,7 +762,12 @@ public class ItRebalanceDistributedTest {
                     metaStorageManager,
                     clusterService);
 
-            catalogManager = new CatalogServiceImpl(new 
UpdateLogImpl(metaStorageManager, vaultManager), hybridClock);
+            clockWaiter = new ClockWaiter("test", hybridClock);
+
+            catalogManager = new CatalogServiceImpl(
+                    new UpdateLogImpl(metaStorageManager),
+                    clockWaiter
+            );
 
             schemaManager = new SchemaManager(registry, tablesCfg, 
metaStorageManager);
 
@@ -852,6 +862,7 @@ public class ItRebalanceDistributedTest {
                     cmgManager,
                     metaStorageManager,
                     clusterCfgMgr,
+                    clockWaiter,
                     catalogManager,
                     distributionZoneManager,
                     replicaManager,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index f6d0bc3fe2..bf6e993ac5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -115,7 +115,7 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
             + "      netClusterNodes: [ {} ]\n"
             + "    }\n"
             + "  },\n"
-            + "  raft.rpcInstallSnapshotTimeout: 10000,"
+            + "  raft.rpcInstallSnapshotTimeout: 10000,\n"
             + "  clientConnector.port: {}\n"
             + "}";
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index 43d1624430..e46f65bea6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -189,7 +189,7 @@ abstract class AbstractSchemaChangeTest extends 
IgniteIntegrationTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         await(CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])));
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 42c877fef3..275c4c2333 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -122,7 +122,7 @@ public class ItDataSchemaSyncTest extends 
IgniteAbstractTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index cd8d3a1102..192592d2ce 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -111,7 +111,7 @@ public class ItIgniteInMemoryNodeRestartTest extends 
BaseIgniteRestartTest {
                     .clusterName("cluster")
                     .build();
 
-            IgnitionManager.init(initParameters);
+            TestIgnitionManager.init(initParameters);
         }
 
         assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 8bec4299f1..f80687fd09 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.BaseIgniteRestartTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.ClockWaiter;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -323,7 +324,12 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 new RaftGroupEventsClientListener()
         );
 
-        var catalogManager = new CatalogServiceImpl(new 
UpdateLogImpl(metaStorageMgr, vault), hybridClock);
+        var clockWaiter = new ClockWaiter("test", hybridClock);
+
+        var catalogManager = new CatalogServiceImpl(
+                new UpdateLogImpl(metaStorageMgr),
+                clockWaiter
+        );
 
         TableManager tableManager = new TableManager(
                 name,
@@ -393,6 +399,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 metaStorageMgr,
                 clusterCfgMgr,
                 dataStorageManager,
+                clockWaiter,
                 catalogManager,
                 schemaManager,
                 distributionZoneManager,
@@ -462,7 +469,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                     .metaStorageNodeNames(List.of(nodeName))
                     .clusterName("cluster")
                     .build();
-            IgnitionManager.init(initParameters);
+            TestIgnitionManager.init(initParameters);
         }
 
         assertThat(future, willCompleteSuccessfully());
@@ -523,7 +530,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                     .metaStorageNodeNames(List.of(nodeName))
                     .clusterName("cluster")
                     .build();
-            IgnitionManager.init(initParameters);
+            TestIgnitionManager.init(initParameters);
         }
 
         return futures.stream()
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
index 44b6ad60e5..0f2e007aab 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
@@ -195,7 +195,7 @@ class ItTableCreationTest extends IgniteIntegrationTest {
                 .metaStorageNodeNames(List.of(metaStorageNode))
                 .clusterName("cluster")
                 .build();
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 72bf12d6ea..cb96e88300 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -120,7 +120,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
                 .metaStorageNodeNames(List.of(metaStorageNodeName))
                 .clusterName("cluster")
                 .build();
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 31e19d7cc9..c7eb34f143 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -243,7 +243,7 @@ public class PlatformTestNodeRunner {
                 .metaStorageNodeNames(List.of(metaStorageNodeName))
                 .clusterName("cluster")
                 .build();
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         System.out.println("Initialization complete");
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 296663e899..47032c827f 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -107,7 +107,7 @@ public abstract class ItAbstractThinClientTest extends 
IgniteAbstractTest {
                 .metaStorageNodeNames(List.of(metaStorageNode))
                 .clusterName("cluster")
                 .build();
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 65fe5d3de5..4347405dee 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -161,7 +161,7 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
                 .metaStorageNodeNames(List.of(metaStorageNodeName))
                 .clusterName("cluster")
                 .build();
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
index 97bd0673c7..aec92abc83 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
@@ -335,7 +335,7 @@ public class ItSqlLogicTest extends IgniteIntegrationTest {
                 .metaStorageNodeNames(List.of(metaStorageNodeName))
                 .clusterName("cluster")
                 .build();
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         for (CompletableFuture<Ignite> future : futures) {
             assertThat(future, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
index 5ad0371190..653449f663 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
@@ -35,7 +35,6 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.client.IgniteClientConnectionException;
@@ -43,6 +42,7 @@ import org.apache.ignite.client.SslConfiguration;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.IgniteIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.AfterAll;
@@ -517,7 +517,7 @@ public class ItSslTest extends IgniteIntegrationTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         // First node will initialize the cluster with single node 
successfully since the second node can't connect to it.
         assertThat(node1, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 01d4ab0183..b41540f488 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -133,7 +133,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
                 .clusterName("cluster")
                 .build();
 
-        IgnitionManager.init(initParameters);
+        TestIgnitionManager.init(initParameters);
 
         assertThat(future, willCompleteSuccessfully());
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c8d20a5375..3d76fbb7df 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -42,6 +42,8 @@ import org.apache.ignite.configuration.ConfigurationModule;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.ClockWaiter;
+import 
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -276,6 +278,8 @@ public class IgniteImpl implements Ignite {
     /** A hybrid logical clock. */
     private final HybridClock clock;
 
+    private final ClockWaiter clockWaiter;
+
     private final OutgoingSnapshotsManager outgoingSnapshotsManager;
 
     private final RestAddressReporter restAddressReporter;
@@ -346,6 +350,8 @@ public class IgniteImpl implements Ignite {
 
         clock = new HybridClockImpl();
 
+        clockWaiter = new ClockWaiter(name, clock);
+
         RaftConfiguration raftConfiguration = 
nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY);
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-19051
@@ -485,7 +491,15 @@ public class IgniteImpl implements Ignite {
 
         outgoingSnapshotsManager = new 
OutgoingSnapshotsManager(clusterSvc.messagingService());
 
-        catalogManager = new CatalogServiceImpl(new 
UpdateLogImpl(metaStorageMgr, vaultMgr), clock);
+        SchemaSynchronizationConfiguration schemaSyncConfig = 
clusterConfigRegistry.getConfiguration(
+                SchemaSynchronizationConfiguration.KEY
+        );
+
+        catalogManager = new CatalogServiceImpl(
+                new UpdateLogImpl(metaStorageMgr),
+                clockWaiter,
+                () -> schemaSyncConfig.delayDuration().value()
+        );
 
         distributedTblMgr = new TableManager(
                 name,
@@ -675,6 +689,7 @@ public class IgniteImpl implements Ignite {
 
             // Start the components that are required to join the cluster.
             lifecycleManager.startComponents(
+                    clockWaiter,
                     nettyBootstrapFactory,
                     clusterSvc,
                     restComponent,
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
index 485f6c2f86..4e47a634e6 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.configuration.TestConfigurationChanger;
 import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
 import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
 import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -214,7 +215,7 @@ public class DistributedConfigurationCatchUpTest {
                         new EntryEvent(null, new EntryImpl(MASTER_KEY.bytes(), null, newRevision, -1)),
                         // Add a mock entry to simulate a configuration update.
                         new EntryEvent(null, new EntryImpl((DISTRIBUTED_PREFIX + "foobar").getBytes(UTF_8), null, newRevision, -1))
-                ), newRevision));
+                ), newRevision, HybridTimestamp.MAX_VALUE));
 
                 return true;
             });

Reply via email to