This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a27ba28d53 IGNITE-19447 Switch schema validation to CatalogService
(#2787)
a27ba28d53 is described below
commit a27ba28d5366cf69cb2948c7a4e267a2cb7c63be
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Nov 6 17:22:46 2023 +0400
IGNITE-19447 Switch schema validation to CatalogService (#2787)
---
.../internal/catalog/CatalogManagerSelfTest.java | 6 +-
.../ItTxDistributedCleanupRecoveryTest.java | 6 +-
...xDistributedTestSingleNodeNoCleanupMessage.java | 6 +-
.../internal/table/distributed/TableManager.java | 4 +-
.../replicator/PartitionReplicaListener.java | 8 +-
...ator.java => SchemaCompatibilityValidator.java} | 41 ++--
.../replicator/TableDefinitionDiffKey.java | 61 ++++++
.../schema/CatalogValidationSchemasSource.java | 217 +++++++++++++++++++++
.../table/distributed/schema/FullTableSchema.java | 35 +---
.../distributed/schema/TableDefinitionDiff.java | 30 +--
.../{Schemas.java => ValidationSchemasSource.java} | 25 ++-
.../table/distributed/schema/package-info.java | 22 ---
.../PartitionReplicaListenerDurableUnlockTest.java | 4 +-
.../PartitionReplicaListenerIndexLockingTest.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 21 +-
.../schema/CatalogValidationSchemasSourceTest.java | 208 ++++++++++++++++++++
.../distributed/schema/FullTableSchemaTest.java | 36 +---
.../apache/ignite/distributed/ItTxTestCluster.java | 10 +-
.../table/impl/DummyInternalTableImpl.java | 2 +-
.../ignite/internal/table/impl/DummySchemas.java | 84 --------
.../table/impl/DummyValidationSchemasSource.java} | 48 ++---
21 files changed, 594 insertions(+), 284 deletions(-)
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 9a170b444e..ed41ce3795 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1586,13 +1586,17 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
public void addColumnIncrementsTableVersion() {
createSomeTable(TABLE_NAME);
- assertThat(manager.execute(addColumnParams(TABLE_NAME,
columnParams("val2", INT32))), willCompleteSuccessfully());
+ addSomeColumn();
CatalogTableDescriptor table = manager.table(TABLE_NAME,
Long.MAX_VALUE);
assertThat(table.tableVersion(), is(2));
}
+ private void addSomeColumn() {
+ assertThat(manager.execute(addColumnParams(TABLE_NAME,
columnParams("val2", INT32))), willCompleteSuccessfully());
+ }
+
@Test
public void dropColumnIncrementsTableVersion() {
createSomeTable(TABLE_NAME);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index 1f4af67d6f..3a06284c77 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -41,7 +41,7 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -106,7 +106,7 @@ public class ItTxDistributedCleanupRecoveryTest extends
ItTxDistributedTestSingl
TxStateStorage txStateStorage,
TransactionStateResolver transactionStateResolver,
StorageUpdateHandler storageUpdateHandler,
- Schemas schemas,
+ ValidationSchemasSource validationSchemasSource,
ClusterNode localNode,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
@@ -128,7 +128,7 @@ public class ItTxDistributedCleanupRecoveryTest extends
ItTxDistributedTestSingl
txStateStorage,
transactionStateResolver,
storageUpdateHandler,
- schemas,
+ validationSchemasSource,
localNode,
schemaSyncService,
catalogService,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index bffc3a32dd..cac9e1c778 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -46,7 +46,7 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -136,7 +136,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
TxStateStorage txStateStorage,
TransactionStateResolver transactionStateResolver,
StorageUpdateHandler storageUpdateHandler,
- Schemas schemas,
+ ValidationSchemasSource validationSchemasSource,
ClusterNode localNode,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
@@ -158,7 +158,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage
extends ItTxDistribut
txStateStorage,
transactionStateResolver,
storageUpdateHandler,
- schemas,
+ validationSchemasSource,
localNode,
schemaSyncService,
catalogService,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fe1f5e0802..6d2e40f546 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -147,7 +147,7 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotAwarePartitionDataStorage;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
-import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import
org.apache.ignite.internal.table.distributed.schema.CatalogValidationSchemasSource;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
@@ -893,7 +893,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
txStatePartitionStorage,
transactionStateResolver,
partitionUpdateHandlers.storageUpdateHandler,
- new NonHistoricSchemas(schemaManager, schemaSyncService),
+ new CatalogValidationSchemasSource(catalogService,
schemaManager),
localNode(),
schemaSyncService,
catalogService,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 1ca4fd8b1e..2d15d14a35 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -140,7 +140,7 @@ import
org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
@@ -238,7 +238,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Cleanup futures. */
private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap
= new ConcurrentHashMap<>();
- private final SchemaCompatValidator schemaCompatValidator;
+ private final SchemaCompatibilityValidator schemaCompatValidator;
/** Instance of the local node. */
private final ClusterNode localNode;
@@ -300,7 +300,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
TxStateStorage txStateStorage,
TransactionStateResolver transactionStateResolver,
StorageUpdateHandler storageUpdateHandler,
- Schemas schemas,
+ ValidationSchemasSource validationSchemasSource,
ClusterNode localNode,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
@@ -328,7 +328,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
cursors = new
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
- schemaCompatValidator = new SchemaCompatValidator(schemas,
catalogService);
+ schemaCompatValidator = new
SchemaCompatibilityValidator(validationSchemasSource, catalogService,
schemaSyncService);
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
this::onPrimaryElected);
placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
this::onPrimaryExpired);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java
similarity index 85%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java
index 8aef07a5e6..724cf28f1a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatibilityValidator.java
@@ -24,26 +24,38 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.tx.TransactionIds;
/**
* Validates schema compatibility.
*/
-class SchemaCompatValidator {
- private final Schemas schemas;
+class SchemaCompatibilityValidator {
+ private final ValidationSchemasSource validationSchemasSource;
private final CatalogService catalogService;
+ private final SchemaSyncService schemaSyncService;
+
+ // TODO: Remove entries from cache when compacting schemas in
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+ private final ConcurrentMap<TableDefinitionDiffKey, TableDefinitionDiff>
diffCache = new ConcurrentHashMap<>();
/** Constructor. */
- SchemaCompatValidator(Schemas schemas, CatalogService catalogService) {
- this.schemas = schemas;
+ SchemaCompatibilityValidator(
+ ValidationSchemasSource validationSchemasSource,
+ CatalogService catalogService,
+ SchemaSyncService schemaSyncService
+ ) {
+ this.validationSchemasSource = validationSchemasSource;
this.catalogService = catalogService;
+ this.schemaSyncService = schemaSyncService;
}
/**
@@ -72,7 +84,7 @@ class SchemaCompatValidator {
// so we don't need to account for clock skew.
assert commitTimestamp.compareTo(beginTimestamp) > 0;
- return schemas.waitForSchemasAvailability(commitTimestamp)
+ return schemaSyncService.waitForMetadataCompleteness(commitTimestamp)
.thenApply(ignored -> validateCommit(tableIds,
commitTimestamp, beginTimestamp));
}
@@ -116,7 +128,7 @@ class SchemaCompatValidator {
HybridTimestamp commitTimestamp,
int tableId
) {
- List<FullTableSchema> tableSchemas =
schemas.tableSchemaVersionsBetween(tableId, beginTimestamp, commitTimestamp);
+ List<FullTableSchema> tableSchemas =
validationSchemasSource.tableSchemaVersionsBetween(tableId, beginTimestamp,
commitTimestamp);
assert !tableSchemas.isEmpty();
@@ -132,7 +144,10 @@ class SchemaCompatValidator {
}
private boolean isForwardCompatible(FullTableSchema prevSchema,
FullTableSchema nextSchema) {
- TableDefinitionDiff diff = nextSchema.diffFrom(prevSchema);
+ TableDefinitionDiff diff = diffCache.computeIfAbsent(
+ new TableDefinitionDiffKey(prevSchema.tableId(),
prevSchema.schemaVersion(), nextSchema.schemaVersion()),
+ key -> nextSchema.diffFrom(prevSchema)
+ );
// TODO: IGNITE-19229 - more sophisticated logic.
return diff.isEmpty();
@@ -156,8 +171,8 @@ class SchemaCompatValidator {
CompletableFuture<CompatValidationResult> validateBackwards(int
tupleSchemaVersion, int tableId, UUID txId) {
HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
- return schemas.waitForSchemasAvailability(beginTimestamp)
- .thenCompose(ignored ->
schemas.waitForSchemaAvailability(tableId, tupleSchemaVersion))
+ return schemaSyncService.waitForMetadataCompleteness(beginTimestamp)
+ .thenCompose(ignored ->
validationSchemasSource.waitForSchemaAvailability(tableId, tupleSchemaVersion))
.thenApply(ignored ->
validateBackwardSchemaCompatibility(tupleSchemaVersion, tableId,
beginTimestamp));
}
@@ -166,7 +181,11 @@ class SchemaCompatValidator {
int tableId,
HybridTimestamp beginTimestamp
) {
- List<FullTableSchema> tableSchemas =
schemas.tableSchemaVersionsBetween(tableId, beginTimestamp, tupleSchemaVersion);
+ List<FullTableSchema> tableSchemas =
validationSchemasSource.tableSchemaVersionsBetween(
+ tableId,
+ beginTimestamp,
+ tupleSchemaVersion
+ );
if (tableSchemas.isEmpty()) {
// The tuple was not written with a future schema.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TableDefinitionDiffKey.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TableDefinitionDiffKey.java
new file mode 100644
index 0000000000..9d0daded3d
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TableDefinitionDiffKey.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+/**
+ * Key for table definitions diff.
+ */
+class TableDefinitionDiffKey {
+ private final int tableId;
+ private final int fromSchemaVersion;
+ private final int toSchemaVersion;
+
+ TableDefinitionDiffKey(int tableId, int fromSchemaVersion, int
toSchemaVersion) {
+ this.tableId = tableId;
+ this.fromSchemaVersion = fromSchemaVersion;
+ this.toSchemaVersion = toSchemaVersion;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TableDefinitionDiffKey that = (TableDefinitionDiffKey) o;
+
+ if (tableId != that.tableId) {
+ return false;
+ }
+ if (fromSchemaVersion != that.fromSchemaVersion) {
+ return false;
+ }
+ return toSchemaVersion == that.toSchemaVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tableId;
+ result = 31 * result + fromSchemaVersion;
+ result = 31 * result + toSchemaVersion;
+ return result;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java
new file mode 100644
index 0000000000..e0ca393a60
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * An implementation over {@link CatalogService}.
+ */
+public class CatalogValidationSchemasSource implements ValidationSchemasSource
{
+ private final CatalogService catalogService;
+
+ private final SchemaManager schemaManager;
+
+ private final ConcurrentMap<CatalogVersionsSpan, List<FullTableSchema>>
catalogVersionSpansCache = new ConcurrentHashMap<>();
+
+ // TODO: Remove entries from cache when compacting Catalog
https://issues.apache.org/jira/browse/IGNITE-20790
+ // TODO: Remove entries from cache when compacting schemas in
SchemaManager https://issues.apache.org/jira/browse/IGNITE-20789
+ private final ConcurrentMap<CatalogVersionToTableVersionSpan,
List<FullTableSchema>> catalogVersionToTableVersionSpansCache
+ = new ConcurrentHashMap<>();
+
+ /** Constructor. */
+ public CatalogValidationSchemasSource(CatalogService catalogService,
SchemaManager schemaManager) {
+ this.catalogService = catalogService;
+ this.schemaManager = schemaManager;
+ }
+
+ @Override
+ public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int
schemaVersion) {
+ return schemaManager.schemaRegistry(tableId)
+ .schemaAsync(schemaVersion)
+ .thenApply(unused -> null);
+ }
+
+ @Override
+ public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+ // It is safe to access the Catalog as the caller must have already
waited till the Catalog is up-to-date with the timestamps.
+ int fromCatalogVersion =
catalogService.activeCatalogVersion(fromIncluding.longValue());
+ int toCatalogVersion =
catalogService.activeCatalogVersion(toIncluding.longValue());
+
+ return catalogVersionSpansCache.computeIfAbsent(
+ new CatalogVersionsSpan(tableId, fromCatalogVersion,
toCatalogVersion),
+ key -> tableSchemaVersionsBetweenCatalogVersions(tableId,
fromCatalogVersion, toCatalogVersion)
+ );
+ }
+
+ @Override
+ public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toTableVersionIncluding) {
+ // It is safe to access the Catalog as the caller must have already
waited till the Catalog is up-to-date.
+ int fromCatalogVersion =
catalogService.activeCatalogVersion(fromIncluding.longValue());
+
+ return catalogVersionToTableVersionSpansCache.computeIfAbsent(
+ new CatalogVersionToTableVersionSpan(tableId,
fromCatalogVersion, toTableVersionIncluding),
+ key ->
tableSchemaVersionsBetweenCatalogAndTableVersions(tableId, fromCatalogVersion,
toTableVersionIncluding)
+ );
+ }
+
+ private List<FullTableSchema>
tableSchemaVersionsBetweenCatalogVersions(int tableId, int fromCatalogVersion,
int toCatalogVersion) {
+ return tableVersionsBetween(tableId, fromCatalogVersion,
toCatalogVersion)
+
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+ .collect(toList());
+ }
+
+ // It's ok to use Stream as the results of the methods that call this are
cached.
+ private Stream<CatalogTableDescriptor> tableVersionsBetween(
+ int tableId,
+ int fromCatalogVersionIncluding,
+ int toCatalogVersionIncluding
+ ) {
+ return IntStream.rangeClosed(fromCatalogVersionIncluding,
toCatalogVersionIncluding)
+ .mapToObj(catalogVersion -> catalogService.table(tableId,
catalogVersion))
+ .filter(new Predicate<>() {
+ int prevVersion = Integer.MIN_VALUE;
+
+ @Override
+ public boolean test(CatalogTableDescriptor
tableDescriptor) {
+ if (tableDescriptor.tableVersion() == prevVersion) {
+ return false;
+ }
+
+ assert prevVersion == Integer.MIN_VALUE ||
tableDescriptor.tableVersion() == prevVersion + 1
+ : String.format("Table version is expected to
be prevVersion+1, but version is %d and prevVersion is %d",
+ tableDescriptor.tableVersion(),
prevVersion);
+
+ prevVersion = tableDescriptor.tableVersion();
+
+ return true;
+ }
+ });
+ }
+
+ private List<FullTableSchema>
tableSchemaVersionsBetweenCatalogAndTableVersions(
+ int tableId,
+ int fromCatalogVersion,
+ int toTableVersion
+ ) {
+ return tableVersionsBetween(tableId, fromCatalogVersion,
catalogService.latestCatalogVersion())
+ .takeWhile(tableDescriptor -> tableDescriptor.tableVersion()
<= toTableVersion)
+
.map(CatalogValidationSchemasSource::fullSchemaFromTableDescriptor)
+ .collect(toList());
+ }
+
+ private static FullTableSchema
fullSchemaFromTableDescriptor(CatalogTableDescriptor tableDescriptor) {
+ return new FullTableSchema(
+ tableDescriptor.tableVersion(),
+ tableDescriptor.id(),
+ tableDescriptor.columns()
+ );
+ }
+
+ private static class CatalogVersionsSpan {
+ private final int tableId;
+ private final int fromCatalogVersion;
+ private final int toCatalogVersion;
+
+ private CatalogVersionsSpan(int tableId, int fromCatalogVersion, int
toCatalogVersion) {
+ this.tableId = tableId;
+ this.fromCatalogVersion = fromCatalogVersion;
+ this.toCatalogVersion = toCatalogVersion;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CatalogVersionsSpan that = (CatalogVersionsSpan) o;
+
+ if (tableId != that.tableId) {
+ return false;
+ }
+ if (fromCatalogVersion != that.fromCatalogVersion) {
+ return false;
+ }
+ return toCatalogVersion == that.toCatalogVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tableId;
+ result = 31 * result + fromCatalogVersion;
+ result = 31 * result + toCatalogVersion;
+ return result;
+ }
+ }
+
+ private static class CatalogVersionToTableVersionSpan {
+ private final int tableId;
+ private final int fromCatalogVersion;
+ private final int toTableVersion;
+
+ private CatalogVersionToTableVersionSpan(int tableId, int
fromCatalogVersion, int toTableVersion) {
+ this.tableId = tableId;
+ this.fromCatalogVersion = fromCatalogVersion;
+ this.toTableVersion = toTableVersion;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CatalogVersionToTableVersionSpan that =
(CatalogVersionToTableVersionSpan) o;
+
+ if (tableId != that.tableId) {
+ return false;
+ }
+ if (fromCatalogVersion != that.fromCatalogVersion) {
+ return false;
+ }
+ return toTableVersion == that.toTableVersion;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tableId;
+ result = 31 * result + fromCatalogVersion;
+ result = 31 * result + toTableVersion;
+ return result;
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
index 597e12f926..630d3716a7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java
@@ -28,12 +28,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
/**
- * Represents a full table schema: that is, the definition of the table and
all objects (indexes, constraints, etc)
- * that belong to the table.
+ * Represents a full table schema: that is, the definition of the table and
all objects (constraints, etc)
+ * that belong to the table *that might affect schema compatibility* (so,
indices are not included as they
+ * don't affect such compatibility).
*/
public class FullTableSchema {
private final int schemaVersion;
@@ -41,21 +41,13 @@ public class FullTableSchema {
private final List<CatalogTableColumnDescriptor> columns;
- private final List<CatalogIndexDescriptor> indexes;
-
/**
* Constructor.
*/
- public FullTableSchema(
- int schemaVersion,
- int tableId,
- List<CatalogTableColumnDescriptor> columns,
- List<CatalogIndexDescriptor> indexes
- ) {
+ public FullTableSchema(int schemaVersion, int tableId,
List<CatalogTableColumnDescriptor> columns) {
this.schemaVersion = schemaVersion;
this.tableId = tableId;
- this.columns = columns;
- this.indexes = indexes;
+ this.columns = List.copyOf(columns);
}
/**
@@ -85,15 +77,6 @@ public class FullTableSchema {
return columns;
}
- /**
- * Returns definitions of indexes belonging to the table.
- *
- * @return Definitions of indexes belonging to the table.
- */
- public List<CatalogIndexDescriptor> indexes() {
- return indexes;
- }
-
/**
* Computes a diff between this and a previous schema.
*
@@ -118,13 +101,7 @@ public class FullTableSchema {
}
}
- Map<String, CatalogIndexDescriptor> prevIndexesByName =
toMapByName(prevSchema.indexes, CatalogIndexDescriptor::name);
- Map<String, CatalogIndexDescriptor> thisIndexesByName =
toMapByName(this.indexes, CatalogIndexDescriptor::name);
-
- List<CatalogIndexDescriptor> addedIndexes =
subtractKeyed(thisIndexesByName, prevIndexesByName);
- List<CatalogIndexDescriptor> removedIndexes =
subtractKeyed(prevIndexesByName, thisIndexesByName);
-
- return new TableDefinitionDiff(addedColumns, removedColumns,
changedColumns, addedIndexes, removedIndexes);
+ return new TableDefinitionDiff(addedColumns, removedColumns,
changedColumns);
}
private static <T> Map<String, T> toMapByName(List<T> elements,
Function<T, String> nameExtractor) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
index 28da611570..b4faf5ac64 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/TableDefinitionDiff.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.table.distributed.schema;
import static java.util.Collections.emptyList;
import java.util.List;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
/**
@@ -28,16 +27,13 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
*/
public class TableDefinitionDiff {
private static final TableDefinitionDiff EMPTY = new TableDefinitionDiff(
- emptyList(), emptyList(), emptyList(), emptyList(), emptyList()
+ emptyList(), emptyList(), emptyList()
);
private final List<CatalogTableColumnDescriptor> addedColumns;
private final List<CatalogTableColumnDescriptor> removedColumns;
private final List<ColumnDefinitionDiff> changedColumns;
- private final List<CatalogIndexDescriptor> addedIndexes;
- private final List<CatalogIndexDescriptor> removedIndexes;
-
// TODO: IGNITE-19229 - other change types
/**
@@ -55,15 +51,11 @@ public class TableDefinitionDiff {
public TableDefinitionDiff(
List<CatalogTableColumnDescriptor> addedColumns,
List<CatalogTableColumnDescriptor> removedColumns,
- List<ColumnDefinitionDiff> changedColumns,
- List<CatalogIndexDescriptor> addedIndexes,
- List<CatalogIndexDescriptor> removedIndexes
+ List<ColumnDefinitionDiff> changedColumns
) {
this.addedColumns = List.copyOf(addedColumns);
this.removedColumns = List.copyOf(removedColumns);
this.changedColumns = List.copyOf(changedColumns);
- this.addedIndexes = List.copyOf(addedIndexes);
- this.removedIndexes = List.copyOf(removedIndexes);
}
/**
@@ -87,20 +79,6 @@ public class TableDefinitionDiff {
return changedColumns;
}
- /**
- * Returns indexes that were added.
- */
- public List<CatalogIndexDescriptor> addedIndexes() {
- return addedIndexes;
- }
-
- /**
- * Returns indexes that were removed.
- */
- public List<CatalogIndexDescriptor> removedIndexes() {
- return removedIndexes;
- }
-
/**
* Returns whether this diff is empty (so no difference at all).
*
@@ -109,8 +87,6 @@ public class TableDefinitionDiff {
public boolean isEmpty() {
return addedColumns.isEmpty()
&& removedColumns.isEmpty()
- && changedColumns.isEmpty()
- && addedIndexes.isEmpty()
- && removedIndexes.isEmpty();
+ && changedColumns.isEmpty();
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/Schemas.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ValidationSchemasSource.java
similarity index 73%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/Schemas.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ValidationSchemasSource.java
index 54ff6d659a..6f2cc1dc14 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/Schemas.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ValidationSchemasSource.java
@@ -20,33 +20,28 @@ package org.apache.ignite.internal.table.distributed.schema;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaManager;
/**
* Provides access to table schemas.
*/
-public interface Schemas {
- /**
- * Obtains a future that completes when all schemas activating not later
than the given timestamp are available.
- *
- * @param ts Timestamp we are interested in. This is the timestamp
transaction processing logic is interested in (like beginTs or
- * commitTs), not the timestamp after subtraction described in section
'Waiting for safe time in the past' of
- * <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-98:+Schema+Synchronization">IEP-98</a>
- * @return Future that completes when all schemas activating not later
than the given timestamp are available.
- */
- CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts);
-
+public interface ValidationSchemasSource {
/**
* Obtains a future that completes when the given schema version becomes
available.
*
+ * <p>Must only be called when it's guaranteed that the table exists from
the point of view of {@link SchemaManager}.
+ *
* @param tableId ID of the table of interest.
* @param schemaVersion ID of the schema version.
* @return Future that completes when the given schema version becomes
available.
*/
- CompletableFuture<?> waitForSchemaAvailability(int tableId, int
schemaVersion);
+ CompletableFuture<Void> waitForSchemaAvailability(int tableId, int
schemaVersion);
/**
* Returns all schema versions between (including) the two that were
effective at the given timestamps.
*
+ * <p>For both timestamps, schemas-related metadata must be complete, see
{@link SchemaSyncService}.
+ *
* @param tableId ID of the table which schemas need to be considered.
* @param fromIncluding Start timestamp.
* @param toIncluding End timestamp.
@@ -59,10 +54,12 @@ public interface Schemas {
* the one identified by a schema version ID. If the starting schema (the
one effective at fromIncluding)
* is actually a later schema than the one identified by toIncluding, then
an empty list is returned.
*
+ * <p>For both fromIncluding and toIncluding, schemas-related metadata
must be complete.
+ *
* @param tableId ID of the table which schemas need to be considered.
* @param fromIncluding Start timestamp.
- * @param toIncluding End schema version ID.
+ * @param toTableVersionIncluding End schema version ID.
* @return All schema versions between (including) the given timestamp and
schema version.
*/
- List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toIncluding);
+ List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toTableVersionIncluding);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/package-info.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/package-info.java
deleted file mode 100644
index 1f92efab55..0000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This is a temporary package to host code needed until CatalogService is
ready
- * TODO: IGNITE-19447 - remove/rework when switched to full-blown usage of
CatalogService.
- */
-package org.apache.ignite.internal.table.distributed.schema;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
index 9a75e90c77..6e456195c3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -55,7 +55,7 @@ import
org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
@@ -136,7 +136,7 @@ public class PartitionReplicaListenerDurableUnlockTest
extends IgniteAbstractTes
txStateStorage,
mock(TransactionStateResolver.class),
mock(StorageUpdateHandler.class),
- mock(Schemas.class),
+ mock(ValidationSchemasSource.class),
LOCAL_NODE,
mock(SchemaSyncService.class),
mock(CatalogService.class),
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d56eb38cf1..17379ef941 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -91,7 +91,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
-import org.apache.ignite.internal.table.impl.DummySchemas;
+import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockManager;
@@ -244,7 +244,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
indexUpdateHandler,
new GcUpdateHandler(partitionDataStorage, safeTime,
indexUpdateHandler)
),
- new DummySchemas(schemaManager),
+ new DummyValidationSchemasSource(schemaManager),
localNode,
new AlwaysSyncedSchemaSyncService(),
catalogService,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 462ee6ba38..dd830a0052 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -152,7 +152,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -297,7 +297,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private PendingComparableValuesTracker<HybridTimestamp, Void>
safeTimeClock;
@Mock
- private Schemas schemas;
+ private ValidationSchemasSource validationSchemasSource;
@Spy
private final SchemaSyncService schemaSyncService = new
AlwaysSyncedSchemaSyncService();
@@ -393,8 +393,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
when(safeTimeClock.current()).thenReturn(HybridTimestamp.MIN_VALUE);
-
when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));
- when(schemas.waitForSchemaAvailability(anyInt(),
anyInt())).thenReturn(completedFuture(null));
+ when(validationSchemasSource.waitForSchemaAvailability(anyInt(),
anyInt())).thenReturn(completedFuture(null));
lenient().when(catalogService.table(anyInt(),
anyLong())).thenReturn(tableDescriptor);
@@ -490,7 +489,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
indexUpdateHandler,
new GcUpdateHandler(partitionDataStorage,
safeTimeClock, indexUpdateHandler)
),
- schemas,
+ validationSchemasSource,
localNode,
schemaSyncService,
catalogService,
@@ -1463,7 +1462,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Test
public void commitsOnSameSchemaSuccessfully() {
- when(schemas.tableSchemaVersionsBetween(anyInt(), any(),
any(HybridTimestamp.class)))
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(), any(HybridTimestamp.class)))
.thenReturn(List.of(
tableSchema(CURRENT_SCHEMA_VERSION,
List.of(nullableColumn("col")))
));
@@ -1486,7 +1485,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private static FullTableSchema tableSchema(int schemaVersion,
List<CatalogTableColumnDescriptor> columns) {
- return new FullTableSchema(schemaVersion, 1, columns, List.of());
+ return new FullTableSchema(schemaVersion, 1, columns);
}
private AtomicReference<Boolean> interceptFinishTxCommand() {
@@ -1525,7 +1524,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@Test
@Disabled("IGNITE-19229")
public void commitsOnCompatibleSchemaChangeSuccessfully() {
- when(schemas.tableSchemaVersionsBetween(anyInt(), any(),
any(HybridTimestamp.class)))
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(), any(HybridTimestamp.class)))
.thenReturn(List.of(
tableSchema(CURRENT_SCHEMA_VERSION,
List.of(nullableColumn("col1"))),
tableSchema(FUTURE_SCHEMA_VERSION,
List.of(nullableColumn("col1"), nullableColumn("col2")))
@@ -1557,12 +1556,12 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
private void simulateForwardIncompatibleSchemaChange(int
fromSchemaVersion, int toSchemaVersion) {
- when(schemas.tableSchemaVersionsBetween(anyInt(), any(),
any(HybridTimestamp.class)))
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(), any(HybridTimestamp.class)))
.thenReturn(incompatibleSchemaVersions(fromSchemaVersion,
toSchemaVersion));
}
private void simulateBackwardIncompatibleSchemaChange(int
fromSchemaVersion, int toSchemaVersion) {
- when(schemas.tableSchemaVersionsBetween(anyInt(), any(), anyInt()))
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(), anyInt()))
.thenReturn(incompatibleSchemaVersions(fromSchemaVersion,
toSchemaVersion));
}
@@ -2214,7 +2213,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
Set<ReplicationGroupId> groups,
int tableToBeDroppedId
) {
- when(schemas.tableSchemaVersionsBetween(anyInt(), any(),
any(HybridTimestamp.class)))
+ when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(),
any(), any(HybridTimestamp.class)))
.thenReturn(List.of(
tableSchema(CURRENT_SCHEMA_VERSION,
List.of(nullableColumn("col")))
));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
new file mode 100644
index 0000000000..e00425deb4
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogValidationSchemasSourceTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.mockito.AdditionalMatchers.geq;
+import static org.mockito.AdditionalMatchers.lt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CatalogValidationSchemasSourceTest extends BaseIgniteAbstractTest {
+ @Mock
+ private CatalogService catalogService;
+
+ @Mock
+ private SchemaManager schemaManager;
+
+ @Mock
+ private SchemaRegistry schemaRegistry;
+
+ @InjectMocks
+ private CatalogValidationSchemasSource schemas;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ @Test
+ void waitingForSchemaAvailabilityAtVersionWorks() {
+ int tableId = 1;
+ int version = 3;
+ CompletableFuture<SchemaDescriptor> underlyingFuture = new
CompletableFuture<>();
+
+ doReturn(schemaRegistry).when(schemaManager).schemaRegistry(tableId);
+ when(schemaRegistry.schemaAsync(version)).thenReturn(underlyingFuture);
+
+ CompletableFuture<Void> future =
schemas.waitForSchemaAvailability(tableId, version);
+ assertThat(future, is(not(completedFuture())));
+
+ underlyingFuture.complete(mock(SchemaDescriptor.class));
+
+ assertThat(future, is(completedFuture()));
+ }
+
+ @Test
+ void tableSchemaVersionsBetweenTimestampsWorks() {
+ int tableId = 1;
+
+ CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+ CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+
+ HybridTimestamp from = clock.now();
+ HybridTimestamp to = clock.now();
+
+
when(catalogService.activeCatalogVersion(from.longValue())).thenReturn(3);
+
when(catalogService.activeCatalogVersion(to.longValue())).thenReturn(4);
+ when(catalogService.table(tableId, 3)).thenReturn(version3);
+ when(catalogService.table(tableId, 4)).thenReturn(version4);
+
+ List<FullTableSchema> fullSchemas =
schemas.tableSchemaVersionsBetween(tableId, from, to);
+
+ assertThat(fullSchemas, hasSize(2));
+ assertThat(fullSchemas.get(0).schemaVersion(), is(3));
+ assertThat(fullSchemas.get(1).schemaVersion(), is(4));
+ }
+
+ private static CatalogTableDescriptor tableVersion(int tableId, int
tableVersion) {
+ List<CatalogTableColumnDescriptor> columns = List.of(
+ new CatalogTableColumnDescriptor("k1", ColumnType.INT16,
false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("v1", ColumnType.INT32,
false, 0, 0, 0, null)
+ );
+
+ return new CatalogTableDescriptor(
+ tableId, -1, -1, "test", 0, tableVersion, columns,
List.of("k1"), null, INITIAL_CAUSALITY_TOKEN
+ );
+ }
+
+ @Test
+ void tableSchemaVersionsBetweenTimestampsUsesCache() {
+ int tableId = 1;
+
+ CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+ CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+
+ HybridTimestamp timestamp = clock.now();
+
+
when(catalogService.activeCatalogVersion(lt(timestamp.longValue()))).thenReturn(3);
+
when(catalogService.activeCatalogVersion(geq(timestamp.longValue()))).thenReturn(4);
+ when(catalogService.table(tableId, 3)).thenReturn(version3);
+ when(catalogService.table(tableId, 4)).thenReturn(version4);
+
+ List<FullTableSchema> fullSchemas1 =
schemas.tableSchemaVersionsBetween(tableId, timestamp.subtractPhysicalTime(2),
timestamp);
+ List<FullTableSchema> fullSchemas2 =
schemas.tableSchemaVersionsBetween(
+ tableId,
+ timestamp.subtractPhysicalTime(1),
+ timestamp.addPhysicalTime(10)
+ );
+
+ assertThat(fullSchemas1.size(), is(fullSchemas2.size()));
+
+ verify(catalogService, times(1)).table(tableId, 3);
+ }
+
+ @Test
+ void tableSchemaVersionsBetweenTimestampAndVersionWorks() {
+ int tableId = 1;
+
+ CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+ CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+ CatalogTableDescriptor version5 = tableVersion(tableId, 5);
+
+ HybridTimestamp from = clock.now();
+
+ when(catalogService.latestCatalogVersion()).thenReturn(5);
+
when(catalogService.activeCatalogVersion(from.longValue())).thenReturn(3);
+ when(catalogService.table(tableId, 3)).thenReturn(version3);
+ when(catalogService.table(tableId, 4)).thenReturn(version4);
+ when(catalogService.table(tableId, 5)).thenReturn(version5);
+
+ List<FullTableSchema> fullSchemas =
schemas.tableSchemaVersionsBetween(tableId, from, 4);
+
+ assertThat(fullSchemas, hasSize(2));
+ assertThat(fullSchemas.get(0).schemaVersion(), is(3));
+ assertThat(fullSchemas.get(1).schemaVersion(), is(4));
+ }
+
+ @Test
+ void
tableSchemaVersionsBetweenTimestampAndVersionReturnsEmptyListIfEndIsBeforeStart()
{
+ int tableId = 1;
+
+ CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+
+ HybridTimestamp from = clock.now();
+
+ when(catalogService.latestCatalogVersion()).thenReturn(3);
+
when(catalogService.activeCatalogVersion(from.longValue())).thenReturn(3);
+ when(catalogService.table(tableId, 3)).thenReturn(version3);
+
+ List<FullTableSchema> fullSchemas =
schemas.tableSchemaVersionsBetween(tableId, from, 2);
+
+ assertThat(fullSchemas, is(empty()));
+ }
+
+ @Test
+ void tableSchemaVersionsBetweenTimestampAndVersionUsesCache() {
+ int tableId = 1;
+
+ CatalogTableDescriptor version3 = tableVersion(tableId, 3);
+ CatalogTableDescriptor version4 = tableVersion(tableId, 4);
+
+ HybridTimestamp timestamp = clock.now();
+
+ when(catalogService.latestCatalogVersion()).thenReturn(4);
+ when(catalogService.activeCatalogVersion(anyLong())).thenReturn(3);
+ when(catalogService.table(tableId, 3)).thenReturn(version3);
+ when(catalogService.table(tableId, 4)).thenReturn(version4);
+
+ List<FullTableSchema> fullSchemas1 =
schemas.tableSchemaVersionsBetween(tableId, timestamp.subtractPhysicalTime(2),
4);
+ List<FullTableSchema> fullSchemas2 =
schemas.tableSchemaVersionsBetween(tableId, timestamp.subtractPhysicalTime(1),
4);
+
+ assertThat(fullSchemas1.size(), is(fullSchemas2.size()));
+
+ verify(catalogService, times(1)).table(tableId, 3);
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
index 6347435d75..a172121ccb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchemaTest.java
@@ -24,8 +24,6 @@ import static org.hamcrest.Matchers.is;
import java.util.List;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
-import
org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.sql.ColumnType;
import org.junit.jupiter.api.Test;
@@ -34,20 +32,15 @@ class FullTableSchemaTest {
@Test
void sameSchemasHaveEmptyDiff() {
CatalogTableColumnDescriptor column = someColumn("a");
- CatalogIndexDescriptor index = someIndex(1, "ind_a");
- var schema1 = new FullTableSchema(1, 1, List.of(column),
List.of(index));
- var schema2 = new FullTableSchema(2, 1, List.of(column),
List.of(index));
+ var schema1 = new FullTableSchema(1, 1, List.of(column));
+ var schema2 = new FullTableSchema(2, 1, List.of(column));
TableDefinitionDiff diff = schema2.diffFrom(schema1);
assertThat(diff.isEmpty(), is(true));
}
- private static CatalogHashIndexDescriptor someIndex(int id, String name) {
- return new CatalogHashIndexDescriptor(id, name, 1, true, List.of("a"),
true);
- }
-
private static CatalogTableColumnDescriptor someColumn(String columnName) {
return new CatalogTableColumnDescriptor(columnName, ColumnType.INT32,
true, 0, 0, 0, DefaultValue.constant(null));
}
@@ -58,8 +51,8 @@ class FullTableSchemaTest {
CatalogTableColumnDescriptor column2 = someColumn("b");
CatalogTableColumnDescriptor column3 = someColumn("c");
- var schema1 = new FullTableSchema(1, 1, List.of(column1, column2),
List.of());
- var schema2 = new FullTableSchema(2, 1, List.of(column2, column3),
List.of());
+ var schema1 = new FullTableSchema(1, 1, List.of(column1, column2));
+ var schema2 = new FullTableSchema(2, 1, List.of(column2, column3));
TableDefinitionDiff diff = schema2.diffFrom(schema1);
@@ -73,10 +66,9 @@ class FullTableSchemaTest {
void changedColumnsAreReflectedInDiff() {
CatalogTableColumnDescriptor column1 = someColumn("a");
- var schema1 = new FullTableSchema(1, 1, List.of(column1), List.of());
+ var schema1 = new FullTableSchema(1, 1, List.of(column1));
var schema2 = new FullTableSchema(2, 1,
- List.of(new CatalogTableColumnDescriptor("a",
ColumnType.STRING, true, 0, 0, 10, DefaultValue.constant(null))),
- List.of()
+ List.of(new CatalogTableColumnDescriptor("a",
ColumnType.STRING, true, 0, 0, 10, DefaultValue.constant(null)))
);
TableDefinitionDiff diff = schema2.diffFrom(schema1);
@@ -86,20 +78,4 @@ class FullTableSchemaTest {
List<ColumnDefinitionDiff> changedColumns = diff.changedColumns();
assertThat(changedColumns, is(hasSize(1)));
}
-
- @Test
- void addedRemovedIndexesAreReflectedInDiff() {
- CatalogIndexDescriptor index1 = someIndex(1, "a");
- CatalogIndexDescriptor index2 = someIndex(2, "b");
- CatalogIndexDescriptor index3 = someIndex(3, "c");
-
- var schema1 = new FullTableSchema(1, 1, List.of(someColumn("a")),
List.of(index1, index2));
- var schema2 = new FullTableSchema(2, 1, List.of(someColumn("a")),
List.of(index2, index3));
-
- TableDefinitionDiff diff = schema2.diffFrom(schema1);
-
- assertThat(diff.isEmpty(), is(false));
- assertThat(diff.addedIndexes(), is(List.of(index3)));
- assertThat(diff.removedIndexes(), is(List.of(index1)));
- }
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ee0b5ee172..2fcabab70e 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -110,11 +110,11 @@ import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateR
import
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
-import org.apache.ignite.internal.table.impl.DummySchemas;
+import org.apache.ignite.internal.table.impl.DummyValidationSchemasSource;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
@@ -546,7 +546,7 @@ public class ItTxTestCluster {
txStateStorage,
transactionStateResolver,
storageUpdateHandler,
- new DummySchemas(schemaManager),
+ new
DummyValidationSchemasSource(schemaManager),
consistentIdToNode.apply(assignment),
new AlwaysSyncedSchemaSyncService(),
catalogService,
@@ -642,7 +642,7 @@ public class ItTxTestCluster {
TxStateStorage txStateStorage,
TransactionStateResolver transactionStateResolver,
StorageUpdateHandler storageUpdateHandler,
- Schemas schemas,
+ ValidationSchemasSource validationSchemasSource,
ClusterNode localNode,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
@@ -664,7 +664,7 @@ public class ItTxTestCluster {
txStateStorage,
transactionStateResolver,
storageUpdateHandler,
- schemas,
+ validationSchemasSource,
localNode,
schemaSyncService,
catalogService,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 41800c140c..da2716615f 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -383,7 +383,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
txStateStorage().getOrCreateTxStateStorage(PART_ID),
transactionStateResolver,
storageUpdateHandler,
- new DummySchemas(schemaManager),
+ new DummyValidationSchemasSource(schemaManager),
LOCAL_NODE,
new AlwaysSyncedSchemaSyncService(),
catalogService,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java
deleted file mode 100644
index 01f6525932..0000000000
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.impl;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
-import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
-import org.apache.ignite.internal.table.distributed.schema.Schemas;
-
-/**
- * Dummy {@link Schemas} implementation that is not historic and always uses
same {@link SchemaRegistry}.
- */
-public class DummySchemas implements Schemas {
- private final SchemaRegistry schemaRegistry;
-
- public DummySchemas(SchemaRegistry schemaRegistry) {
- this.schemaRegistry = schemaRegistry;
- }
-
- @Override
- public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts)
{
- return completedFuture(null);
- }
-
- @Override
- public CompletableFuture<?> waitForSchemaAvailability(int tableId, int
schemaVersion) {
- return completedFuture(null);
- }
-
- @Override
- public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
- SchemaDescriptor schemaDescriptor = schemaRegistry.lastKnownSchema();
-
- List<CatalogTableColumnDescriptor> columns =
schemaDescriptor.columnNames().stream()
- .map(colName -> {
- Column column = schemaDescriptor.column(colName);
-
- assert column != null;
-
- return NonHistoricSchemas.columnDescriptor(column);
- })
- .collect(toList());
-
- var fullSchema = new FullTableSchema(
- 1,
- 1,
- columns,
- List.of()
- );
-
- return List.of(fullSchema);
- }
-
- @Override
- public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toIncluding) {
- // Returning an empty list makes sure that backward validation never
fails, which is what we want before
- // we switch to CatalogService completely.
- return List.of();
- }
-}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyValidationSchemasSource.java
similarity index 76%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
rename to
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyValidationSchemasSource.java
index c6eb5f38c8..4d171a88fc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyValidationSchemasSource.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.schema;
+package org.apache.ignite.internal.table.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
@@ -29,7 +29,9 @@ import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.DefaultValueProvider;
import
org.apache.ignite.internal.schema.DefaultValueProvider.FunctionalValueProvider;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import
org.apache.ignite.internal.table.distributed.schema.ValidationSchemasSource;
import org.apache.ignite.internal.type.BitmaskNativeType;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
@@ -37,41 +39,23 @@ import org.apache.ignite.internal.type.TemporalNativeType;
import org.apache.ignite.internal.type.VarlenNativeType;
/**
- * A dummy implementation over {@link SchemaManager}. It is dummy because:
- *
- * <ul>
- * <li>It imitates historicity, but always takes the latest known
schema</li>
- * <li>{@link #tableSchemaVersionsBetween(int, HybridTimestamp,
HybridTimestamp)} always returns a single schema to avoid
- * validation failures</li>
- * </ul>
- *
- * <p>The point of this implementation is to allow the system work in the
pre-SchemaSync fashion before the switch to CatalogService
- * is possible.
+ * Dummy {@link ValidationSchemasSource} implementation that is not historic
and always uses same {@link SchemaRegistry}.
*/
-// TODO: IGNITE-19447 - remove when switched to the CatalogService
-public class NonHistoricSchemas implements Schemas {
- private final SchemaManager schemaManager;
-
- private final SchemaSyncService schemaSyncService;
+public class DummyValidationSchemasSource implements ValidationSchemasSource {
+ private final SchemaRegistry schemaRegistry;
- public NonHistoricSchemas(SchemaManager schemaManager, SchemaSyncService
schemaSyncService) {
- this.schemaManager = schemaManager;
- this.schemaSyncService = schemaSyncService;
- }
-
- @Override
- public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts)
{
- return schemaSyncService.waitForMetadataCompleteness(ts);
+ public DummyValidationSchemasSource(SchemaRegistry schemaRegistry) {
+ this.schemaRegistry = schemaRegistry;
}
@Override
- public CompletableFuture<?> waitForSchemaAvailability(int tableId, int
schemaVersion) {
+ public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int
schemaVersion) {
return completedFuture(null);
}
@Override
public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
- SchemaDescriptor schemaDescriptor =
schemaManager.schemaRegistry(tableId).lastKnownSchema();
+ SchemaDescriptor schemaDescriptor = schemaRegistry.lastKnownSchema();
List<CatalogTableColumnDescriptor> columns =
schemaDescriptor.columnNames().stream()
.map(colName -> {
@@ -86,17 +70,15 @@ public class NonHistoricSchemas implements Schemas {
var fullSchema = new FullTableSchema(
1,
1,
- columns,
- List.of()
+ columns
);
return List.of(fullSchema);
}
@Override
- public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toIncluding) {
- // Returning an empty list makes sure that backward validation never
fails, which is what we want before
- // we switch to CatalogService completely.
+ public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toTableVersionIncluding) {
+ // Returning an empty list makes sure that backward validation never
fails, which is what we want.
return List.of();
}
@@ -107,7 +89,7 @@ public class NonHistoricSchemas implements Schemas {
* @param column Column to convert.
* @return Conversion result.
*/
- public static CatalogTableColumnDescriptor columnDescriptor(Column column)
{
+ private static CatalogTableColumnDescriptor columnDescriptor(Column
column) {
NativeType nativeType = column.type();
int precision;
int scale;