This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c67ad9d99a IGNITE-20039 Add functionality to switch the IndexManager to catalog events #2 (#2354)
c67ad9d99a is described below

commit c67ad9d99acdbf3cb96129eef7421552836f8b98
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Jul 25 18:01:57 2023 +0300

    IGNITE-20039 Add functionality to switch the IndexManager to catalog events #2 (#2354)
---
 .../apache/ignite/internal/catalog/Catalog.java    |  14 +-
 ...logServiceImpl.java => CatalogManagerImpl.java} |  41 +-
 .../ignite/internal/catalog/CatalogService.java    |  24 +-
 .../descriptors/CatalogSchemaDescriptor.java       |   5 +-
 .../internal/catalog/storage/UpdateLogImpl.java    |  19 +-
 ...ceSelfTest.java => CatalogManagerSelfTest.java} | 628 +++++++++++----------
 .../catalog/storage/UpdateLogImplTest.java         |  94 +--
 .../storage/ItRebalanceDistributedTest.java        |   4 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 10 files changed, 470 insertions(+), 367 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
index 2a6c06b5e3..80057fcc0c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.CollectionUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Catalog descriptor represents database schema snapshot.
@@ -115,14 +116,22 @@ public class Catalog {
         return schemasByName.values();
     }
 
-    public CatalogTableDescriptor table(int tableId) {
+    public @Nullable CatalogTableDescriptor table(int tableId) {
         return tablesById.get(tableId);
     }
 
-    public CatalogIndexDescriptor index(int indexId) {
+    public Collection<CatalogTableDescriptor> tables() {
+        return tablesById.values();
+    }
+
+    public @Nullable CatalogIndexDescriptor index(int indexId) {
         return indexesById.get(indexId);
     }
 
+    public Collection<CatalogIndexDescriptor> indexes() {
+        return indexesById.values();
+    }
+
     public CatalogZoneDescriptor zone(String name) {
         return zonesByName.get(name);
     }
@@ -135,7 +144,6 @@ public class Catalog {
         return zonesByName.values();
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
similarity index 96%
rename from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
rename to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index a1981a8924..a6b29af3cb 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -101,14 +102,14 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Catalog service implementation.
  */
-public class CatalogServiceImpl extends Producer<CatalogEvent, 
CatalogEventParameters> implements CatalogManager {
+public class CatalogManagerImpl extends Producer<CatalogEvent, 
CatalogEventParameters> implements CatalogManager {
     private static final int MAX_RETRY_COUNT = 10;
 
     /** Safe time to wait before new Catalog version activation. */
     private static final int DEFAULT_DELAY_DURATION = 0;
 
     /** The logger. */
-    private static final IgniteLogger LOG = 
Loggers.forClass(CatalogServiceImpl.class);
+    private static final IgniteLogger LOG = 
Loggers.forClass(CatalogManagerImpl.class);
 
     /** Versioned catalog descriptors. */
     private final NavigableMap<Integer, Catalog> catalogByVer = new 
ConcurrentSkipListMap<>();
@@ -127,21 +128,21 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+    public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
         this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
     }
 
     /**
      * Constructor.
      */
-    CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long 
delayDurationMs) {
+    CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long 
delayDurationMs) {
         this(updateLog, clockWaiter, () -> delayDurationMs);
     }
 
     /**
      * Constructor.
      */
-    public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, 
LongSupplier delayDurationMsSupplier) {
+    public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, 
LongSupplier delayDurationMsSupplier) {
         this.updateLog = updateLog;
         this.clockWaiter = clockWaiter;
         this.delayDurationMsSupplier = delayDurationMsSupplier;
@@ -183,30 +184,45 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
     }
 
     @Override
-    public CatalogTableDescriptor table(String tableName, long timestamp) {
+    public @Nullable CatalogTableDescriptor table(String tableName, long 
timestamp) {
         return 
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).table(tableName);
     }
 
     @Override
-    public CatalogTableDescriptor table(int tableId, long timestamp) {
+    public @Nullable CatalogTableDescriptor table(int tableId, long timestamp) 
{
         return catalogAt(timestamp).table(tableId);
     }
 
     @Override
-    public CatalogTableDescriptor table(int tableId, int catalogVersion) {
+    public @Nullable CatalogTableDescriptor table(int tableId, int 
catalogVersion) {
         return catalog(catalogVersion).table(tableId);
     }
 
     @Override
-    public CatalogIndexDescriptor index(String indexName, long timestamp) {
+    public Collection<CatalogTableDescriptor> tables(int catalogVersion) {
+        return catalog(catalogVersion).tables();
+    }
+
+    @Override
+    public @Nullable CatalogIndexDescriptor index(String indexName, long 
timestamp) {
         return 
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).index(indexName);
     }
 
     @Override
-    public CatalogIndexDescriptor index(int indexId, long timestamp) {
+    public @Nullable CatalogIndexDescriptor index(int indexId, long timestamp) 
{
         return catalogAt(timestamp).index(indexId);
     }
 
+    @Override
+    public @Nullable CatalogIndexDescriptor index(int indexId, int 
catalogVersion) {
+        return catalog(catalogVersion).index(indexId);
+    }
+
+    @Override
+    public Collection<CatalogIndexDescriptor> indexes(int catalogVersion) {
+        return catalog(catalogVersion).indexes();
+    }
+
     @Override
     public @Nullable CatalogSchemaDescriptor schema(int version) {
         Catalog catalog = catalog(version);
@@ -254,6 +270,11 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
         return catalogAt(timestamp).version();
     }
 
+    @Override
+    public int latestCatalogVersion() {
+        return catalogByVer.lastEntry().getKey();
+    }
+
     private Catalog catalog(int version) {
         return catalogByVer.get(version);
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index c7364ebea3..c1d30f1b4c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog;
 
+import java.util.Collection;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -40,15 +41,21 @@ public interface CatalogService {
 
     String DEFAULT_ZONE_NAME = "Default";
 
-    CatalogTableDescriptor table(String tableName, long timestamp);
+    @Nullable CatalogTableDescriptor table(String tableName, long timestamp);
 
-    CatalogTableDescriptor table(int tableId, long timestamp);
+    @Nullable CatalogTableDescriptor table(int tableId, long timestamp);
 
-    CatalogTableDescriptor table(int tableId, int catalogVersion);
+    @Nullable CatalogTableDescriptor table(int tableId, int catalogVersion);
 
-    CatalogIndexDescriptor index(String indexName, long timestamp);
+    Collection<CatalogTableDescriptor> tables(int catalogVersion);
 
-    CatalogIndexDescriptor index(int indexId, long timestamp);
+    @Nullable CatalogIndexDescriptor index(String indexName, long timestamp);
+
+    @Nullable CatalogIndexDescriptor index(int indexId, long timestamp);
+
+    @Nullable CatalogIndexDescriptor index(int indexId, int catalogVersion);
+
+    Collection<CatalogIndexDescriptor> indexes(int catalogVersion);
 
     CatalogSchemaDescriptor schema(int version);
 
@@ -64,5 +71,12 @@ public interface CatalogService {
 
     int activeCatalogVersion(long timestamp);
 
+    /**
+     * Returns the latest registered version of the catalog.
+     *
+     * <p>NOTE: This method should only be used at the start of components 
that may be removed or moved in the future.
+     */
+    int latestCatalogVersion();
+
     void listen(CatalogEvent evt, EventListener<CatalogEventParameters> 
closure);
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
index 9b2e6bd0ed..d314a3bf17 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
@@ -26,6 +26,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Schema definition contains database schema objects.
@@ -74,11 +75,11 @@ public class CatalogSchemaDescriptor extends 
CatalogObjectDescriptor {
         return indexes;
     }
 
-    public CatalogTableDescriptor table(String name) {
+    public @Nullable CatalogTableDescriptor table(String name) {
         return tablesMap.get(name);
     }
 
-    public CatalogIndexDescriptor index(String name) {
+    public @Nullable CatalogIndexDescriptor index(String name) {
         return indexesMap.get(name);
     }
 
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 1b7994fabc..4d0e00198c 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -58,7 +58,7 @@ public class UpdateLogImpl implements UpdateLog {
     private final MetaStorageManager metastore;
 
     private volatile OnUpdateHandler onUpdateHandler;
-    private volatile @Nullable UpdateListener listener = null;
+    private volatile @Nullable UpdateListener listener;
 
     /**
      * Creates the object.
@@ -69,7 +69,6 @@ public class UpdateLogImpl implements UpdateLog {
         this.metastore = metastore;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void start() {
         if (!busyLock.enterBusy()) {
@@ -86,19 +85,17 @@ public class UpdateLogImpl implements UpdateLog {
                 );
             }
 
-            restoreStateFromVault(handler);
+            recoveryStateFromMetastore(handler);
 
             UpdateListener listener = new UpdateListener(onUpdateHandler);
             this.listener = listener;
 
             metastore.registerPrefixWatch(CatalogKey.updatePrefix(), listener);
-
         } finally {
             busyLock.leaveBusy();
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
         if (!stopGuard.compareAndSet(false, true)) {
@@ -115,13 +112,11 @@ public class UpdateLogImpl implements UpdateLog {
         }
     }
 
-    /** {@inheritDoc} */
     @Override
     public void registerUpdateHandler(OnUpdateHandler handler) {
         onUpdateHandler = handler;
     }
 
-    /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> append(VersionedUpdate update) {
         if (!busyLock.enterBusy()) {
@@ -149,15 +144,19 @@ public class UpdateLogImpl implements UpdateLog {
         }
     }
 
-    private void restoreStateFromVault(OnUpdateHandler handler) {
-        long appliedRevision = metastore.appliedRevision();
+    private void recoveryStateFromMetastore(OnUpdateHandler handler) {
+        CompletableFuture<Long> recoveryFinishedFuture = 
metastore.recoveryFinishedFuture();
+
+        assert recoveryFinishedFuture.isDone();
+
+        long recoveryRevision = recoveryFinishedFuture.join();
 
         int ver = 1;
 
         // TODO: IGNITE-19790 Read range from metastore
         while (true) {
             ByteArray key = CatalogKey.update(ver++);
-            Entry entry = metastore.getLocally(key, appliedRevision);
+            Entry entry = metastore.getLocally(key, recoveryRevision);
 
             if (entry.empty() || entry.tombstone()) {
                 break;
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
similarity index 77%
rename from modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
rename to modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index f0e5e0e300..44a5e29fa5 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -26,7 +26,9 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -106,6 +108,7 @@ import org.apache.ignite.internal.manager.EventListener;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
@@ -132,7 +135,7 @@ import org.mockito.ArgumentCaptor;
 /**
  * Catalog service self test.
  */
-public class CatalogServiceSelfTest {
+public class CatalogManagerSelfTest {
     private static final String SCHEMA_NAME = DEFAULT_SCHEMA_NAME;
     private static final String ZONE_NAME = DEFAULT_ZONE_NAME;
     private static final String TABLE_NAME = "myTable";
@@ -147,9 +150,9 @@ public class CatalogServiceSelfTest {
 
     private UpdateLog updateLog;
 
-    private CatalogServiceImpl service;
+    private CatalogManagerImpl manager;
 
-    private HybridClock clock;
+    private final HybridClock clock = new HybridClockImpl();
 
     private ClockWaiter clockWaiter;
 
@@ -159,38 +162,39 @@ public class CatalogServiceSelfTest {
 
         metastore = StandaloneMetaStorageManager.create(vault, new 
SimpleInMemoryKeyValueStorage("test"));
 
-        clock = new HybridClockImpl();
         clockWaiter = spy(new ClockWaiter("test", clock));
         updateLog = spy(new UpdateLogImpl(metastore));
-        service = new CatalogServiceImpl(updateLog, clockWaiter);
+        manager = new CatalogManagerImpl(updateLog, clockWaiter);
 
         vault.start();
         metastore.start();
         clockWaiter.start();
-        service.start();
+        manager.start();
 
         assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
     }
 
     @AfterEach
     public void tearDown() throws Exception {
-        service.stop();
-        clockWaiter.stop();
-        metastore.stop();
-        vault.stop();
+        IgniteUtils.closeAll(
+                manager == null ? null : manager::stop,
+                clockWaiter == null ? null : clockWaiter::stop,
+                metastore == null ? null : metastore::stop,
+                vault == null ? null : vault::stop
+        );
     }
 
     @Test
     public void testEmptyCatalog() {
-        CatalogSchemaDescriptor schema = service.schema(DEFAULT_SCHEMA_NAME, 
0);
+        CatalogSchemaDescriptor schema = manager.schema(DEFAULT_SCHEMA_NAME, 
0);
 
         assertNotNull(schema);
-        assertSame(schema, service.activeSchema(DEFAULT_SCHEMA_NAME, 
clock.nowLong()));
-        assertSame(schema, service.schema(0));
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(DEFAULT_SCHEMA_NAME, 
clock.nowLong()));
+        assertSame(schema, manager.schema(0));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
 
-        assertNull(service.schema(1));
-        assertThrows(IllegalStateException.class, () -> 
service.activeSchema(-1L));
+        assertNull(manager.schema(1));
+        assertThrows(IllegalStateException.class, () -> 
manager.activeSchema(-1L));
 
         // Validate default schema.
         assertEquals(DEFAULT_SCHEMA_NAME, schema.name());
@@ -199,7 +203,7 @@ public class CatalogServiceSelfTest {
         assertEquals(0, schema.indexes().length);
 
         // Default distribution zone must exists.
-        CatalogZoneDescriptor zone = service.zone(DEFAULT_ZONE_NAME, 
clock.nowLong());
+        CatalogZoneDescriptor zone = manager.zone(DEFAULT_ZONE_NAME, 
clock.nowLong());
         assertEquals(DEFAULT_ZONE_NAME, zone.name());
         assertEquals(CreateZoneParams.DEFAULT_PARTITION_COUNT, 
zone.partitions());
         assertEquals(CreateZoneParams.DEFAULT_REPLICA_COUNT, zone.replicas());
@@ -224,50 +228,51 @@ public class CatalogServiceSelfTest {
                 .colocationColumns(List.of("key2"))
                 .build();
 
-        assertThat(service.createTable(params), willBe(nullValue()));
+        assertThat(manager.createTable(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = service.schema(0);
+        CatalogSchemaDescriptor schema = manager.schema(0);
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(0L));
-        assertSame(schema, service.activeSchema(123L));
+        assertSame(schema, manager.activeSchema(0L));
+        assertSame(schema, manager.activeSchema(123L));
 
         assertNull(schema.table(TABLE_NAME));
-        assertNull(service.table(TABLE_NAME, 123L));
-        assertNull(service.index(createPkIndexName(TABLE_NAME), 123L));
+        assertNull(manager.table(TABLE_NAME, 123L));
+        assertNull(manager.index(createPkIndexName(TABLE_NAME), 123L));
 
         // Validate actual catalog
-        schema = service.schema(SCHEMA_NAME, 1);
+        schema = manager.schema(SCHEMA_NAME, 1);
         CatalogTableDescriptor table = schema.table(TABLE_NAME);
         CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor) 
schema.index(createPkIndexName(TABLE_NAME));
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
 
-        assertSame(table, service.table(TABLE_NAME, clock.nowLong()));
-        assertSame(table, service.table(table.id(), clock.nowLong()));
+        assertSame(table, manager.table(TABLE_NAME, clock.nowLong()));
+        assertSame(table, manager.table(table.id(), clock.nowLong()));
 
-        assertSame(pkIndex, service.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
-        assertSame(pkIndex, service.index(pkIndex.id(), clock.nowLong()));
+        assertSame(pkIndex, manager.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
+        assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong()));
 
         // Validate newly created table
         assertEquals(TABLE_NAME, table.name());
-        assertEquals(service.zone(ZONE_NAME, clock.nowLong()).id(), 
table.zoneId());
+        assertEquals(manager.zone(ZONE_NAME, clock.nowLong()).id(), 
table.zoneId());
 
         // Validate newly created pk index
+        assertEquals(3L, pkIndex.id());
         assertEquals(createPkIndexName(TABLE_NAME), pkIndex.name());
         assertEquals(table.id(), pkIndex.tableId());
         assertEquals(table.primaryKeyColumns(), pkIndex.columns());
         assertTrue(pkIndex.unique());
 
         // Validate another table creation.
-        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
 
         // Validate actual catalog has both tables.
-        schema = service.schema(2);
+        schema = manager.schema(2);
         table = schema.table(TABLE_NAME);
         pkIndex = (CatalogHashIndexDescriptor) 
schema.index(createPkIndexName(TABLE_NAME));
         CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
@@ -275,43 +280,43 @@ public class CatalogServiceSelfTest {
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
 
-        assertSame(table, service.table(TABLE_NAME, clock.nowLong()));
-        assertSame(table, service.table(table.id(), clock.nowLong()));
+        assertSame(table, manager.table(TABLE_NAME, clock.nowLong()));
+        assertSame(table, manager.table(table.id(), clock.nowLong()));
 
-        assertSame(pkIndex, service.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
-        assertSame(pkIndex, service.index(pkIndex.id(), clock.nowLong()));
+        assertSame(pkIndex, manager.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
+        assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong()));
 
-        assertSame(table2, service.table(TABLE_NAME_2, clock.nowLong()));
-        assertSame(table2, service.table(table2.id(), clock.nowLong()));
+        assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong()));
+        assertSame(table2, manager.table(table2.id(), clock.nowLong()));
 
-        assertSame(pkIndex2, service.index(createPkIndexName(TABLE_NAME_2), 
clock.nowLong()));
-        assertSame(pkIndex2, service.index(pkIndex2.id(), clock.nowLong()));
+        assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2), 
clock.nowLong()));
+        assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong()));
 
         assertNotSame(table, table2);
         assertNotSame(pkIndex, pkIndex2);
 
         // Try to create another table with same name.
-        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willThrowFast(TableAlreadyExistsException.class));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME_2)), 
willThrowFast(TableAlreadyExistsException.class));
 
         // Validate schema wasn't changed.
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
     }
 
     @Test
     public void testDropTable() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
-        assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
 
         long beforeDropTimestamp = clock.nowLong();
 
         DropTableParams dropTableParams = 
DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
 
-        assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
+        assertThat(manager.dropTable(dropTableParams), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = service.schema(2);
+        CatalogSchemaDescriptor schema = manager.schema(2);
         CatalogTableDescriptor table1 = schema.table(TABLE_NAME);
         CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
         CatalogIndexDescriptor pkIndex1 = 
schema.index(createPkIndexName(TABLE_NAME));
@@ -322,51 +327,51 @@ public class CatalogServiceSelfTest {
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(beforeDropTimestamp));
+        assertSame(schema, manager.activeSchema(beforeDropTimestamp));
 
-        assertSame(table1, service.table(TABLE_NAME, beforeDropTimestamp));
-        assertSame(table1, service.table(table1.id(), beforeDropTimestamp));
+        assertSame(table1, manager.table(TABLE_NAME, beforeDropTimestamp));
+        assertSame(table1, manager.table(table1.id(), beforeDropTimestamp));
 
-        assertSame(pkIndex1, service.index(createPkIndexName(TABLE_NAME), 
beforeDropTimestamp));
-        assertSame(pkIndex1, service.index(pkIndex1.id(), 
beforeDropTimestamp));
+        assertSame(pkIndex1, manager.index(createPkIndexName(TABLE_NAME), 
beforeDropTimestamp));
+        assertSame(pkIndex1, manager.index(pkIndex1.id(), 
beforeDropTimestamp));
 
-        assertSame(table2, service.table(TABLE_NAME_2, beforeDropTimestamp));
-        assertSame(table2, service.table(table2.id(), beforeDropTimestamp));
+        assertSame(table2, manager.table(TABLE_NAME_2, beforeDropTimestamp));
+        assertSame(table2, manager.table(table2.id(), beforeDropTimestamp));
 
-        assertSame(pkIndex2, service.index(createPkIndexName(TABLE_NAME_2), 
beforeDropTimestamp));
-        assertSame(pkIndex2, service.index(pkIndex2.id(), 
beforeDropTimestamp));
+        assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2), 
beforeDropTimestamp));
+        assertSame(pkIndex2, manager.index(pkIndex2.id(), 
beforeDropTimestamp));
 
         // Validate actual catalog
-        schema = service.schema(3);
+        schema = manager.schema(3);
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
 
         assertNull(schema.table(TABLE_NAME));
-        assertNull(service.table(TABLE_NAME, clock.nowLong()));
-        assertNull(service.table(table1.id(), clock.nowLong()));
+        assertNull(manager.table(TABLE_NAME, clock.nowLong()));
+        assertNull(manager.table(table1.id(), clock.nowLong()));
 
         assertNull(schema.index(createPkIndexName(TABLE_NAME)));
-        assertNull(service.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
-        assertNull(service.index(pkIndex1.id(), clock.nowLong()));
+        assertNull(manager.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
+        assertNull(manager.index(pkIndex1.id(), clock.nowLong()));
 
-        assertSame(table2, service.table(TABLE_NAME_2, clock.nowLong()));
-        assertSame(table2, service.table(table2.id(), clock.nowLong()));
+        assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong()));
+        assertSame(table2, manager.table(table2.id(), clock.nowLong()));
 
-        assertSame(pkIndex2, service.index(createPkIndexName(TABLE_NAME_2), 
clock.nowLong()));
-        assertSame(pkIndex2, service.index(pkIndex2.id(), clock.nowLong()));
+        assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2), 
clock.nowLong()));
+        assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong()));
 
         // Try to drop table once again.
-        assertThat(service.dropTable(dropTableParams), 
willThrowFast(TableNotFoundException.class));
+        assertThat(manager.dropTable(dropTableParams), 
willThrowFast(TableNotFoundException.class));
 
         // Validate schema wasn't changed.
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
     }
 
     @Test
     public void testAddColumn() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         AlterTableAddColumnParams params = AlterTableAddColumnParams.builder()
                 .schemaName(SCHEMA_NAME)
@@ -382,17 +387,17 @@ public class CatalogServiceSelfTest {
 
         long beforeAddedTimestamp = clock.nowLong();
 
-        assertThat(service.addColumn(params), willBe(nullValue()));
+        assertThat(manager.addColumn(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = 
service.activeSchema(beforeAddedTimestamp);
+        CatalogSchemaDescriptor schema = 
manager.activeSchema(beforeAddedTimestamp);
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
 
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
 
         // Validate actual catalog
-        schema = service.activeSchema(clock.nowLong());
+        schema = manager.activeSchema(clock.nowLong());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
 
@@ -413,7 +418,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testDropColumn() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         // Validate dropping column
         AlterTableDropColumnParams params = 
AlterTableDropColumnParams.builder()
@@ -424,17 +429,17 @@ public class CatalogServiceSelfTest {
 
         long beforeAddedTimestamp = clock.nowLong();
 
-        assertThat(service.dropColumn(params), willBe(nullValue()));
+        assertThat(manager.dropColumn(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = 
service.activeSchema(beforeAddedTimestamp);
+        CatalogSchemaDescriptor schema = 
manager.activeSchema(beforeAddedTimestamp);
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
 
         assertNotNull(schema.table(TABLE_NAME).column("VAL"));
 
         // Validate actual catalog
-        schema = service.activeSchema(clock.nowLong());
+        schema = manager.activeSchema(clock.nowLong());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
 
@@ -443,7 +448,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateDropColumnIfTableNotExists() {
-        assertNull(service.table(TABLE_NAME, clock.nowLong()));
+        assertNull(manager.table(TABLE_NAME, clock.nowLong()));
 
         // Try to add a new column.
         AlterTableAddColumnParams addColumnParams = 
AlterTableAddColumnParams.builder()
@@ -452,7 +457,7 @@ public class CatalogServiceSelfTest {
                 
.columns(List.of(ColumnParams.builder().name(NEW_COLUMN_NAME).type(ColumnType.INT32).nullable(true).build()))
                 .build();
 
-        assertThat(service.addColumn(addColumnParams), 
willThrow(TableNotFoundException.class));
+        assertThat(manager.addColumn(addColumnParams), 
willThrow(TableNotFoundException.class));
 
         // Try to drop column.
         AlterTableDropColumnParams dropColumnParams = 
AlterTableDropColumnParams.builder()
@@ -461,13 +466,13 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of("VAL"))
                 .build();
 
-        assertThat(service.dropColumn(dropColumnParams), 
willThrow(TableNotFoundException.class));
+        assertThat(manager.dropColumn(dropColumnParams), 
willThrow(TableNotFoundException.class));
     }
 
     @Test
     public void testDropIndexedColumn() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
-        assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), 
willBe(nullValue()));
 
         // Try to drop indexed column
         AlterTableDropColumnParams params = 
AlterTableDropColumnParams.builder()
@@ -476,7 +481,7 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of("VAL"))
                 .build();
 
-        assertThat(service.dropColumn(params), willThrow(SqlException.class,
+        assertThat(manager.dropColumn(params), willThrow(SqlException.class,
                 "Can't drop indexed column: [columnName=VAL, 
indexName=myIndex]"));
 
         // Try to drop PK column
@@ -486,13 +491,13 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of("ID"))
                 .build();
 
-        assertThat(service.dropColumn(params), willThrow(SqlException.class, 
"Can't drop primary key column: [name=ID]"));
+        assertThat(manager.dropColumn(params), willThrow(SqlException.class, 
"Can't drop primary key column: [name=ID]"));
 
         // Validate actual catalog
-        CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
+        CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
         assertNotNull(schema);
         assertNotNull(schema.table(TABLE_NAME));
-        assertSame(service.schema(2), schema);
+        assertSame(manager.schema(2), schema);
 
         assertNotNull(schema.table(TABLE_NAME).column("ID"));
         assertNotNull(schema.table(TABLE_NAME).column("VAL"));
@@ -500,7 +505,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testAddDropMultipleColumns() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         // Add duplicate column.
         AlterTableAddColumnParams addColumnParams = 
AlterTableAddColumnParams.builder()
@@ -512,10 +517,10 @@ public class CatalogServiceSelfTest {
                 ))
                 .build();
 
-        assertThat(service.addColumn(addColumnParams), 
willThrow(ColumnAlreadyExistsException.class));
+        assertThat(manager.addColumn(addColumnParams), 
willThrow(ColumnAlreadyExistsException.class));
 
         // Validate no column added.
-        CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
+        CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
 
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
 
@@ -529,10 +534,10 @@ public class CatalogServiceSelfTest {
                 ))
                 .build();
 
-        assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
+        assertThat(manager.addColumn(addColumnParams), willBe(nullValue()));
 
         // Validate both columns added.
-        schema = service.activeSchema(clock.nowLong());
+        schema = manager.activeSchema(clock.nowLong());
 
         assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
         assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -544,10 +549,10 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of(NEW_COLUMN_NAME, NEW_COLUMN_NAME_2))
                 .build();
 
-        assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
+        assertThat(manager.dropColumn(dropColumnParams), willBe(nullValue()));
 
         // Validate both columns dropped.
-        schema = service.activeSchema(clock.nowLong());
+        schema = manager.activeSchema(clock.nowLong());
 
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
         assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -559,10 +564,10 @@ public class CatalogServiceSelfTest {
                 .columns(Set.of(NEW_COLUMN_NAME, "VAL"))
                 .build();
 
-        assertThat(service.dropColumn(dropColumnParams), 
willThrow(ColumnNotFoundException.class));
+        assertThat(manager.dropColumn(dropColumnParams), 
willThrow(ColumnNotFoundException.class));
 
         // Validate no column dropped.
-        schema = service.activeSchema(clock.nowLong());
+        schema = manager.activeSchema(clock.nowLong());
 
         assertNotNull(schema.table(TABLE_NAME).column("VAL"));
     }
@@ -574,36 +579,36 @@ public class CatalogServiceSelfTest {
      */
     @Test
     public void testAlterColumnDefault() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         // NULL-> NULL : No-op.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(null)),
                 willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // NULL -> 1 : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(1)),
                 willBe(nullValue()));
-        assertNotNull(service.schema(++schemaVer));
+        assertNotNull(manager.schema(++schemaVer));
 
         // 1 -> 1 : No-op.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(1)),
                 willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 1 -> 2 : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(2)),
                 willBe(nullValue()));
-        assertNotNull(service.schema(++schemaVer));
+        assertNotNull(manager.schema(++schemaVer));
 
         // 2 -> NULL : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () -> 
DefaultValue.constant(null)),
                 willBe(nullValue()));
-        assertNotNull(service.schema(++schemaVer));
+        assertNotNull(manager.schema(++schemaVer));
     }
 
     /**
@@ -616,21 +621,21 @@ public class CatalogServiceSelfTest {
      */
     @Test
     public void testAlterColumnNotNull() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         // NULLABLE -> NULLABLE : No-op.
         // NOT NULL -> NOT NULL : No-op.
         assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null), 
willBe(nullValue()));
         assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null), 
willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // NOT NULL -> NULlABLE : Ok.
         assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false, 
null), willBe(nullValue()));
-        assertNotNull(service.schema(++schemaVer));
+        assertNotNull(manager.schema(++schemaVer));
 
         // DROP NOT NULL for PK : PK column can't be `null`.
         assertThat(changeColumn(TABLE_NAME, "ID", null, false, null),
@@ -642,7 +647,7 @@ public class CatalogServiceSelfTest {
         assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null),
                 willThrowFast(SqlException.class, "Cannot set NOT NULL for 
column 'VAL_NOT_NULL'."));
 
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     /**
@@ -658,32 +663,32 @@ public class CatalogServiceSelfTest {
         ColumnParams pkCol = 
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
         ColumnParams col = 
ColumnParams.builder().name("COL_DECIMAL").type(ColumnType.DECIMAL).precision(10).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 10 ->11 : Ok.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), 11, null, null), null, null),
                 willBe(nullValue())
         );
-        assertNotNull(service.schema(++schemaVer));
+        assertNotNull(manager.schema(++schemaVer));
 
         // No change.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), 11, null, null), null, null),
                 willBe(nullValue())
         );
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 11 -> 10 : Forbidden because this change lead to incompatible 
schemas.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), 10, null, null), null, null),
                 willThrowFast(SqlException.class, "Cannot decrease precision 
to 10 for column '" + col.name() + "'.")
         );
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     /**
@@ -696,11 +701,11 @@ public class CatalogServiceSelfTest {
         ColumnParams col = 
ColumnParams.builder().name("COL").type(type).build();
         ColumnParams colWithPrecision = 
ColumnParams.builder().name("COL_PRECISION").type(type).precision(3).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithPrecision))), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithPrecision))), willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(type, 3, null, null), null, null),
                 willThrowFast(SqlException.class, "Cannot change precision for 
column '" + col.name() + "'"));
@@ -714,7 +719,7 @@ public class CatalogServiceSelfTest {
         assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), new 
TestColumnTypeParams(type, 4, null, null), null, null),
                 willThrowFast(SqlException.class, "Cannot change precision for 
column '" + colWithPrecision.name() + "'"));
 
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     /**
@@ -731,11 +736,11 @@ public class CatalogServiceSelfTest {
         ColumnParams pkCol = 
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
         ColumnParams col = ColumnParams.builder().name("COL_" + 
type).length(10).type(type).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 10 -> 11 : Ok.
         assertThat(
@@ -743,7 +748,7 @@ public class CatalogServiceSelfTest {
                 willBe(nullValue())
         );
 
-        CatalogSchemaDescriptor schema = service.schema(++schemaVer);
+        CatalogSchemaDescriptor schema = manager.schema(++schemaVer);
         assertNotNull(schema);
 
         // 11 -> 10 : Error.
@@ -751,26 +756,26 @@ public class CatalogServiceSelfTest {
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, 10, null), null, null),
                 willThrowFast(SqlException.class, "Cannot decrease length to 
10 for column '" + col.name() + "'.")
         );
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 11 -> 11 : No-op.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, 11, null), null, null),
                 willBe(nullValue())
         );
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // No change.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type()), null, null),
                 willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 11 -> 10 : failed.
         assertThat(
                 changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, 10, null), null, null),
                 willThrowFast(SqlException.class)
         );
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     /**
@@ -783,11 +788,11 @@ public class CatalogServiceSelfTest {
         ColumnParams col = 
ColumnParams.builder().name("COL").type(type).build();
         ColumnParams colWithLength = 
ColumnParams.builder().name("COL_PRECISION").type(type).length(10).build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithLength))), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col, colWithLength))), willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(type, null, 10, null), null, null),
                 willThrowFast(SqlException.class, "Cannot change length for 
column '" + col.name() + "'"));
@@ -801,7 +806,7 @@ public class CatalogServiceSelfTest {
         assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new 
TestColumnTypeParams(type, null, 11, null), null, null),
                 willThrowFast(SqlException.class, "Cannot change length for 
column '" + colWithLength.name() + "'"));
 
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     /**
@@ -812,31 +817,31 @@ public class CatalogServiceSelfTest {
     public void testAlterColumnTypeScaleIsRejected(ColumnType type) {
         ColumnParams pkCol = 
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
         ColumnParams col = ColumnParams.builder().name("COL_" + 
type).type(type).scale(3).build();
-        assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol, 
col))), willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         // ANY-> UNDEFINED SCALE : No-op.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type()), null, null),
                 willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 3 -> 3 : No-op.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, null, 3), null, null),
                 willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 3 -> 4 : Error.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, null, 4), null, null),
                 willThrowFast(SqlException.class, "Cannot change scale for 
column '" + col.name() + "'."));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
 
         // 3 -> 2 : Error.
         assertThat(changeColumn(TABLE_NAME, col.name(), new 
TestColumnTypeParams(col.type(), null, null, 2), null, null),
                 willThrowFast(SqlException.class, "Cannot change scale for 
column '" + col.name() + "'."));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     /**
@@ -864,11 +869,11 @@ public class CatalogServiceSelfTest {
 
         CreateTableParams createTableParams = simpleTable(TABLE_NAME, 
tableColumns);
 
-        assertThat(service.createTable(createTableParams), 
willBe(nullValue()));
+        assertThat(manager.createTable(createTableParams), 
willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         for (ColumnParams col : testColumns) {
             TypeSafeMatcher<CompletableFuture<?>> matcher;
@@ -885,14 +890,14 @@ public class CatalogServiceSelfTest {
             TestColumnTypeParams tyoeParams = new TestColumnTypeParams(target);
 
             assertThat(col.type() + " -> " + target, changeColumn(TABLE_NAME, 
col.name(), tyoeParams, null, null), matcher);
-            assertNotNull(service.schema(schemaVer));
-            assertNull(service.schema(schemaVer + 1));
+            assertNotNull(manager.schema(schemaVer));
+            assertNull(manager.schema(schemaVer + 1));
         }
     }
 
     @Test
     public void testAlterColumnTypeRejectedForPrimaryKey() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         assertThat(changeColumn(TABLE_NAME, "ID", new 
TestColumnTypeParams(ColumnType.INT64), null, null),
                 willThrowFast(SqlException.class, "Cannot change data type for 
primary key column 'ID'."));
@@ -904,11 +909,11 @@ public class CatalogServiceSelfTest {
      */
     @Test
     public void testAlterColumnMultipleChanges() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         int schemaVer = 1;
-        assertNotNull(service.schema(schemaVer));
-        assertNull(service.schema(schemaVer + 1));
+        assertNotNull(manager.schema(schemaVer));
+        assertNull(manager.schema(schemaVer + 1));
 
         Supplier<DefaultValue> dflt = () -> DefaultValue.constant(null);
         boolean notNull = false;
@@ -917,7 +922,7 @@ public class CatalogServiceSelfTest {
         // Ensures that 3 different actions applied.
         assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe(nullValue()));
 
-        CatalogSchemaDescriptor schema = service.schema(++schemaVer);
+        CatalogSchemaDescriptor schema = manager.schema(++schemaVer);
         assertNotNull(schema);
 
         CatalogTableColumnDescriptor desc = 
schema.table(TABLE_NAME).column("VAL_NOT_NULL");
@@ -929,24 +934,24 @@ public class CatalogServiceSelfTest {
         dflt = () -> DefaultValue.constant(2);
         assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe(nullValue()));
 
-        schema = service.schema(++schemaVer);
+        schema = manager.schema(++schemaVer);
         assertNotNull(schema);
         assertEquals(DefaultValue.constant(2), 
schema.table(TABLE_NAME).column("VAL_NOT_NULL").defaultValue());
 
         // Ensures that no action will be applied.
         assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams, 
notNull, dflt), willBe(nullValue()));
-        assertNull(service.schema(schemaVer + 1));
+        assertNull(manager.schema(schemaVer + 1));
     }
 
     @Test
     public void testAlterColumnForNonExistingTableRejected() {
-        assertNotNull(service.schema(0));
-        assertNull(service.schema(1));
+        assertNotNull(manager.schema(0));
+        assertNull(manager.schema(1));
 
         assertThat(changeColumn(TABLE_NAME, "ID", null, null, null), 
willThrowFast(TableNotFoundException.class));
 
-        assertNotNull(service.schema(0));
-        assertNull(service.schema(1));
+        assertNotNull(manager.schema(0));
+        assertNull(manager.schema(1));
     }
 
     @Test
@@ -958,49 +963,49 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL"))
                 .build();
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
-        assertThat(service.createIndex(params), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createIndex(params), willBe(nullValue()));
 
         long beforeDropTimestamp = clock.nowLong();
 
         DropTableParams dropTableParams = 
DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
 
-        assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
+        assertThat(manager.dropTable(dropTableParams), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = service.schema(2);
+        CatalogSchemaDescriptor schema = manager.schema(2);
         CatalogTableDescriptor table = schema.table(TABLE_NAME);
         CatalogIndexDescriptor index = schema.index(INDEX_NAME);
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(beforeDropTimestamp));
+        assertSame(schema, manager.activeSchema(beforeDropTimestamp));
 
-        assertSame(table, service.table(TABLE_NAME, beforeDropTimestamp));
-        assertSame(table, service.table(table.id(), beforeDropTimestamp));
+        assertSame(table, manager.table(TABLE_NAME, beforeDropTimestamp));
+        assertSame(table, manager.table(table.id(), beforeDropTimestamp));
 
-        assertSame(index, service.index(INDEX_NAME, beforeDropTimestamp));
-        assertSame(index, service.index(index.id(), beforeDropTimestamp));
+        assertSame(index, manager.index(INDEX_NAME, beforeDropTimestamp));
+        assertSame(index, manager.index(index.id(), beforeDropTimestamp));
 
         // Validate actual catalog
-        schema = service.schema(3);
+        schema = manager.schema(3);
 
         assertNotNull(schema);
         assertEquals(SCHEMA_NAME, schema.name());
-        assertSame(schema, service.activeSchema(clock.nowLong()));
+        assertSame(schema, manager.activeSchema(clock.nowLong()));
 
         assertNull(schema.table(TABLE_NAME));
-        assertNull(service.table(TABLE_NAME, clock.nowLong()));
-        assertNull(service.table(table.id(), clock.nowLong()));
+        assertNull(manager.table(TABLE_NAME, clock.nowLong()));
+        assertNull(manager.table(table.id(), clock.nowLong()));
 
         assertNull(schema.index(INDEX_NAME));
-        assertNull(service.index(INDEX_NAME, clock.nowLong()));
-        assertNull(service.index(index.id(), clock.nowLong()));
+        assertNull(manager.index(INDEX_NAME, clock.nowLong()));
+        assertNull(manager.index(index.id(), clock.nowLong()));
     }
 
     @Test
     public void testCreateHashIndex() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .schemaName(SCHEMA_NAME)
@@ -1009,25 +1014,26 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL", "ID"))
                 .build();
 
-        assertThat(service.createIndex(params), willBe(nullValue()));
+        assertThat(manager.createIndex(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = service.schema(1);
+        CatalogSchemaDescriptor schema = manager.schema(1);
 
         assertNotNull(schema);
         assertNull(schema.index(INDEX_NAME));
-        assertNull(service.index(INDEX_NAME, 123L));
+        assertNull(manager.index(INDEX_NAME, 123L));
 
         // Validate actual catalog
-        schema = service.schema(2);
+        schema = manager.schema(2);
 
         CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) 
schema.index(INDEX_NAME);
 
         assertNotNull(schema);
-        assertSame(index, service.index(INDEX_NAME, clock.nowLong()));
-        assertSame(index, service.index(index.id(), clock.nowLong()));
+        assertSame(index, manager.index(INDEX_NAME, clock.nowLong()));
+        assertSame(index, manager.index(index.id(), clock.nowLong()));
 
         // Validate newly created hash index
+        assertEquals(4L, index.id());
         assertEquals(INDEX_NAME, index.name());
         assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
         assertEquals(List.of("VAL", "ID"), index.columns());
@@ -1037,7 +1043,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateSortedIndex() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateSortedIndexParams params = CreateSortedIndexParams.builder()
                 .schemaName(SCHEMA_NAME)
@@ -1048,26 +1054,27 @@ public class CatalogServiceSelfTest {
                 .collations(List.of(CatalogColumnCollation.DESC_NULLS_FIRST, 
CatalogColumnCollation.ASC_NULLS_LAST))
                 .build();
 
-        assertThat(service.createIndex(params), willBe(nullValue()));
+        assertThat(manager.createIndex(params), willBe(nullValue()));
 
         // Validate catalog version from the past.
-        CatalogSchemaDescriptor schema = service.schema(1);
+        CatalogSchemaDescriptor schema = manager.schema(1);
 
         assertNotNull(schema);
         assertNull(schema.index(INDEX_NAME));
-        assertNull(service.index(INDEX_NAME, 123L));
-        assertNull(service.index(4, 123L));
+        assertNull(manager.index(INDEX_NAME, 123L));
+        assertNull(manager.index(4, 123L));
 
         // Validate actual catalog
-        schema = service.schema(2);
+        schema = manager.schema(2);
 
         CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) 
schema.index(INDEX_NAME);
 
         assertNotNull(schema);
-        assertSame(index, service.index(INDEX_NAME, clock.nowLong()));
-        assertSame(index, service.index(index.id(), clock.nowLong()));
+        assertSame(index, manager.index(INDEX_NAME, clock.nowLong()));
+        assertSame(index, manager.index(index.id(), clock.nowLong()));
 
         // Validate newly created sorted index
+        assertEquals(4L, index.id());
         assertEquals(INDEX_NAME, index.name());
         assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
         assertEquals("VAL", index.columns().get(0).name());
@@ -1080,7 +1087,7 @@ public class CatalogServiceSelfTest {
 
     @Test
     public void testCreateIndexWithSameName() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .indexName(INDEX_NAME)
@@ -1088,13 +1095,13 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL"))
                 .build();
 
-        assertThat(service.createIndex(params), willBe(nullValue()));
-        assertThat(service.createIndex(params), 
willThrow(IndexAlreadyExistsException.class));
+        assertThat(manager.createIndex(params), willBe(nullValue()));
+        assertThat(manager.createIndex(params), 
willThrow(IndexAlreadyExistsException.class));
     }
 
     @Test
     public void testCreateIndexOnDuplicateColumns() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         CreateHashIndexParams params = CreateHashIndexParams.builder()
                 .indexName(INDEX_NAME)
@@ -1102,7 +1109,7 @@ public class CatalogServiceSelfTest {
                 .columns(List.of("VAL", "VAL"))
                 .build();
 
-        assertThat(service.createIndex(params),
+        assertThat(manager.createIndex(params),
                 willThrow(IgniteInternalException.class, "Can't create index 
on duplicate columns: VAL, VAL"));
     }
 
@@ -1114,13 +1121,13 @@ public class CatalogServiceSelfTest {
 
         
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
clockWaiter);
-        service.start();
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, 
clockWaiter);
+        manager.start();
 
         when(updateLogMock.append(any())).thenAnswer(invocation -> {
             // here we emulate concurrent updates. First of all, we return a 
future completed with "false"
-            // as if someone has concurrently appended an update. Besides, in 
order to unblock service and allow to
-            // make another attempt, we must notify service with the same 
version as in current attempt.
+            // as if someone has concurrently appended an update. Besides, in 
order to unblock manager and allow to
+            // make another attempt, we must notify manager with the same 
version as in current attempt.
             VersionedUpdate updateFromInvocation = invocation.getArgument(0, 
VersionedUpdate.class);
 
             VersionedUpdate update = new VersionedUpdate(
@@ -1134,7 +1141,7 @@ public class CatalogServiceSelfTest {
             return completedFuture(false);
         });
 
-        CompletableFuture<Void> createTableFut = 
service.createTable(simpleTable("T"));
+        CompletableFuture<Void> createTableFut = 
manager.createTable(simpleTable("T"));
 
         assertThat(createTableFut, willThrow(IgniteInternalException.class, 
"Max retry limit exceeded"));
 
@@ -1146,9 +1153,9 @@ public class CatalogServiceSelfTest {
     public void catalogActivationTime() throws Exception {
         final long delayDuration = TimeUnit.DAYS.toMillis(365);
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLog, 
clockWaiter, delayDuration);
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockWaiter, delayDuration);
 
-        service.start();
+        manager.start();
 
         try {
             CreateTableParams params = CreateTableParams.builder()
@@ -1161,7 +1168,7 @@ public class CatalogServiceSelfTest {
                     .primaryKeyColumns(List.of("key"))
                     .build();
 
-            service.createTable(params);
+            manager.createTable(params);
 
             verify(updateLog).append(any());
             // TODO IGNITE-19400: recheck createTable future completion 
guarantees
@@ -1169,15 +1176,15 @@ public class CatalogServiceSelfTest {
             // This waits till the new Catalog version lands in the internal 
structures.
             verify(clockWaiter, timeout(10_000)).waitFor(any());
 
-            assertSame(service.schema(0), 
service.activeSchema(clock.nowLong()));
-            assertNull(service.table(TABLE_NAME, clock.nowLong()));
+            assertSame(manager.schema(0), 
manager.activeSchema(clock.nowLong()));
+            assertNull(manager.table(TABLE_NAME, clock.nowLong()));
 
             clock.update(clock.now().addPhysicalTime(delayDuration));
 
-            assertSame(service.schema(1), 
service.activeSchema(clock.nowLong()));
-            assertNotNull(service.table(TABLE_NAME, clock.nowLong()));
+            assertSame(manager.schema(1), 
manager.activeSchema(clock.nowLong()));
+            assertNotNull(manager.table(TABLE_NAME, clock.nowLong()));
         } finally {
-            service.stop();
+            manager.stop();
         }
     }
 
@@ -1185,13 +1192,13 @@ public class CatalogServiceSelfTest {
     public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
         UpdateLog updateLogMock = mock(UpdateLog.class);
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, 
clockWaiter);
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock, 
clockWaiter);
 
-        service.start();
+        manager.start();
 
         verify(updateLogMock).start();
 
-        service.stop();
+        manager.stop();
 
         verify(updateLogMock).stop();
     }
@@ -1216,13 +1223,13 @@ public class CatalogServiceSelfTest {
         EventListener<CatalogEventParameters> eventListener = 
mock(EventListener.class);
         when(eventListener.notify(any(), 
any())).thenReturn(completedFuture(false));
 
-        service.listen(CatalogEvent.TABLE_CREATE, eventListener);
-        service.listen(CatalogEvent.TABLE_DROP, eventListener);
+        manager.listen(CatalogEvent.TABLE_CREATE, eventListener);
+        manager.listen(CatalogEvent.TABLE_DROP, eventListener);
 
-        assertThat(service.createTable(createTableParams), 
willBe(nullValue()));
+        assertThat(manager.createTable(createTableParams), 
willBe(nullValue()));
         verify(eventListener).notify(any(CreateTableEventParameters.class), 
isNull());
 
-        assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
+        assertThat(manager.dropTable(dropTableparams), willBe(nullValue()));
         verify(eventListener).notify(any(DropTableEventParameters.class), 
isNull());
 
         verifyNoMoreInteractions(eventListener);
@@ -1257,36 +1264,36 @@ public class CatalogServiceSelfTest {
         EventListener<CatalogEventParameters> eventListener = 
mock(EventListener.class);
         when(eventListener.notify(any(), 
any())).thenReturn(completedFuture(false));
 
-        service.listen(CatalogEvent.INDEX_CREATE, eventListener);
-        service.listen(CatalogEvent.INDEX_DROP, eventListener);
+        manager.listen(CatalogEvent.INDEX_CREATE, eventListener);
+        manager.listen(CatalogEvent.INDEX_DROP, eventListener);
 
         // Try to create index without table.
-        assertThat(service.createIndex(createIndexParams), 
willThrow(TableNotFoundException.class));
+        assertThat(manager.createIndex(createIndexParams), 
willThrow(TableNotFoundException.class));
         verifyNoInteractions(eventListener);
 
         // Create table with PK index.
-        assertThat(service.createTable(createTableParams), 
willCompleteSuccessfully());
+        assertThat(manager.createTable(createTableParams), 
willCompleteSuccessfully());
         verify(eventListener).notify(any(CreateIndexEventParameters.class), 
isNull());
 
         clearInvocations(eventListener);
 
         // Create index.
-        assertThat(service.createIndex(createIndexParams), 
willCompleteSuccessfully());
+        assertThat(manager.createIndex(createIndexParams), 
willCompleteSuccessfully());
         verify(eventListener).notify(any(CreateIndexEventParameters.class), 
isNull());
 
         clearInvocations(eventListener);
 
         // Drop index.
-        assertThat(service.dropIndex(dropIndexParams), willBe(nullValue()));
+        assertThat(manager.dropIndex(dropIndexParams), willBe(nullValue()));
         verify(eventListener).notify(any(DropIndexEventParameters.class), 
isNull());
 
         clearInvocations(eventListener);
 
         // Drop table with pk index.
-        assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
+        assertThat(manager.dropTable(dropTableparams), willBe(nullValue()));
 
         // Try drop index once again.
-        assertThat(service.dropIndex(dropIndexParams), 
willThrow(IndexNotFoundException.class));
+        assertThat(manager.dropIndex(dropIndexParams), 
willThrow(IndexNotFoundException.class));
 
         verify(eventListener).notify(any(DropIndexEventParameters.class), 
isNull());
     }
@@ -1303,19 +1310,19 @@ public class CatalogServiceSelfTest {
                 .filter("expression")
                 .build();
 
-        assertThat(service.createDistributionZone(params), 
willCompleteSuccessfully());
+        assertThat(manager.createDistributionZone(params), 
willCompleteSuccessfully());
 
         // Validate catalog version from the past.
-        assertNull(service.zone(zoneName, 0));
-        assertNull(service.zone(2, 0));
-        assertNull(service.zone(zoneName, 123L));
-        assertNull(service.zone(2, 123L));
+        assertNull(manager.zone(zoneName, 0));
+        assertNull(manager.zone(2, 0));
+        assertNull(manager.zone(zoneName, 123L));
+        assertNull(manager.zone(2, 123L));
 
         // Validate actual catalog
-        CatalogZoneDescriptor zone = service.zone(zoneName, clock.nowLong());
+        CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
 
         assertNotNull(zone);
-        assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+        assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
 
         // Validate newly created zone
         assertEquals(zoneName, zone.name());
@@ -1335,7 +1342,7 @@ public class CatalogServiceSelfTest {
                 .zoneName(zoneName)
                 .build();
 
-        assertThat(service.createDistributionZone(createZoneParams), 
willCompleteSuccessfully());
+        assertThat(manager.createDistributionZone(createZoneParams), 
willCompleteSuccessfully());
 
         long beforeDropTimestamp = clock.nowLong();
 
@@ -1343,24 +1350,24 @@ public class CatalogServiceSelfTest {
                 .zoneName(zoneName)
                 .build();
 
-        CompletableFuture<Void> fut = service.dropDistributionZone(params);
+        CompletableFuture<Void> fut = manager.dropDistributionZone(params);
 
         assertThat(fut, willCompleteSuccessfully());
 
         // Validate catalog version from the past.
-        CatalogZoneDescriptor zone = service.zone(zoneName, 
beforeDropTimestamp);
+        CatalogZoneDescriptor zone = manager.zone(zoneName, 
beforeDropTimestamp);
 
         assertNotNull(zone);
         assertEquals(zoneName, zone.name());
 
-        assertSame(zone, service.zone(zone.id(), beforeDropTimestamp));
+        assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp));
 
         // Validate actual catalog
-        assertNull(service.zone(zoneName, clock.nowLong()));
-        assertNull(service.zone(zone.id(), clock.nowLong()));
+        assertNull(manager.zone(zoneName, clock.nowLong()));
+        assertNull(manager.zone(zone.id(), clock.nowLong()));
 
         // Try to drop non-existing zone.
-        assertThat(service.dropDistributionZone(params), 
willThrow(DistributionZoneNotFoundException.class));
+        assertThat(manager.dropDistributionZone(params), 
willThrow(DistributionZoneNotFoundException.class));
     }
 
     @Test
@@ -1373,7 +1380,7 @@ public class CatalogServiceSelfTest {
                 .replicas(15)
                 .build();
 
-        assertThat(service.createDistributionZone(createParams), 
willCompleteSuccessfully());
+        assertThat(manager.createDistributionZone(createParams), 
willCompleteSuccessfully());
 
         long beforeDropTimestamp = clock.nowLong();
 
@@ -1386,29 +1393,29 @@ public class CatalogServiceSelfTest {
                 .newZoneName(newZoneName)
                 .build();
 
-        assertThat(service.renameDistributionZone(renameZoneParams), 
willCompleteSuccessfully());
+        assertThat(manager.renameDistributionZone(renameZoneParams), 
willCompleteSuccessfully());
 
         // Validate catalog version from the past.
-        CatalogZoneDescriptor zone = service.zone(zoneName, 
beforeDropTimestamp);
+        CatalogZoneDescriptor zone = manager.zone(zoneName, 
beforeDropTimestamp);
 
         assertNotNull(zone);
         assertEquals(zoneName, zone.name());
 
-        assertSame(zone, service.zone(zone.id(), beforeDropTimestamp));
+        assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp));
 
         // Validate actual catalog
-        zone = service.zone(newZoneName, clock.nowLong());
+        zone = manager.zone(newZoneName, clock.nowLong());
 
         assertNotNull(zone);
-        assertNull(service.zone(zoneName, clock.nowLong()));
+        assertNull(manager.zone(zoneName, clock.nowLong()));
         assertEquals(newZoneName, zone.name());
 
-        assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+        assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
     }
 
     @Test
     public void testDefaultZone() {
-        CatalogZoneDescriptor defaultZone = service.zone(DEFAULT_ZONE_NAME, 
clock.nowLong());
+        CatalogZoneDescriptor defaultZone = manager.zone(DEFAULT_ZONE_NAME, 
clock.nowLong());
 
         // Try to create zone with default zone name.
         CreateZoneParams createParams = CreateZoneParams.builder()
@@ -1416,10 +1423,10 @@ public class CatalogServiceSelfTest {
                 .partitions(42)
                 .replicas(15)
                 .build();
-        assertThat(service.createDistributionZone(createParams), 
willThrow(IgniteInternalException.class));
+        assertThat(manager.createDistributionZone(createParams), 
willThrow(IgniteInternalException.class));
 
         // Validate default zone wasn't changed.
-        assertSame(defaultZone, service.zone(DEFAULT_ZONE_NAME, 
clock.nowLong()));
+        assertSame(defaultZone, manager.zone(DEFAULT_ZONE_NAME, 
clock.nowLong()));
 
         // Try to rename default zone.
         String newDefaultZoneName = "RenamedDefaultZone";
@@ -1428,20 +1435,20 @@ public class CatalogServiceSelfTest {
                 .zoneName(DEFAULT_ZONE_NAME)
                 .newZoneName(newDefaultZoneName)
                 .build();
-        assertThat(service.renameDistributionZone(renameZoneParams), 
willThrow(IgniteInternalException.class));
+        assertThat(manager.renameDistributionZone(renameZoneParams), 
willThrow(IgniteInternalException.class));
 
         // Validate default zone wasn't changed.
-        assertNull(service.zone(newDefaultZoneName, clock.nowLong()));
-        assertSame(defaultZone, service.zone(DEFAULT_ZONE_NAME, 
clock.nowLong()));
+        assertNull(manager.zone(newDefaultZoneName, clock.nowLong()));
+        assertSame(defaultZone, manager.zone(DEFAULT_ZONE_NAME, 
clock.nowLong()));
 
         // Try to drop default zone.
         DropZoneParams dropZoneParams = DropZoneParams.builder()
                 .zoneName(DEFAULT_ZONE_NAME)
                 .build();
-        assertThat(service.dropDistributionZone(dropZoneParams), 
willThrow(IgniteInternalException.class));
+        assertThat(manager.dropDistributionZone(dropZoneParams), 
willThrow(IgniteInternalException.class));
 
         // Validate default zone wasn't changed.
-        assertSame(defaultZone, service.zone(DEFAULT_ZONE_NAME, 
clock.nowLong()));
+        assertSame(defaultZone, manager.zone(DEFAULT_ZONE_NAME, 
clock.nowLong()));
     }
 
     @Test
@@ -1465,13 +1472,13 @@ public class CatalogServiceSelfTest {
                 .filter("newExpression")
                 .build();
 
-        assertThat(service.createDistributionZone(createParams), 
willCompleteSuccessfully());
-        assertThat(service.alterDistributionZone(alterZoneParams), 
willCompleteSuccessfully());
+        assertThat(manager.createDistributionZone(createParams), 
willCompleteSuccessfully());
+        assertThat(manager.alterDistributionZone(alterZoneParams), 
willCompleteSuccessfully());
 
         // Validate actual catalog
-        CatalogZoneDescriptor zone = service.zone(zoneName, clock.nowLong());
+        CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
         assertNotNull(zone);
-        assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+        assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
 
         assertEquals(zoneName, zone.name());
         assertEquals(10, zone.partitions());
@@ -1492,7 +1499,7 @@ public class CatalogServiceSelfTest {
                 .replicas(15)
                 .build();
 
-        assertThat(service.createDistributionZone(params), 
willCompleteSuccessfully());
+        assertThat(manager.createDistributionZone(params), 
willCompleteSuccessfully());
 
         // Try to create zone with same name.
         params = CreateZoneParams.builder()
@@ -1501,14 +1508,14 @@ public class CatalogServiceSelfTest {
                 .replicas(1)
                 .build();
 
-        assertThat(service.createDistributionZone(params), 
willThrowFast(DistributionZoneAlreadyExistsException.class));
+        assertThat(manager.createDistributionZone(params), 
willThrowFast(DistributionZoneAlreadyExistsException.class));
 
         // Validate zone was NOT changed
-        CatalogZoneDescriptor zone = service.zone(zoneName, clock.nowLong());
+        CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
 
         assertNotNull(zone);
-        assertSame(zone, service.zone(zoneName, clock.nowLong()));
-        assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+        assertSame(zone, manager.zone(zoneName, clock.nowLong()));
+        assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
 
         assertEquals(zoneName, zone.name());
         assertEquals(42, zone.partitions());
@@ -1530,16 +1537,16 @@ public class CatalogServiceSelfTest {
         EventListener<CatalogEventParameters> eventListener = 
mock(EventListener.class);
         when(eventListener.notify(any(), 
any())).thenReturn(completedFuture(false));
 
-        service.listen(CatalogEvent.ZONE_CREATE, eventListener);
-        service.listen(CatalogEvent.ZONE_DROP, eventListener);
+        manager.listen(CatalogEvent.ZONE_CREATE, eventListener);
+        manager.listen(CatalogEvent.ZONE_DROP, eventListener);
 
-        CompletableFuture<Void> fut = 
service.createDistributionZone(createZoneParams);
+        CompletableFuture<Void> fut = 
manager.createDistributionZone(createZoneParams);
 
         assertThat(fut, willCompleteSuccessfully());
 
         verify(eventListener).notify(any(CreateZoneEventParameters.class), 
isNull());
 
-        fut = service.dropDistributionZone(dropZoneParams);
+        fut = manager.dropDistributionZone(dropZoneParams);
 
         assertThat(fut, willCompleteSuccessfully());
 
@@ -1570,25 +1577,25 @@ public class CatalogServiceSelfTest {
         EventListener<CatalogEventParameters> eventListener = 
mock(EventListener.class);
         when(eventListener.notify(any(), 
any())).thenReturn(completedFuture(false));
 
-        service.listen(CatalogEvent.TABLE_ALTER, eventListener);
+        manager.listen(CatalogEvent.TABLE_ALTER, eventListener);
 
         // Try to add column without table.
-        assertThat(service.addColumn(addColumnParams), 
willThrow(TableNotFoundException.class));
+        assertThat(manager.addColumn(addColumnParams), 
willThrow(TableNotFoundException.class));
         verifyNoInteractions(eventListener);
 
         // Create table.
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         // Add column.
-        assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
+        assertThat(manager.addColumn(addColumnParams), willBe(nullValue()));
         verify(eventListener).notify(any(AddColumnEventParameters.class), 
isNull());
 
         // Drop column.
-        assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
+        assertThat(manager.dropColumn(dropColumnParams), willBe(nullValue()));
         verify(eventListener).notify(any(DropColumnEventParameters.class), 
isNull());
 
         // Try drop column once again.
-        assertThat(service.dropColumn(dropColumnParams), 
willThrow(ColumnNotFoundException.class));
+        assertThat(manager.dropColumn(dropColumnParams), 
willThrow(ColumnNotFoundException.class));
 
         verifyNoMoreInteractions(eventListener);
     }
@@ -1599,9 +1606,9 @@ public class CatalogServiceSelfTest {
 
         HybridTimestamp startTs = clock.now();
 
-        CatalogServiceImpl service = new CatalogServiceImpl(updateLog, 
clockWaiter, delayDuration);
+        CatalogManagerImpl manager = new CatalogManagerImpl(updateLog, 
clockWaiter, delayDuration);
 
-        service.start();
+        manager.start();
 
         try {
             CreateTableParams params = CreateTableParams.builder()
@@ -1614,7 +1621,7 @@ public class CatalogServiceSelfTest {
                     .primaryKeyColumns(List.of("key"))
                     .build();
 
-            CompletableFuture<Void> future = service.createTable(params);
+            CompletableFuture<Void> future = manager.createTable(params);
 
             assertThat(future.isDone(), is(false));
 
@@ -1627,7 +1634,7 @@ public class CatalogServiceSelfTest {
                     greaterThanOrEqualTo(delayDuration + 
HybridTimestamp.maxClockSkew())
             );
         } finally {
-            service.stop();
+            manager.stop();
         }
     }
 
@@ -1635,9 +1642,9 @@ public class CatalogServiceSelfTest {
     void testGetCatalogEntityInCatalogEvent() {
         CompletableFuture<Void> result = new CompletableFuture<>();
 
-        service.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
+        manager.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
             try {
-                assertNotNull(service.schema(parameters.catalogVersion()));
+                assertNotNull(manager.schema(parameters.catalogVersion()));
 
                 result.complete(null);
 
@@ -1649,26 +1656,26 @@ public class CatalogServiceSelfTest {
             }
         });
 
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
         assertThat(result, willCompleteSuccessfully());
     }
 
     @Test
     void testGetTableByIdAndCatalogVersion() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
-        CatalogTableDescriptor table = service.table(TABLE_NAME, 
clock.nowLong());
+        CatalogTableDescriptor table = manager.table(TABLE_NAME, 
clock.nowLong());
 
-        assertNull(service.table(table.id(), 0));
-        assertSame(table, service.table(table.id(), 1));
+        assertNull(manager.table(table.id(), 0));
+        assertSame(table, manager.table(table.id(), 1));
     }
 
     @Test
     void testGetTableIdOnDropIndexEvent() {
-        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
 
         assertThat(
-                service.createIndex(
+                manager.createIndex(
                         CreateHashIndexParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .tableName(TABLE_NAME)
@@ -1679,9 +1686,9 @@ public class CatalogServiceSelfTest {
                 willBe(nullValue())
         );
 
-        int tableId = service.table(TABLE_NAME, clock.nowLong()).id();
-        int pkIndexId = service.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()).id();
-        int indexId = service.index(INDEX_NAME, clock.nowLong()).id();
+        int tableId = manager.table(TABLE_NAME, clock.nowLong()).id();
+        int pkIndexId = manager.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()).id();
+        int indexId = manager.index(INDEX_NAME, clock.nowLong()).id();
 
         assertNotEquals(tableId, indexId);
 
@@ -1691,11 +1698,11 @@ public class CatalogServiceSelfTest {
 
         
doReturn(completedFuture(false)).when(eventListener).notify(captor.capture(), 
any());
 
-        service.listen(CatalogEvent.INDEX_DROP, eventListener);
+        manager.listen(CatalogEvent.INDEX_DROP, eventListener);
 
         // Let's remove the index.
         assertThat(
-                
service.dropIndex(DropIndexParams.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
+                
manager.dropIndex(DropIndexParams.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
                 willBe(nullValue())
         );
 
@@ -1706,7 +1713,7 @@ public class CatalogServiceSelfTest {
 
         // Let's delete the table.
         assertThat(
-                
service.dropTable(DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build()),
+                
manager.dropTable(DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build()),
                 willBe(nullValue())
         );
 
@@ -1721,7 +1728,7 @@ public class CatalogServiceSelfTest {
     void testCreateTableErrors() {
         // Table must have at least one column.
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .zone(ZONE_NAME)
@@ -1735,7 +1742,7 @@ public class CatalogServiceSelfTest {
 
         // Table must have PK columns.
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .zone(ZONE_NAME)
@@ -1751,7 +1758,7 @@ public class CatalogServiceSelfTest {
 
         // PK column must be a valid column
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .zone(ZONE_NAME)
@@ -1767,7 +1774,7 @@ public class CatalogServiceSelfTest {
 
         // Column names must be unique.
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .tableName(TABLE_NAME)
@@ -1784,7 +1791,7 @@ public class CatalogServiceSelfTest {
 
         // PK column names must be unique.
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .tableName(TABLE_NAME)
@@ -1800,7 +1807,7 @@ public class CatalogServiceSelfTest {
 
         // Colocated columns names must be unique.
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .tableName(TABLE_NAME)
@@ -1818,7 +1825,7 @@ public class CatalogServiceSelfTest {
 
         // Colocated columns must be valid primary key columns.
         assertThat(
-                service.createTable(
+                manager.createTable(
                         CreateTableParams.builder()
                                 .schemaName(SCHEMA_NAME)
                                 .tableName(TABLE_NAME)
@@ -1834,6 +1841,37 @@ public class CatalogServiceSelfTest {
                 willThrowFast(IgniteInternalException.class, "Colocation 
columns must be subset of primary key: outstandingColumns=[val]"));
     }
 
+    /** Checks that {@code latestCatalogVersion()} grows by one after each successful catalog command. */
+    @Test
+    void testLatestCatalogVersion() {
+        // The manager under test is expected to start at catalog version 0.
+        assertEquals(0, manager.latestCatalogVersion());
+
+        CompletableFuture<Void> tableCreation = manager.createTable(simpleTable(TABLE_NAME));
+
+        assertThat(tableCreation, willBe(nullValue()));
+        assertEquals(1, manager.latestCatalogVersion());
+
+        // Creating an index on that table is expected to produce one more version.
+        assertThat(manager.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), willBe(nullValue()));
+        assertEquals(2, manager.latestCatalogVersion());
+    }
+
+    /** Checks that {@code tables(catalogVersion)} exposes the tables present in each catalog version. */
+    @Test
+    void testTables() {
+        String firstTableName = TABLE_NAME + 0;
+        String secondTableName = TABLE_NAME + 1;
+
+        assertThat(manager.createTable(simpleTable(firstTableName)), willBe(nullValue()));
+        assertThat(manager.createTable(simpleTable(secondTableName)), willBe(nullValue()));
+
+        // Version 0 has no tables; each createTable above is expected to add one version.
+        assertThat(manager.tables(0), empty());
+        assertThat(manager.tables(1), hasItems(table(1, firstTableName)));
+        assertThat(manager.tables(2), hasItems(table(2, firstTableName), table(2, secondTableName)));
+    }
+
+    /** Checks that {@code indexes(catalogVersion)} exposes the indexes present in each catalog version. */
+    @Test
+    void testIndexes() {
+        String pkIndexName = createPkIndexName(TABLE_NAME);
+
+        assertThat(manager.createTable(simpleTable(TABLE_NAME)), willBe(nullValue()));
+        assertThat(manager.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)), willBe(nullValue()));
+
+        // Version 1 is expected to contain the PK index of the table, version 2 also the explicitly created index.
+        assertThat(manager.indexes(0), empty());
+        assertThat(manager.indexes(1), hasItems(index(1, pkIndexName)));
+        assertThat(manager.indexes(2), hasItems(index(2, pkIndexName), index(2, INDEX_NAME)));
+    }
+
     private CompletableFuture<Void> changeColumn(
             String tab,
             String col,
@@ -1866,7 +1904,7 @@ public class CatalogServiceSelfTest {
             }
         }
 
-        return service.alterColumn(builder.build());
+        return manager.alterColumn(builder.build());
     }
 
     private static CreateTableParams simpleTable(String name) {
@@ -1923,4 +1961,12 @@ public class CatalogServiceSelfTest {
     private static String createPkIndexName(String tableName) {
         return tableName + "_PK";
     }
+
+    /**
+     * Looks up a table by name in the schema that {@code manager} returns for the given catalog version.
+     *
+     * <p>NOTE(review): the result of {@code manager.schema(catalogVersion)} is dereferenced without a null check;
+     * presumably callers only pass versions that already exist - confirm.
+     *
+     * @param catalogVersion Catalog version whose schema is queried.
+     * @param tableName Table name passed to the schema lookup.
+     * @return Descriptor returned by the schema lookup; may be {@code null}.
+     */
+    private @Nullable CatalogTableDescriptor table(int catalogVersion, String 
tableName) {
+        return manager.schema(catalogVersion).table(tableName);
+    }
+    }
+
+    /**
+     * Looks up an index by name in the schema that {@code manager} returns for the given catalog version.
+     *
+     * <p>NOTE(review): the result of {@code manager.schema(catalogVersion)} is dereferenced without a null check;
+     * presumably callers only pass versions that already exist - confirm.
+     *
+     * @param catalogVersion Catalog version whose schema is queried.
+     * @param indexName Index name passed to the schema lookup.
+     * @return Descriptor returned by the schema lookup; may be {@code null}.
+     */
+    private @Nullable CatalogIndexDescriptor index(int catalogVersion, String 
indexName) {
+        return manager.schema(catalogVersion).index(indexName);
+    }
+    }
 }
diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 7fbfb55391..275bae9979 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -23,18 +23,21 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -47,6 +50,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 /** Tests to verify {@link UpdateLogImpl}. */
 @SuppressWarnings("ConstantConditions")
 class UpdateLogImplTest {
+    private KeyValueStorage keyValueStorage;
 
     private MetaStorageManager metastore;
 
@@ -56,7 +60,9 @@ class UpdateLogImplTest {
     void setUp() {
         vault = new VaultManager(new InMemoryVaultService());
 
-        metastore = StandaloneMetaStorageManager.create(vault, new 
SimpleInMemoryKeyValueStorage("test"));
+        keyValueStorage = new SimpleInMemoryKeyValueStorage("test");
+
+        metastore = StandaloneMetaStorageManager.create(vault, 
keyValueStorage);
 
         vault.start();
         metastore.start();
@@ -64,61 +70,69 @@ class UpdateLogImplTest {
 
     @AfterEach
     public void tearDown() throws Exception {
-        metastore.stop();
-        vault.stop();
+        IgniteUtils.closeAll(
+                metastore == null ? null : metastore::stop,
+                vault == null ? null : vault::stop
+        );
     }
 
     @Test
-    public void logReplayedOnStart() throws Exception {
-        // first, let's append a few entries to the log
-        UpdateLogImpl updateLog = createUpdateLogImpl();
+    void logReplayedOnStart() throws Exception {
+        // First, let's append a few entries to the update log.
+        UpdateLogImpl updateLogImpl = createAndStartUpdateLogImpl((update, ts, causalityToken) -> {/* no-op */});
 
-        long revisionBefore = metastore.appliedRevision();
+        assertThat(metastore.deployWatches(), willCompleteSuccessfully());
 
-        updateLog.registerUpdateHandler((update, ts, causalityToken) -> {/* no-op */});
-        updateLog.start();
+        List<VersionedUpdate> expectedUpdates = List.of(singleEntryUpdateOfVersion(1), singleEntryUpdateOfVersion(2));
 
-        assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully());
+        appendUpdates(updateLogImpl, expectedUpdates);
 
-        List<VersionedUpdate> expectedVersions = List.of(
-                new VersionedUpdate(1, 1L, List.of(new TestUpdateEntry("foo"))),
-                new VersionedUpdate(2, 2L, List.of(new TestUpdateEntry("bar")))
-        );
+        // Let's restart the log and metastore with recovery.
+        updateLogImpl.stop();
 
-        for (VersionedUpdate update : expectedVersions) {
-            assertThat(updateLog.append(update), willBe(true));
-        }
+        restartMetastore();
 
-        // and wait till metastore apply necessary revision
-        assertTrue(
-                waitForCondition(
-                        () -> metastore.appliedRevision() - expectedVersions.size() == revisionBefore,
-                        TimeUnit.SECONDS.toMillis(5)
-                )
-        );
+        var actualUpdates = new ArrayList<VersionedUpdate>();
+
+        createAndStartUpdateLogImpl((update, ts, causalityToken) -> actualUpdates.add(update));
+
+        // Let's check that we have recovered to the latest version.
+        assertThat(actualUpdates, equalTo(expectedUpdates));
+    }
 
-        updateLog.stop();
+    private UpdateLogImpl createUpdateLogImpl() {
+        return new UpdateLogImpl(metastore);
+    }
 
-        // now let's create new component over a stuffed vault/metastore
-        // and check if log is replayed on start
-        updateLog = createUpdateLogImpl();
+    private UpdateLogImpl createAndStartUpdateLogImpl(OnUpdateHandler onUpdateHandler) {
+        UpdateLogImpl updateLogImpl = createUpdateLogImpl();
 
-        List<VersionedUpdate> actualVersions = new ArrayList<>();
-        List<Long> actualCausalityTokens = new ArrayList<>();
+        updateLogImpl.registerUpdateHandler(onUpdateHandler);
+        updateLogImpl.start();
 
-        updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
-            actualVersions.add(update);
-            actualCausalityTokens.add(causalityToken);
-        });
+        return updateLogImpl;
+    }
 
-        updateLog.start();
+    private void appendUpdates(UpdateLogImpl updateLogImpl, Collection<VersionedUpdate> updates) throws Exception {
+        long revisionBeforeAppend = metastore.appliedRevision();
+
+        updates.forEach(update -> assertThat(updateLogImpl.append(update), willBe(true)));
 
-        assertEquals(expectedVersions, actualVersions);
-        assertEquals(List.of(revisionBefore + 1, revisionBefore + 2), actualCausalityTokens);
+        assertTrue(waitForCondition(
+                () -> metastore.appliedRevision() - updates.size() == revisionBeforeAppend,
+                TimeUnit.SECONDS.toMillis(1))
+        );
     }
 
-    private UpdateLogImpl createUpdateLogImpl() {
-        return new UpdateLogImpl(metastore);
+    private void restartMetastore() throws Exception {
+        long recoverRevision = metastore.appliedRevision();
+
+        metastore.stop();
+
+        metastore = StandaloneMetaStorageManager.create(vault, keyValueStorage);
+        metastore.start();
+
+        assertThat(metastore.recoveryFinishedFuture(), willBe(recoverRevision));
     }
 
     @Test
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 77b022c0d4..1b5979fa23 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -71,7 +71,7 @@ import org.apache.ignite.client.handler.configuration.ClientConnectorConfigurati
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.ClockWaiter;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -785,7 +785,7 @@ public class ItRebalanceDistributedTest {
 
             clockWaiter = new ClockWaiter("test", hybridClock);
 
-            catalogManager = new CatalogServiceImpl(
+            catalogManager = new CatalogManagerImpl(
                     new UpdateLogImpl(metaStorageManager),
                     clockWaiter
             );
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7a9b323950..2f542b5a61 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -58,7 +58,7 @@ import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.BaseIgniteRestartTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.baseline.BaselineManager;
-import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.ClockWaiter;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -351,7 +351,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
 
         var clockWaiter = new ClockWaiter("test", hybridClock);
 
-        var catalogManager = new CatalogServiceImpl(
+        var catalogManager = new CatalogManagerImpl(
                 new UpdateLogImpl(metaStorageMgr),
                 clockWaiter
         );
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d89c111b1a..4ba7dfbcb5 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -41,7 +41,7 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.configuration.ConfigurationModule;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.ClockWaiter;
 import org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -514,7 +514,7 @@ public class IgniteImpl implements Ignite {
                 SchemaSynchronizationConfiguration.KEY
         );
 
-        catalogManager = new CatalogServiceImpl(
+        catalogManager = new CatalogManagerImpl(
                 new UpdateLogImpl(metaStorageMgr),
                 clockWaiter,
                 () -> schemaSyncConfig.delayDuration().value()

Reply via email to