This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a27ba28d53 IGNITE-19447 Switch schema validation to CatalogService 
(#2787)
a27ba28d53 is described below

commit a27ba28d5366cf69cb2948c7a4e267a2cb7c63be
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Nov 6 17:22:46 2023 +0400

    IGNITE-19447 Switch schema validation to CatalogService (#2787)
---
 .../internal/catalog/CatalogManagerSelfTest.java   |   6 +-
 .../ItTxDistributedCleanupRecoveryTest.java        |   6 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   6 +-
 .../internal/table/distributed/TableManager.java   |   4 +-
 .../replicator/PartitionReplicaListener.java       |   8 +-
 ...ator.java => SchemaCompatibilityValidator.java} |  41 ++--
 .../replicator/TableDefinitionDiffKey.java         |  61 ++++++
 .../schema/CatalogValidationSchemasSource.java     | 217 +++++++++++++++++++++
 .../table/distributed/schema/FullTableSchema.java  |  35 +---
 .../distributed/schema/TableDefinitionDiff.java    |  30 +--
 .../{Schemas.java => ValidationSchemasSource.java} |  25 ++-
 .../table/distributed/schema/package-info.java     |  22 ---
 .../PartitionReplicaListenerDurableUnlockTest.java |   4 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |   4 +-
 .../replication/PartitionReplicaListenerTest.java  |  21 +-
 .../schema/CatalogValidationSchemasSourceTest.java | 208 ++++++++++++++++++++
 .../distributed/schema/FullTableSchemaTest.java    |  36 +---
 .../apache/ignite/distributed/ItTxTestCluster.java |  10 +-
 .../table/impl/DummyInternalTableImpl.java         |   2 +-
 .../ignite/internal/table/impl/DummySchemas.java   |  84 --------
 .../table/impl/DummyValidationSchemasSource.java}  |  48 ++---
 21 files changed, 594 insertions(+), 284 deletions(-)

diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 9a170b444e..ed41ce3795 100644
--- 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++ 
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1586,13 +1586,17 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest {
     public void addColumnIncrementsTableVersion() {
         createSomeTable(TABLE_NAME);
 
-        assertThat(manager.execute(addColumnParams(TABLE_NAME, 
columnParams("val2", INT32))), willCompleteSuccessfully());
+        addSomeColumn();
 
         CatalogTableDescriptor table = manager.table(TABLE_NAME, 
Long.MAX_VALUE);
 
         assertThat(table.tableVersion(), is(2));
     }
 
+    private void addSomeColumn() {
+        assertThat(manager.execute(addColumnParams(TABLE_NAME, 
columnParams("val2", INT32))), willCompleteSuccessfully());
+    }
+
     @Test
     public void dropColumnIncrementsTableVersion() {
         createSomeTable(TABLE_NAME);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index 1f4af67d6f..3a06284c77 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -41,7 +41,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -106,7 +106,7 @@ public class ItTxDistributedCleanupRecoveryTest extends 
ItTxDistributedTestSingl
                     TxStateStorage txStateStorage,
                     TransactionStateResolver transactionStateResolver,
                     StorageUpdateHandler storageUpdateHandler,
-                    Schemas schemas,
+                    ValidationSchemasSource validationSchemasSource,
                     ClusterNode localNode,
                     SchemaSyncService schemaSyncService,
                     CatalogService catalogService,
@@ -128,7 +128,7 @@ public class ItTxDistributedCleanupRecoveryTest extends 
ItTxDistributedTestSingl
                         txStateStorage,
                         transactionStateResolver,
                         storageUpdateHandler,
-                        schemas,
+                        validationSchemasSource,
                         localNode,
                         schemaSyncService,
                         catalogService,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index bffc3a32dd..cac9e1c778 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -46,7 +46,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -136,7 +136,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
                     TxStateStorage txStateStorage,
                     TransactionStateResolver transactionStateResolver,
                     StorageUpdateHandler storageUpdateHandler,
-                    Schemas schemas,
+                    ValidationSchemasSource validationSchemasSource,
                     ClusterNode localNode,
                     SchemaSyncService schemaSyncService,
                     CatalogService catalogService,
@@ -158,7 +158,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends ItTxDistribut
                         txStateStorage,
                         transactionStateResolver,
                         storageUpdateHandler,
-                        schemas,
+                        validationSchemasSource,
                         localNode,
                         schemaSyncService,
                         catalogService,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fe1f5e0802..6d2e40f546 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -147,7 +147,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotAwarePartitionDataStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
-import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import 
org.apache.ignite.internal.table.distributed.schema.CatalogValidationSchemasSource;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
@@ -893,7 +893,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 txStatePartitionStorage,
                 transactionStateResolver,
                 partitionUpdateHandlers.storageUpdateHandler,
-                new NonHistoricSchemas(schemaManager, schemaSyncService),
+                new CatalogValidationSchemasSource(catalogService, 
schemaManager),
                 localNode(),
                 schemaSyncService,
                 catalogService,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1ca4fd8b1e..2d15d14a35 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -140,7 +140,7 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockKey;
 import org.apache.ignite.internal.tx.LockManager;
@@ -238,7 +238,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     /** Cleanup futures. */
     private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap 
= new ConcurrentHashMap<>();
 
-    private final SchemaCompatValidator schemaCompatValidator;
+    private final SchemaCompatibilityValidator schemaCompatValidator;
 
     /** Instance of the local node. */
     private final ClusterNode localNode;
@@ -300,7 +300,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             TxStateStorage txStateStorage,
             TransactionStateResolver transactionStateResolver,
             StorageUpdateHandler storageUpdateHandler,
-            Schemas schemas,
+            ValidationSchemasSource validationSchemasSource,
             ClusterNode localNode,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
@@ -328,7 +328,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         cursors = new 
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
 
-        schemaCompatValidator = new SchemaCompatValidator(schemas, 
catalogService);
+        schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
 
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryElected);
         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryExpired);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java
similarity index 85%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java
index 8aef07a5e6..724cf28f1a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java
@@ -24,26 +24,38 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.tx.TransactionIds;
 
 /**
  * Validates schema compatibility.
  */
-class SchemaCompatValidator {
-    private final Schemas schemas;
+class SchemaCompatibilityValidator {
+    private final ValidationSchemasSource validationSchemasSource;
     private final CatalogService catalogService;
+    private final SchemaSyncService schemaSyncService;
+
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<TableDefinitionDiffKey, TableDefinitionDiff> 
diffCache = new ConcurrentHashMap<>();
 
     /** Constructor. */
-    SchemaCompatValidator(Schemas schemas, CatalogService catalogService) {
-        this.schemas = schemas;
+    SchemaCompatibilityValidator(
+            ValidationSchemasSource validationSchemasSource,
+            CatalogService catalogService,
+            SchemaSyncService schemaSyncService
+    ) {
+        this.validationSchemasSource = validationSchemasSource;
         this.catalogService = catalogService;
+        this.schemaSyncService = schemaSyncService;
     }
 
     /**
@@ -72,7 +84,7 @@ class SchemaCompatValidator {
         // so we don't need to account for clock skew.
         assert commitTimestamp.compareTo(beginTimestamp) > 0;
 
-        return schemas.waitForSchemasAvailability(commitTimestamp)
+        return schemaSyncService.waitForMetadataCompleteness(commitTimestamp)
                 .thenApply(ignored -> validateCommit(tableIds, 
commitTimestamp, beginTimestamp));
     }
 
@@ -116,7 +128,7 @@ class SchemaCompatValidator {
             HybridTimestamp commitTimestamp,
             int tableId
     ) {
-        List<FullTableSchema> tableSchemas = 
schemas.tableSchemaVersionsBetween(tableId, beginTimestamp, commitTimestamp);
+        List<FullTableSchema> tableSchemas = 
validationSchemasSource.tableSchemaVersionsBetween(tableId, beginTimestamp, 
commitTimestamp);
 
         assert !tableSchemas.isEmpty();
 
@@ -132,7 +144,10 @@ class SchemaCompatValidator {
     }
 
     private boolean isForwardCompatible(FullTableSchema prevSchema, 
FullTableSchema nextSchema) {
-        TableDefinitionDiff diff = nextSchema.diffFrom(prevSchema);
+        TableDefinitionDiff diff = diffCache.computeIfAbsent(
+                new TableDefinitionDiffKey(prevSchema.tableId(), 
prevSchema.schemaVersion(), nextSchema.schemaVersion()),
+                key -> nextSchema.diffFrom(prevSchema)
+        );
 
         // TODO: IGNITE-19229 - more sophisticated logic.
         return diff.isEmpty();
@@ -156,8 +171,8 @@ class SchemaCompatValidator {
     CompletableFuture<CompatValidationResult> validateBackwards(int 
tupleSchemaVersion, int tableId, UUID txId) {
         HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
 
-        return schemas.waitForSchemasAvailability(beginTimestamp)
-                .thenCompose(ignored -> 
schemas.waitForSchemaAvailability(tableId, tupleSchemaVersion))
+        return schemaSyncService.waitForMetadataCompleteness(beginTimestamp)
+                .thenCompose(ignored -> 
validationSchemasSource.waitForSchemaAvailability(tableId, tupleSchemaVersion))
                 .thenApply(ignored -> 
validateBackwardSchemaCompatibility(tupleSchemaVersion, tableId, 
beginTimestamp));
     }
 
@@ -166,7 +181,11 @@ class SchemaCompatValidator {
             int tableId,
             HybridTimestamp beginTimestamp
     ) {
-        List<FullTableSchema> tableSchemas = 
schemas.tableSchemaVersionsBetween(tableId, beginTimestamp, tupleSchemaVersion);
+        List<FullTableSchema> tableSchemas = 
validationSchemasSource.tableSchemaVersionsBetween(
+                tableId,
+                beginTimestamp,
+                tupleSchemaVersion
+        );
 
         if (tableSchemas.isEmpty()) {
             // The tuple was not written with a future schema.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TableDefinitionDiffKey.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TableDefinitionDiffKey.java
new file mode 100644
index 0000000000..9d0daded3d
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TableDefinitionDiffKey.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+/**
+ * Key for table definitions diff.
+ */
+class TableDefinitionDiffKey {
+    private final int tableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    TableDefinitionDiffKey(int tableId, int fromSchemaVersion, int 
toSchemaVersion) {
+        this.tableId = tableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TableDefinitionDiffKey that = (TableDefinitionDiffKey) o;
+
+        if (tableId != that.tableId) {
+            return false;
+        }
+        if (fromSchemaVersion != that.fromSchemaVersion) {
+            return false;
+        }
+        return toSchemaVersion == that.toSchemaVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = tableId;
+        result = 31 * result + fromSchemaVersion;
+        result = 31 * result + toSchemaVersion;
+        return result;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java
new file mode 100644
index 0000000000..e0ca393a60
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource 
{
+    private final CatalogService catalogService;
+
+    private final SchemaManager schemaManager;
+
+    private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>> 
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+    // TODO: Remove entries from cache when compacting Catalog 
https://issues.apache.org/jira/browse/IGNITE-20790
+    // TODO: Remove entries from cache when compacting schemas in 
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+    private final ConcurrentMap<CatalogVersionToTableVersionSpan, 
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+            = new ConcurrentHashMap<>();
+
+    /** Constructor. */
+    public CatalogValidationSchemasSource(CatalogService catalogService, 
SchemaManager schemaManager) {
+        this.catalogService = catalogService;
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return schemaManager.schemaRegistry(tableId)
+                .schemaAsync(schemaVersion)
+                .thenApply(unused -> null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        // It is safe to access the Catalog as the caller must have already 
waited till the Catalog is up-to-date with the timestamps.
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+        int toCatalogVersion = 
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+        return catalogVersionSpansCache.computeIfAbsent(
+                new CatalogVersionsSpan(tableId, fromCatalogVersion, 
toCatalogVersion),
+                key -> tableSchemaVersionsBetweenCatalogVersions(tableId, 
fromCatalogVersion, toCatalogVersion)
+        );
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toTableVersionIncluding) {
+        // It is safe to access the Catalog as the caller must have already 
waited till the Catalog is up-to-date.
+        int fromCatalogVersion = 
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+        return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+                new CatalogVersionToTableVersionSpan(tableId, 
fromCatalogVersion, toTableVersionIncluding),
+                key -> 
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion, 
toTableVersionIncluding)
+        );
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion, 
int toCatalogVersion) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
toCatalogVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    // It's ok to use Stream as the results of the methods that call this are 
cached.
+    private Stream<CatalogTableDescriptor> tableVersionsBetween(
+            int tableId,
+            int fromCatalogVersionIncluding,
+            int toCatalogVersionIncluding
+    ) {
+        return IntStream.rangeClosed(fromCatalogVersionIncluding, 
toCatalogVersionIncluding)
+                .mapToObj(catalogVersion -> catalogService.table(tableId, 
catalogVersion))
+                .filter(new Predicate<>() {
+                    int prevVersion = Integer.MIN_VALUE;
+
+                    @Override
+                    public boolean test(CatalogTableDescriptor 
tableDescriptor) {
+                        if (tableDescriptor.tableVersion() == prevVersion) {
+                            return false;
+                        }
+
+                        assert prevVersion == Integer.MIN_VALUE || 
tableDescriptor.tableVersion() == prevVersion + 1
+                                : String.format("Table version is expected to 
be prevVersion+1, but version is %d and prevVersion is %d",
+                                        tableDescriptor.tableVersion(), 
prevVersion);
+
+                        prevVersion = tableDescriptor.tableVersion();
+
+                        return true;
+                    }
+                });
+    }
+
+    private List<FullTableSchema> 
tableSchemaVersionsBetweenCatalogAndTableVersions(
+            int tableId,
+            int fromCatalogVersion,
+            int toTableVersion
+    ) {
+        return tableVersionsBetween(tableId, fromCatalogVersion, 
catalogService.latestCatalogVersion())
+                .takeWhile(tableDescriptor -> tableDescriptor.tableVersion() 
<= toTableVersion)
+                
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+                .collect(toList());
+    }
+
+    private static FullTableSchema 
fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
+        return new FullTableSchema(
+                tableDescriptor.tableVersion(),
+                tableDescriptor.id(),
+                tableDescriptor.columns()
+        );
+    }
+
+    private static class CatalogVersionsSpan {
+        private final int tableId;
+        private final int fromCatalogVersion;
+        private final int toCatalogVersion;
+
+        private CatalogVersionsSpan(int tableId, int fromCatalogVersion, int 
toCatalogVersion) {
+            this.tableId = tableId;
+            this.fromCatalogVersion = fromCatalogVersion;
+            this.toCatalogVersion = toCatalogVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            CatalogVersionsSpan that = (CatalogVersionsSpan) o;
+
+            if (tableId != that.tableId) {
+                return false;
+            }
+            if (fromCatalogVersion != that.fromCatalogVersion) {
+                return false;
+            }
+            return toCatalogVersion == that.toCatalogVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = tableId;
+            result = 31 * result + fromCatalogVersion;
+            result = 31 * result + toCatalogVersion;
+            return result;
+        }
+    }
+
+    private static class CatalogVersionToTableVersionSpan {
+        private final int tableId;
+        private final int fromCatalogVersion;
+        private final int toTableVersion;
+
+        private CatalogVersionToTableVersionSpan(int tableId, int 
fromCatalogVersion, int toTableVersion) {
+            this.tableId = tableId;
+            this.fromCatalogVersion = fromCatalogVersion;
+            this.toTableVersion = toTableVersion;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            CatalogVersionToTableVersionSpan that = 
(CatalogVersionToTableVersionSpan) o;
+
+            if (tableId != that.tableId) {
+                return false;
+            }
+            if (fromCatalogVersion != that.fromCatalogVersion) {
+                return false;
+            }
+            return toTableVersion == that.toTableVersion;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = tableId;
+            result = 31 * result + fromCatalogVersion;
+            result = 31 * result + toTableVersion;
+            return result;
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
index 597e12f926..630d3716a7 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
@@ -28,12 +28,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.function.Function;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 
 /**
- * Represents a full table schema: that is, the definition of the table and 
all objects (indexes, constraints, etc)
- * that belong to the table.
+ * Represents a full table schema: that is, the definition of the table and 
all objects (constraints, etc)
+ * that belong to the table *that might affect schema compatibility* (so, 
indices are not included as they
+ * don't affect such compatibility).
  */
 public class FullTableSchema {
     private final int schemaVersion;
@@ -41,21 +41,13 @@ public class FullTableSchema {
 
     private final List<CatalogTableColumnDescriptor> columns;
 
-    private final List<CatalogIndexDescriptor> indexes;
-
     /**
      * Constructor.
      */
-    public FullTableSchema(
-            int schemaVersion,
-            int tableId,
-            List<CatalogTableColumnDescriptor> columns,
-            List<CatalogIndexDescriptor> indexes
-    ) {
+    public FullTableSchema(int schemaVersion, int tableId, 
List<CatalogTableColumnDescriptor> columns) {
         this.schemaVersion = schemaVersion;
         this.tableId = tableId;
-        this.columns = columns;
-        this.indexes = indexes;
+        this.columns = List.copyOf(columns);
     }
 
     /**
@@ -85,15 +77,6 @@ public class FullTableSchema {
         return columns;
     }
 
-    /**
-     * Returns definitions of indexes belonging to the table.
-     *
-     * @return Definitions of indexes belonging to the table.
-     */
-    public List<CatalogIndexDescriptor> indexes() {
-        return indexes;
-    }
-
     /**
      * Computes a diff between this and a previous schema.
      *
@@ -118,13 +101,7 @@ public class FullTableSchema {
             }
         }
 
-        Map<String, CatalogIndexDescriptor> prevIndexesByName = 
toMapByName(prevSchema.indexes, CatalogIndexDescriptor::name);
-        Map<String, CatalogIndexDescriptor> thisIndexesByName = 
toMapByName(this.indexes, CatalogIndexDescriptor::name);
-
-        List<CatalogIndexDescriptor> addedIndexes = 
subtractKeyed(thisIndexesByName, prevIndexesByName);
-        List<CatalogIndexDescriptor> removedIndexes = 
subtractKeyed(prevIndexesByName, thisIndexesByName);
-
-        return new TableDefinitionDiff(addedColumns, removedColumns, 
changedColumns, addedIndexes, removedIndexes);
+        return new TableDefinitionDiff(addedColumns, removedColumns, 
changedColumns);
     }
 
     private static <T> Map<String, T> toMapByName(List<T> elements, 
Function<T, String> nameExtractor) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
index 28da611570..b4faf5ac64 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.table.distributed.schema;
 import static java.util.Collections.emptyList;
 
 import java.util.List;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 
 /**
@@ -28,16 +27,13 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
  */
 public class TableDefinitionDiff {
     private static final TableDefinitionDiff EMPTY = new TableDefinitionDiff(
-            emptyList(), emptyList(), emptyList(), emptyList(), emptyList()
+            emptyList(), emptyList(), emptyList()
     );
 
     private final List<CatalogTableColumnDescriptor> addedColumns;
     private final List<CatalogTableColumnDescriptor> removedColumns;
     private final List<ColumnDefinitionDiff> changedColumns;
 
-    private final List<CatalogIndexDescriptor> addedIndexes;
-    private final List<CatalogIndexDescriptor> removedIndexes;
-
     // TODO: IGNITE-19229 - other change types
 
     /**
@@ -55,15 +51,11 @@ public class TableDefinitionDiff {
     public TableDefinitionDiff(
             List<CatalogTableColumnDescriptor> addedColumns,
             List<CatalogTableColumnDescriptor> removedColumns,
-            List<ColumnDefinitionDiff> changedColumns,
-            List<CatalogIndexDescriptor> addedIndexes,
-            List<CatalogIndexDescriptor> removedIndexes
+            List<ColumnDefinitionDiff> changedColumns
     ) {
         this.addedColumns = List.copyOf(addedColumns);
         this.removedColumns = List.copyOf(removedColumns);
         this.changedColumns = List.copyOf(changedColumns);
-        this.addedIndexes = List.copyOf(addedIndexes);
-        this.removedIndexes = List.copyOf(removedIndexes);
     }
 
     /**
@@ -87,20 +79,6 @@ public class TableDefinitionDiff {
         return changedColumns;
     }
 
-    /**
-     * Returns indexes that were added.
-     */
-    public List<CatalogIndexDescriptor> addedIndexes() {
-        return addedIndexes;
-    }
-
-    /**
-     * Returns indexes that were removed.
-     */
-    public List<CatalogIndexDescriptor> removedIndexes() {
-        return removedIndexes;
-    }
-
     /**
      * Returns whether this diff is empty (so no difference at all).
      *
@@ -109,8 +87,6 @@ public class TableDefinitionDiff {
     public boolean isEmpty() {
         return addedColumns.isEmpty()
                 && removedColumns.isEmpty()
-                && changedColumns.isEmpty()
-                && addedIndexes.isEmpty()
-                && removedIndexes.isEmpty();
+                && changedColumns.isEmpty();
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/Schemas.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ValidationSchemasSource.java
similarity index 73%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/Schemas.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ValidationSchemasSource.java
index 54ff6d659a..6f2cc1dc14 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/Schemas.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ValidationSchemasSource.java
@@ -20,33 +20,28 @@ package org.apache.ignite.internal.table.distributed.schema;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
 
 /**
  * Provides access to table schemas.
  */
-public interface Schemas {
-    /**
-     * Obtains a future that completes when all schemas activating not later 
than the given timestamp are available.
-     *
-     * @param ts Timestamp we are interested in. This is the timestamp 
transaction processing logic is interested in (like beginTs or
-     *     commitTs), not the timestamp after subtraction described in section 
'Waiting for safe time in the past' of
-     *     <a 
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-98:+Schema+Synchronization";>IEP-98</a>
-     * @return Future that completes when all schemas activating not later 
than the given timestamp are available.
-     */
-    CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts);
-
+public interface ValidationSchemasSource {
     /**
      * Obtains a future that completes when the given schema version becomes 
available.
      *
+     * <p>Must only be called when it's guaranteed that the table exists from 
the point of view of {@link SchemaManager}.
+     *
      * @param tableId ID of the table of interest.
      * @param schemaVersion ID of the schema version.
      * @return Future that completes when the given schema version becomes 
available.
      */
-    CompletableFuture<?> waitForSchemaAvailability(int tableId, int 
schemaVersion);
+    CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion);
 
     /**
      * Returns all schema versions between (including) the two that were 
effective at the given timestamps.
      *
+     * <p>For both timestamps, schemas-related metadata must be complete, see 
{@link SchemaSyncService}.
+     *
      * @param tableId ID of the table which schemas need to be considered.
      * @param fromIncluding Start timestamp.
      * @param toIncluding End timestamp.
@@ -59,10 +54,12 @@ public interface Schemas {
      * the one identified by a schema version ID. If the starting schema (the 
one effective at fromIncluding)
      * is actually a later schema than the one identified by toIncluding, then 
an empty list is returned.
      *
+     * <p>For both fromIncluding and toIncluding, schemas-related metadata 
must be complete.
+     *
      * @param tableId ID of the table which schemas need to be considered.
      * @param fromIncluding Start timestamp.
-     * @param toIncluding End schema version ID.
+     * @param toTableVersionIncluding End schema version ID.
      * @return All schema versions between (including) the given timestamp and 
schema version.
      */
-    List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding);
+    List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toTableVersionIncluding);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/package-info.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/package-info.java
deleted file mode 100644
index 1f92efab55..0000000000
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This is a temporary package to host code needed until CatalogService is 
ready
- * TODO: IGNITE-19447 - remove/rework when switched to full-blown usage of 
CatalogService.
- */
-package org.apache.ignite.internal.table.distributed.schema;
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
index 9a75e90c77..6e456195c3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -55,7 +55,7 @@ import 
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxMeta;
@@ -136,7 +136,7 @@ public class PartitionReplicaListenerDurableUnlockTest 
extends IgniteAbstractTes
                 txStateStorage,
                 mock(TransactionStateResolver.class),
                 mock(StorageUpdateHandler.class),
-                mock(Schemas.class),
+                mock(ValidationSchemasSource.class),
                 LOCAL_NODE,
                 mock(SchemaSyncService.class),
                 mock(CatalogService.class),
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d56eb38cf1..17379ef941 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -91,7 +91,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
-import org.apache.ignite.internal.table.impl.DummySchemas;
+import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockManager;
@@ -244,7 +244,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                         indexUpdateHandler,
                         new GcUpdateHandler(partitionDataStorage, safeTime, 
indexUpdateHandler)
                 ),
-                new DummySchemas(schemaManager),
+                new DummyValidationSchemasSource(schemaManager),
                 localNode,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogService,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 462ee6ba38..dd830a0052 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -152,7 +152,7 @@ import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -297,7 +297,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private PendingComparableValuesTracker<HybridTimestamp, Void> 
safeTimeClock;
 
     @Mock
-    private Schemas schemas;
+    private ValidationSchemasSource validationSchemasSource;
 
     @Spy
     private final SchemaSyncService schemaSyncService = new 
AlwaysSyncedSchemaSyncService();
@@ -393,8 +393,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
         when(safeTimeClock.current()).thenReturn(HybridTimestamp.MIN_VALUE);
 
-        
when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));
-        when(schemas.waitForSchemaAvailability(anyInt(), 
anyInt())).thenReturn(completedFuture(null));
+        when(validationSchemasSource.waitForSchemaAvailability(anyInt(), 
anyInt())).thenReturn(completedFuture(null));
 
         lenient().when(catalogService.table(anyInt(), 
anyLong())).thenReturn(tableDescriptor);
 
@@ -490,7 +489,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                         indexUpdateHandler,
                         new GcUpdateHandler(partitionDataStorage, 
safeTimeClock, indexUpdateHandler)
                 ),
-                schemas,
+                validationSchemasSource,
                 localNode,
                 schemaSyncService,
                 catalogService,
@@ -1463,7 +1462,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
 
     @Test
     public void commitsOnSameSchemaSuccessfully() {
-        when(schemas.tableSchemaVersionsBetween(anyInt(), any(), 
any(HybridTimestamp.class)))
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
                 .thenReturn(List.of(
                         tableSchema(CURRENT_SCHEMA_VERSION, 
List.of(nullableColumn("col")))
                 ));
@@ -1486,7 +1485,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private static FullTableSchema tableSchema(int schemaVersion, 
List<CatalogTableColumnDescriptor> columns) {
-        return new FullTableSchema(schemaVersion, 1, columns, List.of());
+        return new FullTableSchema(schemaVersion, 1, columns);
     }
 
     private AtomicReference<Boolean> interceptFinishTxCommand() {
@@ -1525,7 +1524,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     @Test
     @Disabled("IGNITE-19229")
     public void commitsOnCompatibleSchemaChangeSuccessfully() {
-        when(schemas.tableSchemaVersionsBetween(anyInt(), any(), 
any(HybridTimestamp.class)))
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
                 .thenReturn(List.of(
                         tableSchema(CURRENT_SCHEMA_VERSION, 
List.of(nullableColumn("col1"))),
                         tableSchema(FUTURE_SCHEMA_VERSION, 
List.of(nullableColumn("col1"), nullableColumn("col2")))
@@ -1557,12 +1556,12 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     }
 
     private void simulateForwardIncompatibleSchemaChange(int 
fromSchemaVersion, int toSchemaVersion) {
-        when(schemas.tableSchemaVersionsBetween(anyInt(), any(), 
any(HybridTimestamp.class)))
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
                 .thenReturn(incompatibleSchemaVersions(fromSchemaVersion, 
toSchemaVersion));
     }
 
     private void simulateBackwardIncompatibleSchemaChange(int 
fromSchemaVersion, int toSchemaVersion) {
-        when(schemas.tableSchemaVersionsBetween(anyInt(), any(), anyInt()))
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), anyInt()))
                 .thenReturn(incompatibleSchemaVersions(fromSchemaVersion, 
toSchemaVersion));
     }
 
@@ -2214,7 +2213,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
             Set<ReplicationGroupId> groups,
             int tableToBeDroppedId
     ) {
-        when(schemas.tableSchemaVersionsBetween(anyInt(), any(), 
any(HybridTimestamp.class)))
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
                 .thenReturn(List.of(
                         tableSchema(CURRENT_SCHEMA_VERSION, 
List.of(nullableColumn("col")))
                 ));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
new file mode 100644
index 0000000000..e00425deb4
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.AdditionalMatchers.lt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CatalogValidationSchemasSourceTest extends BaseIgniteAbstractTest {
+    @Mock
+    private CatalogService catalogService;
+
+    @Mock
+    private SchemaManager schemaManager;
+
+    @Mock
+    private SchemaRegistry schemaRegistry;
+
+    @InjectMocks
+    private CatalogValidationSchemasSource schemas;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    @Test
+    void waitingForSchemaAvailabilityAtVersionWorks() {
+        int tableId = 1;
+        int version = 3;
+        CompletableFuture<SchemaDescriptor> underlyingFuture = new 
CompletableFuture<>();
+
+        doReturn(schemaRegistry).when(schemaManager).schemaRegistry(tableId);
+        when(schemaRegistry.schemaAsync(version)).thenReturn(underlyingFuture);
+
+        CompletableFuture<Void> future = 
schemas.waitForSchemaAvailability(tableId, version);
+        assertThat(future, is(not(completedFuture())));
+
+        underlyingFuture.complete(mock(SchemaDescriptor.class));
+
+        assertThat(future, is(completedFuture()));
+    }
+
+    @Test
+    void tableSchemaVersionsBetweenTimestampsWorks() {
+        int tableId = 1;
+
+        CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+        CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+
+        HybridTimestamp from = clock.now();
+        HybridTimestamp to = clock.now();
+
+        
when(catalogService.activeCatalogVersion(from.longValue())).thenReturn(3);
+        
when(catalogService.activeCatalogVersion(to.longValue())).thenReturn(4);
+        when(catalogService.table(tableId, 3)).thenReturn(version3);
+        when(catalogService.table(tableId, 4)).thenReturn(version4);
+
+        List<FullTableSchema> fullSchemas = 
schemas.tableSchemaVersionsBetween(tableId, from, to);
+
+        assertThat(fullSchemas, hasSize(2));
+        assertThat(fullSchemas.get(0).schemaVersion(), is(3));
+        assertThat(fullSchemas.get(1).schemaVersion(), is(4));
+    }
+
+    private static CatalogTableDescriptor tableVersion(int tableId, int 
tableVersion) {
+        List<CatalogTableColumnDescriptor> columns = List.of(
+                new CatalogTableColumnDescriptor("k1", ColumnType.INT16, 
false, 0, 0, 0, null),
+                new CatalogTableColumnDescriptor("v1", ColumnType.INT32, 
false, 0, 0, 0, null)
+        );
+
+        return new CatalogTableDescriptor(
+                tableId, -1, -1, "test", 0, tableVersion, columns, 
List.of("k1"), null, INITIAL_CAUSALITY_TOKEN
+        );
+    }
+
+    @Test
+    void tableSchemaVersionsBetweenTimestampsUsesCache() {
+        int tableId = 1;
+
+        CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+        CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+
+        HybridTimestamp timestamp = clock.now();
+
+        
when(catalogService.activeCatalogVersion(lt(timestamp.longValue()))).thenReturn(3);
+        
when(catalogService.activeCatalogVersion(geq(timestamp.longValue()))).thenReturn(4);
+        when(catalogService.table(tableId, 3)).thenReturn(version3);
+        when(catalogService.table(tableId, 4)).thenReturn(version4);
+
+        List<FullTableSchema> fullSchemas1 = 
schemas.tableSchemaVersionsBetween(tableId, timestamp.subtractPhysicalTime(2), 
timestamp);
+        List<FullTableSchema> fullSchemas2 = 
schemas.tableSchemaVersionsBetween(
+                tableId,
+                timestamp.subtractPhysicalTime(1),
+                timestamp.addPhysicalTime(10)
+        );
+
+        assertThat(fullSchemas1.size(), is(fullSchemas2.size()));
+
+        verify(catalogService, times(1)).table(tableId, 3);
+    }
+
+    @Test
+    void tableSchemaVersionsBetweenTimestampAndVersionWorks() {
+        int tableId = 1;
+
+        CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+        CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+        CatalogTableDescriptor version5 = tableVersion(tableId, 5);
+
+        HybridTimestamp from = clock.now();
+
+        when(catalogService.latestCatalogVersion()).thenReturn(5);
+        
when(catalogService.activeCatalogVersion(from.longValue())).thenReturn(3);
+        when(catalogService.table(tableId, 3)).thenReturn(version3);
+        when(catalogService.table(tableId, 4)).thenReturn(version4);
+        when(catalogService.table(tableId, 5)).thenReturn(version5);
+
+        List<FullTableSchema> fullSchemas = 
schemas.tableSchemaVersionsBetween(tableId, from, 4);
+
+        assertThat(fullSchemas, hasSize(2));
+        assertThat(fullSchemas.get(0).schemaVersion(), is(3));
+        assertThat(fullSchemas.get(1).schemaVersion(), is(4));
+    }
+
+    @Test
+    void 
tableSchemaVersionsBetweenTimestampAndVersionReturnsEmptyListIfEndIsBeforeStart()
 {
+        int tableId = 1;
+
+        CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+
+        HybridTimestamp from = clock.now();
+
+        when(catalogService.latestCatalogVersion()).thenReturn(3);
+        
when(catalogService.activeCatalogVersion(from.longValue())).thenReturn(3);
+        when(catalogService.table(tableId, 3)).thenReturn(version3);
+
+        List<FullTableSchema> fullSchemas = 
schemas.tableSchemaVersionsBetween(tableId, from, 2);
+
+        assertThat(fullSchemas, is(empty()));
+    }
+
+    @Test
+    void tableSchemaVersionsBetweenTimestampAndVersionUsesCache() {
+        int tableId = 1;
+
+        CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+        CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+
+        HybridTimestamp timestamp = clock.now();
+
+        when(catalogService.latestCatalogVersion()).thenReturn(4);
+        when(catalogService.activeCatalogVersion(anyLong())).thenReturn(3);
+        when(catalogService.table(tableId, 3)).thenReturn(version3);
+        when(catalogService.table(tableId, 4)).thenReturn(version4);
+
+        List<FullTableSchema> fullSchemas1 = 
schemas.tableSchemaVersionsBetween(tableId, timestamp.subtractPhysicalTime(2), 
4);
+        List<FullTableSchema> fullSchemas2 = 
schemas.tableSchemaVersionsBetween(tableId, timestamp.subtractPhysicalTime(1), 
4);
+
+        assertThat(fullSchemas1.size(), is(fullSchemas2.size()));
+
+        verify(catalogService, times(1)).table(tableId, 3);
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
index 6347435d75..a172121ccb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
@@ -24,8 +24,6 @@ import static org.hamcrest.Matchers.is;
 
 import java.util.List;
 import org.apache.ignite.internal.catalog.commands.DefaultValue;
-import 
org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.sql.ColumnType;
 import org.junit.jupiter.api.Test;
@@ -34,20 +32,15 @@ class FullTableSchemaTest {
     @Test
     void sameSchemasHaveEmptyDiff() {
         CatalogTableColumnDescriptor column = someColumn("a");
-        CatalogIndexDescriptor index = someIndex(1, "ind_a");
 
-        var schema1 = new FullTableSchema(1, 1, List.of(column), 
List.of(index));
-        var schema2 = new FullTableSchema(2, 1, List.of(column), 
List.of(index));
+        var schema1 = new FullTableSchema(1, 1, List.of(column));
+        var schema2 = new FullTableSchema(2, 1, List.of(column));
 
         TableDefinitionDiff diff = schema2.diffFrom(schema1);
 
         assertThat(diff.isEmpty(), is(true));
     }
 
-    private static CatalogHashIndexDescriptor someIndex(int id, String name) {
-        return new CatalogHashIndexDescriptor(id, name, 1, true, List.of("a"), 
true);
-    }
-
     private static CatalogTableColumnDescriptor someColumn(String columnName) {
         return new CatalogTableColumnDescriptor(columnName, ColumnType.INT32, 
true, 0, 0, 0, DefaultValue.constant(null));
     }
@@ -58,8 +51,8 @@ class FullTableSchemaTest {
         CatalogTableColumnDescriptor column2 = someColumn("b");
         CatalogTableColumnDescriptor column3 = someColumn("c");
 
-        var schema1 = new FullTableSchema(1, 1, List.of(column1, column2), 
List.of());
-        var schema2 = new FullTableSchema(2, 1, List.of(column2, column3), 
List.of());
+        var schema1 = new FullTableSchema(1, 1, List.of(column1, column2));
+        var schema2 = new FullTableSchema(2, 1, List.of(column2, column3));
 
         TableDefinitionDiff diff = schema2.diffFrom(schema1);
 
@@ -73,10 +66,9 @@ class FullTableSchemaTest {
     void changedColumnsAreReflectedInDiff() {
         CatalogTableColumnDescriptor column1 = someColumn("a");
 
-        var schema1 = new FullTableSchema(1, 1, List.of(column1), List.of());
+        var schema1 = new FullTableSchema(1, 1, List.of(column1));
         var schema2 = new FullTableSchema(2, 1,
-                List.of(new CatalogTableColumnDescriptor("a", 
ColumnType.STRING, true, 0, 0, 10, DefaultValue.constant(null))),
-                List.of()
+                List.of(new CatalogTableColumnDescriptor("a", 
ColumnType.STRING, true, 0, 0, 10, DefaultValue.constant(null)))
         );
 
         TableDefinitionDiff diff = schema2.diffFrom(schema1);
@@ -86,20 +78,4 @@ class FullTableSchemaTest {
         List<ColumnDefinitionDiff> changedColumns = diff.changedColumns();
         assertThat(changedColumns, is(hasSize(1)));
     }
-
-    @Test
-    void addedRemovedIndexesAreReflectedInDiff() {
-        CatalogIndexDescriptor index1 = someIndex(1, "a");
-        CatalogIndexDescriptor index2 = someIndex(2, "b");
-        CatalogIndexDescriptor index3 = someIndex(3, "c");
-
-        var schema1 = new FullTableSchema(1, 1, List.of(someColumn("a")), 
List.of(index1, index2));
-        var schema2 = new FullTableSchema(2, 1, List.of(someColumn("a")), 
List.of(index2, index3));
-
-        TableDefinitionDiff diff = schema2.diffFrom(schema1);
-
-        assertThat(diff.isEmpty(), is(false));
-        assertThat(diff.addedIndexes(), is(List.of(index3)));
-        assertThat(diff.removedIndexes(), is(List.of(index1)));
-    }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ee0b5ee172..2fcabab70e 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -110,11 +110,11 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
-import org.apache.ignite.internal.table.impl.DummySchemas;
+import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
@@ -546,7 +546,7 @@ public class ItTxTestCluster {
                                         txStateStorage,
                                         transactionStateResolver,
                                         storageUpdateHandler,
-                                        new DummySchemas(schemaManager),
+                                        new 
DummyValidationSchemasSource(schemaManager),
                                         consistentIdToNode.apply(assignment),
                                         new AlwaysSyncedSchemaSyncService(),
                                         catalogService,
@@ -642,7 +642,7 @@ public class ItTxTestCluster {
             TxStateStorage txStateStorage,
             TransactionStateResolver transactionStateResolver,
             StorageUpdateHandler storageUpdateHandler,
-            Schemas schemas,
+            ValidationSchemasSource validationSchemasSource,
             ClusterNode localNode,
             SchemaSyncService schemaSyncService,
             CatalogService catalogService,
@@ -664,7 +664,7 @@ public class ItTxTestCluster {
                 txStateStorage,
                 transactionStateResolver,
                 storageUpdateHandler,
-                schemas,
+                validationSchemasSource,
                 localNode,
                 schemaSyncService,
                 catalogService,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 41800c140c..da2716615f 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -383,7 +383,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 txStateStorage().getOrCreateTxStateStorage(PART_ID),
                 transactionStateResolver,
                 storageUpdateHandler,
-                new DummySchemas(schemaManager),
+                new DummyValidationSchemasSource(schemaManager),
                 LOCAL_NODE,
                 new AlwaysSyncedSchemaSyncService(),
                 catalogService,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java
deleted file mode 100644
index 01f6525932..0000000000
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.impl;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
-import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
-
-/**
- * Dummy {@link Schemas} implementation that is not historic and always uses 
same {@link SchemaRegistry}.
- */
-public class DummySchemas implements Schemas {
-    private final SchemaRegistry schemaRegistry;
-
-    public DummySchemas(SchemaRegistry schemaRegistry) {
-        this.schemaRegistry = schemaRegistry;
-    }
-
-    @Override
-    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
-        return completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<?> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
-        return completedFuture(null);
-    }
-
-    @Override
-    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
-        SchemaDescriptor schemaDescriptor = schemaRegistry.lastKnownSchema();
-
-        List<CatalogTableColumnDescriptor> columns = 
schemaDescriptor.columnNames().stream()
-                .map(colName -> {
-                    Column column = schemaDescriptor.column(colName);
-
-                    assert column != null;
-
-                    return NonHistoricSchemas.columnDescriptor(column);
-                })
-                .collect(toList());
-
-        var fullSchema = new FullTableSchema(
-                1,
-                1,
-                columns,
-                List.of()
-        );
-
-        return List.of(fullSchema);
-    }
-
-    @Override
-    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
-        // Returning an empty list makes sure that backward validation never 
fails, which is what we want before
-        // we switch to CatalogService completely.
-        return List.of();
-    }
-}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyValidationSchemasSource.java
similarity index 76%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
rename to 
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyValidationSchemasSource.java
index c6eb5f38c8..4d171a88fc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyValidationSchemasSource.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.schema;
+package org.apache.ignite.internal.table.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
@@ -29,7 +29,9 @@ import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.DefaultValueProvider;
 import 
org.apache.ignite.internal.schema.DefaultValueProvider.FunctionalValueProvider;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import 
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.type.BitmaskNativeType;
 import org.apache.ignite.internal.type.DecimalNativeType;
 import org.apache.ignite.internal.type.NativeType;
@@ -37,41 +39,23 @@ import org.apache.ignite.internal.type.TemporalNativeType;
 import org.apache.ignite.internal.type.VarlenNativeType;
 
 /**
- * A dummy implementation over {@link SchemaManager}. It is dummy because:
- *
- * <ul>
- *     <li>It imitates historicity, but always takes the latest known 
schema</li>
- *     <li>{@link #tableSchemaVersionsBetween(int, HybridTimestamp, 
HybridTimestamp)} always returns a single schema to avoid
- *     validation failures</li>
- * </ul>
- *
- * <p>The point of this implementation is to allow the system work in the 
pre-SchemaSync fashion before the switch to CatalogService
- * is possible.
+ * Dummy {@link ValidationSchemasSource} implementation that is not historic 
and always uses same {@link SchemaRegistry}.
  */
-// TODO: IGNITE-19447 - remove when switched to the CatalogService
-public class NonHistoricSchemas implements Schemas {
-    private final SchemaManager schemaManager;
-
-    private final SchemaSyncService schemaSyncService;
+public class DummyValidationSchemasSource implements ValidationSchemasSource {
+    private final SchemaRegistry schemaRegistry;
 
-    public NonHistoricSchemas(SchemaManager schemaManager, SchemaSyncService 
schemaSyncService) {
-        this.schemaManager = schemaManager;
-        this.schemaSyncService = schemaSyncService;
-    }
-
-    @Override
-    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) 
{
-        return schemaSyncService.waitForMetadataCompleteness(ts);
+    public DummyValidationSchemasSource(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
     }
 
     @Override
-    public CompletableFuture<?> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
         return completedFuture(null);
     }
 
     @Override
     public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
-        SchemaDescriptor schemaDescriptor = 
schemaManager.schemaRegistry(tableId).lastKnownSchema();
+        SchemaDescriptor schemaDescriptor = schemaRegistry.lastKnownSchema();
 
         List<CatalogTableColumnDescriptor> columns = 
schemaDescriptor.columnNames().stream()
                 .map(colName -> {
@@ -86,17 +70,15 @@ public class NonHistoricSchemas implements Schemas {
         var fullSchema = new FullTableSchema(
                 1,
                 1,
-                columns,
-                List.of()
+                columns
         );
 
         return List.of(fullSchema);
     }
 
     @Override
-    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toIncluding) {
-        // Returning an empty list makes sure that backward validation never 
fails, which is what we want before
-        // we switch to CatalogService completely.
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toTableVersionIncluding) {
+        // Returning an empty list makes sure that backward validation never 
fails, which is what we want.
         return List.of();
     }
 
@@ -107,7 +89,7 @@ public class NonHistoricSchemas implements Schemas {
      * @param column Column to convert.
      * @return Conversion result.
      */
-    public static CatalogTableColumnDescriptor columnDescriptor(Column column) 
{
+    private static CatalogTableColumnDescriptor columnDescriptor(Column 
column) {
         NativeType nativeType = column.type();
         int precision;
         int scale;

Reply via email to