This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 09565a1025 IGNITE-19082: Catalog. Cleanup dead code (#3669)
09565a1025 is described below
commit 09565a102591e91c0283d075804801b3a362c737
Author: Max Zhuravkov <[email protected]>
AuthorDate: Fri May 10 18:13:21 2024 +0300
IGNITE-19082: Catalog. Cleanup dead code (#3669)
---
.../internal/catalog/CatalogManagerImpl.java | 90 +++++----
.../ignite/internal/catalog/CatalogService.java | 7 +
.../commands/AbstractCreateIndexCommand.java | 2 +-
.../commands/AlterTableAddColumnCommand.java | 2 +-
.../commands/AlterTableAlterColumnCommand.java | 2 +-
.../commands/AlterTableDropColumnCommand.java | 2 +-
.../catalog/commands/CreateSchemaCommand.java | 96 +++++++++
.../commands/CreateSchemaCommandBuilder.java | 32 +++
.../catalog/commands/CreateSystemViewCommand.java | 4 +-
.../catalog/commands/CreateTableCommand.java | 4 +-
.../catalog/commands/DropIndexCommand.java | 2 +-
.../catalog/commands/DropTableCommand.java | 2 +-
.../descriptors/CatalogColumnCollation.java | 2 +-
.../descriptors/CatalogIndexDescriptor.java | 13 +-
.../descriptors/CatalogSystemViewDescriptor.java | 47 +++--
.../descriptors/CatalogTableDescriptor.java | 22 +--
.../events/StoppingIndexEventParameters.java | 11 +-
.../storage/AbstractChangeIndexStatusEntry.java | 18 +-
.../internal/catalog/storage/AlterColumnEntry.java | 30 ++-
.../internal/catalog/storage/DropColumnsEntry.java | 28 ++-
.../internal/catalog/storage/DropIndexEntry.java | 20 +-
.../internal/catalog/storage/DropTableEntry.java | 16 +-
.../internal/catalog/storage/NewColumnsEntry.java | 27 ++-
.../internal/catalog/storage/NewIndexEntry.java | 17 +-
.../internal/catalog/storage/NewSchemaEntry.java | 88 +++++++++
.../catalog/storage/NewSystemViewEntry.java | 13 +-
.../internal/catalog/storage/NewTableEntry.java | 14 +-
.../internal/catalog/storage/RenameTableEntry.java | 18 +-
.../CatalogEntrySerializerProvider.java | 2 +
.../serialization/MarshallableEntryType.java | 3 +-
...CatalogManagerDescriptorCausalityTokenTest.java | 9 +-
.../internal/catalog/CatalogManagerSelfTest.java | 59 +++---
.../internal/catalog/CatalogSystemViewTest.java | 5 +-
.../catalog/commands/CatalogUtilsTest.java | 2 +-
.../CreateSchemaCommandValidationTest.java | 63 ++++++
.../CatalogSystemViewDescriptorTest.java | 2 +
.../storage/CatalogEntrySerializationTest.java | 45 +++--
.../ignite/client/handler/FakeCatalogService.java | 7 +-
.../apache/ignite/internal/index/IndexManager.java | 1 -
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../java/org/apache/ignite/internal/Cluster.java | 1 +
.../schema/registry/SchemaRegistryImpl.java | 36 +---
.../schema/registry/SchemaRegistryImplTest.java | 218 ---------------------
.../internal/sql/engine/ItCreateTableDdlTest.java | 20 --
.../internal/systemview/SystemViewManagerImpl.java | 3 +-
.../internal/systemview/SystemViewManagerTest.java | 38 ++--
46 files changed, 579 insertions(+), 565 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 14e90a15ea..070c7a81a0 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Flow.Publisher;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand;
+import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import
org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
@@ -119,6 +120,9 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
/** Versioned catalog descriptors sorted in chronological order. */
private final NavigableMap<Long, Catalog> catalogByTs = new
ConcurrentSkipListMap<>();
+ /** A future that completes when an empty catalog is initialised. If
catalog is not empty this future when this completes starts. */
+ private final CompletableFuture<Void> catalogInitializationFuture = new
CompletableFuture<>();
+
private final UpdateLog updateLog;
private final PendingComparableValuesTracker<Integer, Void> versionTracker
= new PendingComparableValuesTracker<>(0);
@@ -158,27 +162,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
public CompletableFuture<Void> startAsync() {
int objectIdGen = 0;
- // TODO: IGNITE-19082 Move default schema objects initialization to
cluster init procedure.
- CatalogSchemaDescriptor publicSchema = new CatalogSchemaDescriptor(
- objectIdGen++,
- DEFAULT_SCHEMA_NAME,
- new CatalogTableDescriptor[0],
- new CatalogIndexDescriptor[0],
- new CatalogSystemViewDescriptor[0],
- INITIAL_CAUSALITY_TOKEN
- );
-
- // TODO: IGNITE-19082 Move system schema objects initialization to
cluster init procedure.
- CatalogSchemaDescriptor systemSchema = new CatalogSchemaDescriptor(
- objectIdGen++,
- SYSTEM_SCHEMA_NAME,
- new CatalogTableDescriptor[0],
- new CatalogIndexDescriptor[0],
- new CatalogSystemViewDescriptor[0],
- INITIAL_CAUSALITY_TOKEN
- );
-
- Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(),
List.of(publicSchema, systemSchema), null);
+ Catalog emptyCatalog = new Catalog(0, 0L, objectIdGen, List.of(),
List.of(), null);
registerCatalog(emptyCatalog);
@@ -187,12 +171,17 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
return updateLog.startAsync()
.thenCompose(none -> {
if (latestCatalogVersion() == emptyCatalog.version()) {
- // node has not seen any updates yet, let's try to
initialise
- // catalog with default zone
- return createDefaultZone(emptyCatalog);
- }
+ int initializedCatalogVersion = emptyCatalog.version()
+ 1;
+
+ this.catalogReadyFuture(initializedCatalogVersion)
+ .thenCompose(ignored ->
awaitVersionActivation(initializedCatalogVersion))
+ .handle((r, e) ->
catalogInitializationFuture.complete(null));
- return nullCompletedFuture();
+ return initCatalog(emptyCatalog);
+ } else {
+ catalogInitializationFuture.complete(null);
+ return nullCompletedFuture();
+ }
});
}
@@ -205,7 +194,11 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
@Override
public @Nullable CatalogTableDescriptor table(String tableName, long
timestamp) {
- return
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).table(tableName);
+ CatalogSchemaDescriptor schema =
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
+ if (schema == null) {
+ return null;
+ }
+ return schema.table(tableName);
}
@Override
@@ -225,7 +218,11 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
@Override
public @Nullable CatalogIndexDescriptor aliveIndex(String indexName, long
timestamp) {
- return
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).aliveIndex(indexName);
+ CatalogSchemaDescriptor schema =
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME);
+ if (schema == null) {
+ return null;
+ }
+ return schema.aliveIndex(indexName);
}
@Override
@@ -321,6 +318,11 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
return versionTracker.waitFor(version);
}
+ @Override
+ public CompletableFuture<Void> catalogInitializationFuture() {
+ return catalogInitializationFuture;
+ }
+
@Override
public @Nullable Catalog catalog(int catalogVersion) {
return catalogByVer.get(catalogVersion);
@@ -363,8 +365,9 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
return updateLog.saveSnapshot(new SnapshotEntry(catalog));
}
- private CompletableFuture<Void> createDefaultZone(Catalog emptyCatalog) {
- List<UpdateEntry> createZoneEntries = new BulkUpdateProducer(List.of(
+ private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
+ List<CatalogCommand> initCommands = List.of(
+ // Init default zone
CreateZoneCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
.partitions(DEFAULT_PARTITION_COUNT)
@@ -378,10 +381,15 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
.build(),
AlterZoneSetDefaultCommand.builder()
.zoneName(DEFAULT_ZONE_NAME)
- .build()
- )).get(emptyCatalog);
+ .build(),
+ // Add schemas
+
CreateSchemaCommand.builder().name(DEFAULT_SCHEMA_NAME).build(),
+ CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
+ );
+
+ List<UpdateEntry> entries = new
BulkUpdateProducer(initCommands).get(emptyCatalog);
- return updateLog.append(new VersionedUpdate(emptyCatalog.version() +
1, 0L, createZoneEntries))
+ return updateLog.append(new VersionedUpdate(emptyCatalog.version() +
1, 0L, entries))
.handle((result, error) -> {
if (error != null) {
LOG.warn("Unable to create default zone.", error);
@@ -407,13 +415,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
CompletableFuture<Integer> resultFuture = new CompletableFuture<>();
saveUpdate(updateProducer, 0)
- .thenCompose(newVersion -> {
- Catalog catalog = catalogByVer.get(newVersion);
-
- HybridTimestamp tsSafeForRoReadingInPastOptimization =
calcClusterWideEnsureActivationTime(catalog);
-
- return
clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused ->
newVersion);
- })
+ .thenCompose(this::awaitVersionActivation)
.whenComplete((newVersion, err) -> {
if (err != null) {
Throwable errUnwrapped =
ExceptionUtils.unwrapCause(err);
@@ -448,6 +450,14 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
return resultFuture;
}
+ private CompletableFuture<Integer> awaitVersionActivation(int version) {
+ Catalog catalog = catalogByVer.get(version);
+
+ HybridTimestamp tsSafeForRoReadingInPastOptimization =
calcClusterWideEnsureActivationTime(catalog);
+
+ return
clockService.waitFor(tsSafeForRoReadingInPastOptimization).thenApply(unused ->
version);
+ }
+
private HybridTimestamp calcClusterWideEnsureActivationTime(Catalog
catalog) {
return clusterWideEnsuredActivationTsSafeForRoReads(
catalog,
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index eefbaa048f..cc1c65de89 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -45,8 +45,10 @@ import org.jetbrains.annotations.Nullable;
* <p>TBD: events
*/
public interface CatalogService extends EventProducer<CatalogEvent,
CatalogEventParameters> {
+ /** Default schema name. */
String DEFAULT_SCHEMA_NAME = "PUBLIC";
+ /** System schema name. */
String SYSTEM_SCHEMA_NAME = "SYSTEM";
/** Default storage profile. */
@@ -110,4 +112,9 @@ public interface CatalogService extends
EventProducer<CatalogEvent, CatalogEvent
* @param version Catalog version to wait for.
*/
CompletableFuture<Void> catalogReadyFuture(int version);
+
+ /**
+ * Returns a future, which completes when empty catalog is initialised.
Otherwise this future completes upon startup.
+ */
+ CompletableFuture<Void> catalogInitializationFuture();
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
index 08bebca51e..12bf302010 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AbstractCreateIndexCommand.java
@@ -91,7 +91,7 @@ public abstract class AbstractCreateIndexCommand extends
AbstractIndexCommand {
}
return List.of(
- new NewIndexEntry(createDescriptor(catalog.objectIdGenState(),
table.id(), catalog.version() + 1), schemaName),
+ new NewIndexEntry(createDescriptor(catalog.objectIdGenState(),
table.id(), catalog.version() + 1)),
new ObjectIdGenUpdateEntry(1)
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
index 57a2d1a1f4..d1cc894a3d 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAddColumnCommand.java
@@ -89,7 +89,7 @@ public class AlterTableAddColumnCommand extends
AbstractTableCommand {
}
return List.of(
- new NewColumnsEntry(table.id(), columnDescriptors, schemaName)
+ new NewColumnsEntry(table.id(), columnDescriptors)
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
index 1a06cc1691..1fda646d11 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableAlterColumnCommand.java
@@ -116,7 +116,7 @@ public class AlterTableAlterColumnCommand extends
AbstractTableCommand {
}
return List.of(
- new AlterColumnEntry(table.id(), target, schemaName)
+ new AlterColumnEntry(table.id(), target)
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
index 9e6ec15e32..7190859f08 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/AlterTableDropColumnCommand.java
@@ -107,7 +107,7 @@ public class AlterTableDropColumnCommand extends
AbstractTableCommand {
});
return List.of(
- new DropColumnsEntry(table.id(), columns, schemaName)
+ new DropColumnsEntry(table.id(), columns)
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
new file mode 100644
index 0000000000..38f15e0bdf
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
+import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
+import org.apache.ignite.internal.catalog.storage.UpdateEntry;
+
+/**
+ * Command to create a new schema.
+ */
+public class CreateSchemaCommand implements CatalogCommand {
+
+ private final String schemaName;
+
+ private CreateSchemaCommand(String schemaName) {
+ validateIdentifier(schemaName, "Name of the schema");
+
+ this.schemaName = schemaName;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<UpdateEntry> get(Catalog catalog) {
+ int id = catalog.objectIdGenState();
+
+ if (catalog.schema(schemaName) != null) {
+ throw new CatalogValidationException(format("Schema with name '{}'
already exists", schemaName));
+ }
+
+ CatalogSchemaDescriptor schema = new CatalogSchemaDescriptor(
+ id,
+ schemaName,
+ new CatalogTableDescriptor[0],
+ new CatalogIndexDescriptor[0],
+ new CatalogSystemViewDescriptor[0],
+ INITIAL_CAUSALITY_TOKEN
+ );
+
+ return List.of(
+ new NewSchemaEntry(schema),
+ new ObjectIdGenUpdateEntry(1)
+ );
+ }
+
+ /** Returns builder to create a command to create a new schema. */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Implementation of {@link CreateSchemaCommandBuilder}. */
+ public static class Builder implements CreateSchemaCommandBuilder {
+
+ private String name;
+
+ /** {@inheritDoc} */
+ @Override
+ public CreateSchemaCommandBuilder name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CatalogCommand build() {
+ return new CreateSchemaCommand(name);
+ }
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
new file mode 100644
index 0000000000..b3e690f6b8
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandBuilder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import org.apache.ignite.internal.catalog.CatalogCommand;
+
+/**
+ * Builder for a {@link CreateSchemaCommand}.
+ */
+public interface CreateSchemaCommandBuilder {
+
+ /** Sets schema name. Should not be null or blank. */
+ CreateSchemaCommandBuilder name(String name);
+
+ /** Creates new schema command. */
+ CatalogCommand build();
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
index efe6e63e24..baf358dd3c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateSystemViewCommand.java
@@ -108,7 +108,7 @@ public class CreateSystemViewCommand implements
CatalogCommand {
CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog,
CatalogManager.SYSTEM_SCHEMA_NAME);
List<CatalogTableColumnDescriptor> viewColumns =
columns.stream().map(CatalogUtils::fromParams).collect(toList());
- CatalogSystemViewDescriptor descriptor = new
CatalogSystemViewDescriptor(id, name, viewColumns, systemViewType);
+ CatalogSystemViewDescriptor descriptor = new
CatalogSystemViewDescriptor(id, systemSchema.id(), name, viewColumns,
systemViewType);
CatalogSystemViewDescriptor existingSystemView =
systemSchema.systemView(name);
@@ -121,7 +121,7 @@ public class CreateSystemViewCommand implements
CatalogCommand {
}
return List.of(
- new NewSystemViewEntry(descriptor, systemSchema.name()),
+ new NewSystemViewEntry(descriptor),
new ObjectIdGenUpdateEntry(1)
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index d8defb6a21..7a05f4458e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -151,8 +151,8 @@ public class CreateTableCommand extends
AbstractTableCommand {
CatalogIndexDescriptor pkIndex =
createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId);
return List.of(
- new NewTableEntry(table, schemaName),
- new NewIndexEntry(pkIndex, schemaName),
+ new NewTableEntry(table),
+ new NewIndexEntry(pkIndex),
new MakeIndexAvailableEntry(pkIndexId),
new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
index 52cfbaf197..2efb9e4f2a 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropIndexCommand.java
@@ -90,7 +90,7 @@ public class DropIndexCommand extends AbstractIndexCommand {
case BUILDING:
return List.of(new RemoveIndexEntry(index.id()));
case AVAILABLE:
- return List.of(new DropIndexEntry(index.id(),
index.tableId()));
+ return List.of(new DropIndexEntry(index.id()));
default:
throw new IllegalStateException("Unknown index status: " +
index.status());
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
index ed2f713013..399b2d96c6 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/DropTableCommand.java
@@ -62,7 +62,7 @@ public class DropTableCommand extends AbstractTableCommand {
updateEntries.add(new RemoveIndexEntry(index.id()));
});
- updateEntries.add(new DropTableEntry(table.id(), schemaName));
+ updateEntries.add(new DropTableEntry(table.id()));
return updateEntries;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
index f4941068fc..efaf92a096 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogColumnCollation.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.catalog.descriptors;
/**
* Enumeration of all supported collations.
*/
-// TODO: IGNITE-19082 drop similar classes in index and sql-engine modules.
+// TODO: https://issues.apache.org/jira/browse/IGNITE-22179 drop similar
classes in index and sql-engine modules.
public enum CatalogColumnCollation {
ASC_NULLS_FIRST(true, true),
ASC_NULLS_LAST(true, false),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
index 93a1affae6..a14c84685f 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
@@ -99,12 +99,13 @@ public abstract class CatalogIndexDescriptor extends
CatalogObjectDescriptor {
/** Returns catalog index descriptor type by identifier. */
public static CatalogIndexDescriptorType forId(int id) {
- assert id == HASH.typeId || id == SORTED.typeId : "Unknown index
descriptor type ID: " + id;
-
- if (id == HASH.typeId) {
- return HASH;
- } else {
- return SORTED;
+ switch (id) {
+ case 0:
+ return HASH;
+ case 1:
+ return SORTED;
+ default:
+ throw new IllegalArgumentException("Unknown index
descriptor type id: " + id);
}
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
index 6efa73677e..edbb46b650 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.util.io.IgniteDataOutput;
public class CatalogSystemViewDescriptor extends CatalogObjectDescriptor {
public static final CatalogObjectSerializer<CatalogSystemViewDescriptor>
SERIALIZER = new SystemViewDescriptorSerializer();
+ private final int schemaId;
+
private final List<CatalogTableColumnDescriptor> columns;
private final SystemViewType systemViewType;
@@ -43,18 +45,26 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
* Constructor.
*
* @param id View id.
+ * @param schemaId Schema id.
* @param name View name.
* @param columns View columns.
* @param systemViewType View type.
*/
- public CatalogSystemViewDescriptor(int id, String name,
List<CatalogTableColumnDescriptor> columns, SystemViewType systemViewType) {
- this(id, name, columns, systemViewType, INITIAL_CAUSALITY_TOKEN);
+ public CatalogSystemViewDescriptor(
+ int id,
+ int schemaId,
+ String name,
+ List<CatalogTableColumnDescriptor> columns,
+ SystemViewType systemViewType
+ ) {
+ this(id, schemaId, name, columns, systemViewType,
INITIAL_CAUSALITY_TOKEN);
}
/**
* Constructor.
*
* @param id View id.
+ * @param schemaId Schema id.
* @param name View name.
* @param columns View columns.
* @param systemViewType View type.
@@ -62,6 +72,7 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
*/
public CatalogSystemViewDescriptor(
int id,
+ int schemaId,
String name,
List<CatalogTableColumnDescriptor> columns,
SystemViewType systemViewType,
@@ -69,10 +80,20 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
) {
super(id, Type.SYSTEM_VIEW, name, causalityToken);
+ this.schemaId = schemaId;
this.columns = Objects.requireNonNull(columns, "columns");
this.systemViewType = Objects.requireNonNull(systemViewType,
"viewType");
}
+ /**
+ * Returns a schema id of this view.
+ *
+ * @return A schema id.
+ */
+ public int schemaId() {
+ return schemaId;
+ }
+
/**
* Returns a list of columns of this view.
*
@@ -101,13 +122,13 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
return false;
}
CatalogSystemViewDescriptor that = (CatalogSystemViewDescriptor) o;
- return Objects.equals(columns, that.columns) && systemViewType ==
that.systemViewType;
+ return schemaId == that.schemaId && Objects.equals(columns,
that.columns) && systemViewType == that.systemViewType;
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(columns, systemViewType);
+ return Objects.hash(schemaId, columns, systemViewType);
}
/** {@inheritDoc} */
@@ -116,6 +137,7 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
return S.toString(
CatalogSystemViewDescriptor.class, this,
"id", id(),
+ "schemaId", schemaId,
"name", name(),
"columns", columns,
"systemViewType", systemViewType()
@@ -147,12 +169,13 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
/** Returns system view type by identifier. */
private static SystemViewType forId(int id) {
- if (id == 0) {
- return NODE;
- } else {
- assert id == 1;
-
- return CLUSTER;
+ switch (id) {
+ case 0:
+ return NODE;
+ case 1:
+ return CLUSTER;
+ default:
+ throw new IllegalArgumentException("Unknown system view
type id: " + id);
}
}
}
@@ -164,6 +187,7 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
@Override
public CatalogSystemViewDescriptor readFrom(IgniteDataInput input)
throws IOException {
int id = input.readInt();
+ int schemaId = input.readInt();
String name = input.readUTF();
long updateToken = input.readLong();
List<CatalogTableColumnDescriptor> columns =
readList(CatalogTableColumnDescriptor.SERIALIZER, input);
@@ -171,12 +195,13 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
byte sysViewTypeId = input.readByte();
SystemViewType sysViewType = SystemViewType.forId(sysViewTypeId);
- return new CatalogSystemViewDescriptor(id, name, columns,
sysViewType, updateToken);
+ return new CatalogSystemViewDescriptor(id, schemaId, name,
columns, sysViewType, updateToken);
}
@Override
public void writeTo(CatalogSystemViewDescriptor descriptor,
IgniteDataOutput output) throws IOException {
output.writeInt(descriptor.id());
+ output.writeInt(descriptor.schemaId);
output.writeUTF(descriptor.name());
output.writeLong(descriptor.updateToken());
writeList(descriptor.columns(),
CatalogTableColumnDescriptor.SERIALIZER, output);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index e8f7b1da43..ab14b290a0 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -26,12 +26,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableSchemaVersions.TableVersion;
import
org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
@@ -55,7 +55,9 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
private final CatalogTableSchemaVersions schemaVersions;
private final List<CatalogTableColumnDescriptor> columns;
+ @IgniteToStringInclude
private final List<String> primaryKeyColumns;
+ @IgniteToStringInclude
private final List<String> colocationColumns;
@IgniteToStringExclude
@@ -125,22 +127,12 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
this.pkIndexId = pkIndexId;
this.zoneId = zoneId;
this.columns = Objects.requireNonNull(columns, "No columns defined.");
- primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key
columns.");
- colocationColumns = colocationCols == null || colocationCols.isEmpty()
? pkCols : colocationCols;
-
+ this.primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary
key columns.");
this.columnsMap =
columns.stream().collect(Collectors.toMap(CatalogTableColumnDescriptor::name,
Function.identity()));
-
- this.schemaVersions = schemaVersions;
-
+ this.colocationColumns = Objects.requireNonNullElse(colocationCols,
pkCols);
+ this.schemaVersions = Objects.requireNonNull(schemaVersions, "No
catalog schema versions.");
+ this.storageProfile = Objects.requireNonNull(storageProfile, "No
storage profile.");
this.creationToken = creationToken;
-
- this.storageProfile = storageProfile;
-
- // TODO: IGNITE-19082 Throw proper exceptions.
- assert !columnsMap.isEmpty() : "No columns.";
-
- assert primaryKeyColumns.stream().noneMatch(c ->
Objects.requireNonNull(columnsMap.get(c), c).nullable());
- assert Set.copyOf(primaryKeyColumns).containsAll(colocationColumns);
}
/**
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
index ac8864b013..6a2710346d 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/StoppingIndexEventParameters.java
@@ -25,7 +25,6 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
* @see CatalogEvent#INDEX_STOPPING
*/
public class StoppingIndexEventParameters extends IndexEventParameters {
- private final int tableId;
/**
* Constructor.
@@ -33,16 +32,8 @@ public class StoppingIndexEventParameters extends
IndexEventParameters {
* @param causalityToken Causality token.
* @param catalogVersion Catalog version.
* @param indexId An id of dropped index.
- * @param tableId Table ID for which the index was removed.
*/
- public StoppingIndexEventParameters(long causalityToken, int
catalogVersion, int indexId, int tableId) {
+ public StoppingIndexEventParameters(long causalityToken, int
catalogVersion, int indexId) {
super(causalityToken, catalogVersion, indexId);
-
- this.tableId = tableId;
- }
-
- /** Returns table ID for which the index was removed. */
- public int tableId() {
- return tableId;
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
index 6c6baac45f..e86cf154b4 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java
@@ -21,6 +21,8 @@ import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZo
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.indexOrThrow;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceIndex;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import org.apache.ignite.internal.catalog.Catalog;
@@ -61,19 +63,9 @@ abstract class AbstractChangeIndexStatusEntry implements
UpdateEntry {
}
static CatalogSchemaDescriptor schemaByIndexId(Catalog catalog, int
indexId) {
- CatalogIndexDescriptor index = catalog.index(indexId);
-
- assert index != null : indexId;
-
- CatalogTableDescriptor table = catalog.table(index.tableId());
-
- assert table != null : index.tableId();
-
- CatalogSchemaDescriptor schema = catalog.schema(table.schemaId());
-
- assert schema != null : table.schemaId();
-
- return schema;
+ CatalogIndexDescriptor index = indexOrThrow(catalog, indexId);
+ CatalogTableDescriptor table = tableOrThrow(catalog, index.tableId());
+ return schemaOrThrow(catalog, table.schemaId());
}
private CatalogIndexDescriptor updateIndexStatus(
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index f7cefa593f..0d08df2bb9 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
@@ -48,19 +48,15 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
private final CatalogTableColumnDescriptor column;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param tableId An id the table to be modified.
* @param column A modified descriptor of the column to be replaced.
- * @param schemaName Schema name.
*/
- public AlterColumnEntry(int tableId, CatalogTableColumnDescriptor column,
String schemaName) {
+ public AlterColumnEntry(int tableId, CatalogTableColumnDescriptor column) {
this.tableId = tableId;
this.column = column;
- this.schemaName = schemaName;
}
/** Returns an id the table to be modified. */
@@ -90,18 +86,17 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
-
- CatalogTableDescriptor currentTableDescriptor =
requireNonNull(catalog.table(tableId));
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
table.schemaId());
- CatalogTableDescriptor newTableDescriptor =
currentTableDescriptor.newDescriptor(
- currentTableDescriptor.name(),
- currentTableDescriptor.tableVersion() + 1,
- currentTableDescriptor.columns().stream()
+ CatalogTableDescriptor newTable = table.newDescriptor(
+ table.name(),
+ table.tableVersion() + 1,
+ table.columns().stream()
.map(source -> source.name().equals(column.name()) ?
column : source)
.collect(toList()),
causalityToken,
- currentTableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -109,7 +104,7 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schema, newTableDescriptor),
catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable),
catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
@@ -126,18 +121,15 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
@Override
public AlterColumnEntry readFrom(IgniteDataInput input) throws
IOException {
CatalogTableColumnDescriptor descriptor =
CatalogTableColumnDescriptor.SERIALIZER.readFrom(input);
-
- String schemaName = input.readUTF();
int tableId = input.readInt();
- return new AlterColumnEntry(tableId, descriptor, schemaName);
+ return new AlterColumnEntry(tableId, descriptor);
}
@Override
public void writeTo(AlterColumnEntry value, IgniteDataOutput output)
throws IOException {
CatalogTableColumnDescriptor.SERIALIZER.writeTo(value.descriptor(), output);
- output.writeUTF(value.schemaName);
output.writeInt(value.tableId);
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index c3cd9f8e99..c31f5fcd12 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeStringCollection;
import static org.apache.ignite.internal.util.IgniteUtils.capacity;
@@ -50,19 +50,16 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
private final int tableId;
private final Set<String> columns;
- private final String schemaName;
/**
* Constructs the object.
*
* @param tableId Table id.
* @param columns Names of columns to drop.
- * @param schemaName Schema name.
*/
- public DropColumnsEntry(int tableId, Set<String> columns, String
schemaName) {
+ public DropColumnsEntry(int tableId, Set<String> columns) {
this.tableId = tableId;
this.columns = columns;
- this.schemaName = schemaName;
}
/** Returns table id. */
@@ -92,18 +89,17 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
table.schemaId());
- CatalogTableDescriptor currentTableDescriptor =
requireNonNull(catalog.table(tableId));
-
- CatalogTableDescriptor newTableDescriptor =
currentTableDescriptor.newDescriptor(
- currentTableDescriptor.name(),
- currentTableDescriptor.tableVersion() + 1,
- currentTableDescriptor.columns().stream()
+ CatalogTableDescriptor newTable = table.newDescriptor(
+ table.name(),
+ table.tableVersion() + 1,
+ table.columns().stream()
.filter(col -> !columns.contains(col.name()))
.collect(toList()),
causalityToken,
- currentTableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -111,7 +107,7 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schema, newTableDescriptor),
catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable),
catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
@@ -127,16 +123,14 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
private static class DropColumnEntrySerializer implements
CatalogObjectSerializer<DropColumnsEntry> {
@Override
public DropColumnsEntry readFrom(IgniteDataInput input) throws
IOException {
- String schemaName = input.readUTF();
int tableId = input.readInt();
Set<String> columns =
CatalogSerializationUtils.readStringCollection(input, size -> new
HashSet<>(capacity(size)));
- return new DropColumnsEntry(tableId, columns, schemaName);
+ return new DropColumnsEntry(tableId, columns);
}
@Override
public void writeTo(DropColumnsEntry object, IgniteDataOutput output)
throws IOException {
- output.writeUTF(object.schemaName);
output.writeInt(object.tableId());
writeStringCollection(object.columns(), output);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 134455ed2a..4241dee915 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -33,20 +33,15 @@ import org.apache.ignite.internal.util.io.IgniteDataOutput;
* the {@link CatalogIndexStatus#STOPPING} state.
*/
public class DropIndexEntry extends AbstractChangeIndexStatusEntry implements
Fireable {
- public static final DropIndexEntrySerializer SERIALIZER = new
DropIndexEntrySerializer();
-
- private final int tableId;
+ public static final CatalogObjectSerializer<DropIndexEntry> SERIALIZER =
new DropIndexEntrySerializer();
/**
* Constructs the object.
*
* @param indexId An id of an index to drop.
- * @param tableId Table ID for which the index was removed.
*/
- public DropIndexEntry(int indexId, int tableId) {
+ public DropIndexEntry(int indexId) {
super(indexId, CatalogIndexStatus.STOPPING);
-
- this.tableId = tableId;
}
/** Returns an id of an index to drop. */
@@ -54,11 +49,6 @@ public class DropIndexEntry extends
AbstractChangeIndexStatusEntry implements Fi
return indexId;
}
- /** Returns table ID for which the index was removed. */
- public int tableId() {
- return tableId;
- }
-
@Override
public int typeId() {
return MarshallableEntryType.DROP_INDEX.id();
@@ -71,7 +61,7 @@ public class DropIndexEntry extends
AbstractChangeIndexStatusEntry implements Fi
@Override
public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
- return new StoppingIndexEventParameters(causalityToken,
catalogVersion, indexId, tableId);
+ return new StoppingIndexEventParameters(causalityToken,
catalogVersion, indexId);
}
@Override
@@ -86,15 +76,13 @@ public class DropIndexEntry extends
AbstractChangeIndexStatusEntry implements Fi
@Override
public DropIndexEntry readFrom(IgniteDataInput input) throws
IOException {
int indexId = input.readInt();
- int tableId = input.readInt();
- return new DropIndexEntry(indexId, tableId);
+ return new DropIndexEntry(indexId);
}
@Override
public void writeTo(DropIndexEntry entry, IgniteDataOutput out) throws
IOException {
out.writeInt(entry.indexId());
- out.writeInt(entry.tableId());
}
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index 509d28444f..06ccd5aee3 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -18,10 +18,11 @@
package org.apache.ignite.internal.catalog.storage;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -43,17 +44,13 @@ public class DropTableEntry implements UpdateEntry,
Fireable {
private final int tableId;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param tableId An id of a table to drop.
- * @param schemaName A schema name.
*/
- public DropTableEntry(int tableId, String schemaName) {
+ public DropTableEntry(int tableId) {
this.tableId = tableId;
- this.schemaName = schemaName;
}
/** Returns an id of a table to drop. */
@@ -78,7 +75,8 @@ public class DropTableEntry implements UpdateEntry, Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
table.schemaId());
return new Catalog(
catalog.version(),
@@ -109,15 +107,13 @@ public class DropTableEntry implements UpdateEntry,
Fireable {
@Override
public DropTableEntry readFrom(IgniteDataInput input) throws
IOException {
int tableId = input.readInt();
- String schemaName = input.readUTF();
- return new DropTableEntry(tableId, schemaName);
+ return new DropTableEntry(tableId);
}
@Override
public void writeTo(DropTableEntry entry, IgniteDataOutput out) throws
IOException {
out.writeInt(entry.tableId());
- out.writeUTF(entry.schemaName);
}
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index de24c10d4e..9e29a5f971 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -17,11 +17,11 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import static
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.readList;
import static
org.apache.ignite.internal.catalog.storage.serialization.CatalogSerializationUtils.writeList;
@@ -49,7 +49,6 @@ public class NewColumnsEntry implements UpdateEntry, Fireable
{
private final int tableId;
private final List<CatalogTableColumnDescriptor> descriptors;
- private final String schemaName;
/**
* Constructs the object.
@@ -57,10 +56,9 @@ public class NewColumnsEntry implements UpdateEntry,
Fireable {
* @param tableId Table id.
* @param descriptors Descriptors of columns to add.
*/
- public NewColumnsEntry(int tableId, List<CatalogTableColumnDescriptor>
descriptors, String schemaName) {
+ public NewColumnsEntry(int tableId, List<CatalogTableColumnDescriptor>
descriptors) {
this.tableId = tableId;
this.descriptors = descriptors;
- this.schemaName = schemaName;
}
/** Returns table id. */
@@ -90,16 +88,15 @@ public class NewColumnsEntry implements UpdateEntry,
Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
table.schemaId());
- CatalogTableDescriptor currentTableDescriptor =
requireNonNull(catalog.table(tableId));
-
- CatalogTableDescriptor newTableDescriptor =
currentTableDescriptor.newDescriptor(
- currentTableDescriptor.name(),
- currentTableDescriptor.tableVersion() + 1,
- CollectionUtils.concat(currentTableDescriptor.columns(),
descriptors),
+ CatalogTableDescriptor newTable = table.newDescriptor(
+ table.name(),
+ table.tableVersion() + 1,
+ CollectionUtils.concat(table.columns(), descriptors),
causalityToken,
- currentTableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -107,7 +104,7 @@ public class NewColumnsEntry implements UpdateEntry,
Fireable {
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schema, newTableDescriptor),
catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable),
catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
@@ -125,16 +122,14 @@ public class NewColumnsEntry implements UpdateEntry,
Fireable {
public NewColumnsEntry readFrom(IgniteDataInput in) throws IOException
{
List<CatalogTableColumnDescriptor> columns =
readList(CatalogTableColumnDescriptor.SERIALIZER, in);
int tableId = in.readInt();
- String schemaName = in.readUTF();
- return new NewColumnsEntry(tableId, columns, schemaName);
+ return new NewColumnsEntry(tableId, columns);
}
@Override
public void writeTo(NewColumnsEntry entry, IgniteDataOutput out)
throws IOException {
writeList(entry.descriptors(),
CatalogTableColumnDescriptor.SERIALIZER, out);
out.writeInt(entry.tableId());
- out.writeUTF(entry.schemaName);
}
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 59f294cd93..b312d99d66 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -18,13 +18,15 @@
package org.apache.ignite.internal.catalog.storage;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
-import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
@@ -44,17 +46,13 @@ public class NewIndexEntry implements UpdateEntry, Fireable
{
private final CatalogIndexDescriptor descriptor;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param descriptor A descriptor of an index to add.
- * @param schemaName Schema name.
*/
- public NewIndexEntry(CatalogIndexDescriptor descriptor, String schemaName)
{
+ public NewIndexEntry(CatalogIndexDescriptor descriptor) {
this.descriptor = descriptor;
- this.schemaName = schemaName;
}
/** Gets descriptor of an index to add. */
@@ -79,7 +77,8 @@ public class NewIndexEntry implements UpdateEntry, Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
+ CatalogTableDescriptor table = tableOrThrow(catalog,
descriptor.tableId());
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
table.schemaId());
descriptor.updateToken(causalityToken);
@@ -111,15 +110,13 @@ public class NewIndexEntry implements UpdateEntry,
Fireable {
private static class NewIndexEntrySerializer implements
CatalogObjectSerializer<NewIndexEntry> {
@Override
public NewIndexEntry readFrom(IgniteDataInput input) throws
IOException {
- String schemaName = input.readUTF();
CatalogIndexDescriptor descriptor =
CatalogSerializationUtils.IDX_SERIALIZER.readFrom(input);
- return new NewIndexEntry(descriptor, schemaName);
+ return new NewIndexEntry(descriptor);
}
@Override
public void writeTo(NewIndexEntry entry, IgniteDataOutput output)
throws IOException {
- output.writeUTF(entry.schemaName);
CatalogSerializationUtils.IDX_SERIALIZER.writeTo(entry.descriptor(), output);
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
new file mode 100644
index 0000000000..ec809dd276
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSchemaEntry.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.storage;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import
org.apache.ignite.internal.catalog.storage.serialization.CatalogObjectSerializer;
+import
org.apache.ignite.internal.catalog.storage.serialization.MarshallableEntryType;
+import org.apache.ignite.internal.util.io.IgniteDataInput;
+import org.apache.ignite.internal.util.io.IgniteDataOutput;
+
+/**
+ * New schema entry.
+ */
+public class NewSchemaEntry implements UpdateEntry {
+ public static final CatalogObjectSerializer<NewSchemaEntry> SERIALIZER =
new Serializer();
+
+ private final CatalogSchemaDescriptor descriptor;
+
+ public NewSchemaEntry(CatalogSchemaDescriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
+ CatalogSchemaDescriptor schema = catalog.schema(descriptor.name());
+
+ if (schema != null) {
+ throw new CatalogValidationException(format("Schema with name '{}'
already exists", schema.name()));
+ }
+
+ descriptor.updateToken(causalityToken);
+
+ List<CatalogSchemaDescriptor> schemas = new
ArrayList<>(catalog.schemas().size() + 1);
+ schemas.addAll(catalog.schemas());
+ schemas.add(descriptor);
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ schemas,
+ catalog.defaultZone().id()
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int typeId() {
+ return MarshallableEntryType.NEW_SCHEMA.id();
+ }
+
+ private static class Serializer implements
CatalogObjectSerializer<NewSchemaEntry> {
+ @Override
+ public NewSchemaEntry readFrom(IgniteDataInput input) throws
IOException {
+ CatalogSchemaDescriptor schemaDescriptor =
CatalogSchemaDescriptor.SERIALIZER.readFrom(input);
+ return new NewSchemaEntry(schemaDescriptor);
+ }
+
+ @Override
+ public void writeTo(NewSchemaEntry value, IgniteDataOutput output)
throws IOException {
+ CatalogSchemaDescriptor.SERIALIZER.writeTo(value.descriptor,
output);
+ }
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
index 03d130a3cc..924749f8da 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import java.io.IOException;
import java.util.Arrays;
@@ -45,17 +46,13 @@ public class NewSystemViewEntry implements UpdateEntry,
Fireable {
private final CatalogSystemViewDescriptor descriptor;
- private final String schemaName;
-
/**
* Constructor.
*
* @param descriptor System view descriptor.
- * @param schemaName A schema name.
*/
- public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor, String
schemaName) {
+ public NewSystemViewEntry(CatalogSystemViewDescriptor descriptor) {
this.descriptor = descriptor;
- this.schemaName = schemaName;
}
@Override
@@ -78,7 +75,7 @@ public class NewSystemViewEntry implements UpdateEntry,
Fireable {
/** {@inheritDoc} */
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor systemSchema = catalog.schema(schemaName);
+ CatalogSchemaDescriptor systemSchema = schemaOrThrow(catalog,
descriptor.schemaId());
descriptor.updateToken(causalityToken);
@@ -119,15 +116,13 @@ public class NewSystemViewEntry implements UpdateEntry,
Fireable {
@Override
public NewSystemViewEntry readFrom(IgniteDataInput input) throws
IOException {
CatalogSystemViewDescriptor descriptor =
CatalogSystemViewDescriptor.SERIALIZER.readFrom(input);
- String schema = input.readUTF();
- return new NewSystemViewEntry(descriptor, schema);
+ return new NewSystemViewEntry(descriptor);
}
@Override
public void writeTo(NewSystemViewEntry entry, IgniteDataOutput output)
throws IOException {
CatalogSystemViewDescriptor.SERIALIZER.writeTo(entry.descriptor,
output);
- output.writeUTF(entry.schemaName);
}
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index 563bd5eeb5..42b4e5b042 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -18,10 +18,10 @@
package org.apache.ignite.internal.catalog.storage;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import java.io.IOException;
import java.util.List;
-import java.util.Objects;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -44,17 +44,13 @@ public class NewTableEntry implements UpdateEntry, Fireable
{
private final CatalogTableDescriptor descriptor;
- private final String schemaName;
-
/**
* Constructs the object.
*
* @param descriptor A descriptor of a table to add.
- * @param schemaName A schema name.
*/
- public NewTableEntry(CatalogTableDescriptor descriptor, String schemaName)
{
+ public NewTableEntry(CatalogTableDescriptor descriptor) {
this.descriptor = descriptor;
- this.schemaName = schemaName;
}
/** Returns descriptor of a table to add. */
@@ -79,7 +75,7 @@ public class NewTableEntry implements UpdateEntry, Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
descriptor.schemaId());
descriptor.updateToken(causalityToken);
@@ -114,15 +110,13 @@ public class NewTableEntry implements UpdateEntry,
Fireable {
@Override
public NewTableEntry readFrom(IgniteDataInput input) throws
IOException {
CatalogTableDescriptor descriptor =
CatalogTableDescriptor.SERIALIZER.readFrom(input);
- String schemaName = input.readUTF();
- return new NewTableEntry(descriptor, schemaName);
+ return new NewTableEntry(descriptor);
}
@Override
public void writeTo(NewTableEntry entry, IgniteDataOutput output)
throws IOException {
CatalogTableDescriptor.SERIALIZER.writeTo(entry.descriptor(),
output);
- output.writeUTF(entry.schemaName);
}
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
index 420fc4aaeb..69c3cdb135 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/RenameTableEntry.java
@@ -17,10 +17,11 @@
package org.apache.ignite.internal.catalog.storage;
-import static java.util.Objects.requireNonNull;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.defaultZoneIdOpt;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceSchema;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.replaceTable;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.tableOrThrow;
import java.io.IOException;
import org.apache.ignite.internal.catalog.Catalog;
@@ -64,16 +65,15 @@ public class RenameTableEntry implements UpdateEntry,
Fireable {
@Override
public Catalog applyUpdate(Catalog catalog, long causalityToken) {
- CatalogTableDescriptor tableDescriptor =
requireNonNull(catalog.table(tableId));
+ CatalogTableDescriptor table = tableOrThrow(catalog, tableId);
+ CatalogSchemaDescriptor schema = schemaOrThrow(catalog,
table.schemaId());
- CatalogSchemaDescriptor schemaDescriptor =
requireNonNull(catalog.schema(tableDescriptor.schemaId()));
-
- CatalogTableDescriptor newTableDescriptor =
tableDescriptor.newDescriptor(
+ CatalogTableDescriptor newTable = table.newDescriptor(
newTableName,
- tableDescriptor.tableVersion() + 1,
- tableDescriptor.columns(),
+ table.tableVersion() + 1,
+ table.columns(),
causalityToken,
- tableDescriptor.storageProfile()
+ table.storageProfile()
);
return new Catalog(
@@ -81,7 +81,7 @@ public class RenameTableEntry implements UpdateEntry,
Fireable {
catalog.time(),
catalog.objectIdGenState(),
catalog.zones(),
- replaceSchema(replaceTable(schemaDescriptor,
newTableDescriptor), catalog.schemas()),
+ replaceSchema(replaceTable(schema, newTable),
catalog.schemas()),
defaultZoneIdOpt(catalog)
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
index ed7e523d54..86167adc33 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/CatalogEntrySerializerProvider.java
@@ -28,6 +28,7 @@ import
org.apache.ignite.internal.catalog.storage.DropZoneEntry;
import org.apache.ignite.internal.catalog.storage.MakeIndexAvailableEntry;
import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
+import org.apache.ignite.internal.catalog.storage.NewSchemaEntry;
import org.apache.ignite.internal.catalog.storage.NewSystemViewEntry;
import org.apache.ignite.internal.catalog.storage.NewTableEntry;
import org.apache.ignite.internal.catalog.storage.NewZoneEntry;
@@ -79,6 +80,7 @@ public interface CatalogEntrySerializerProvider {
serializers[MarshallableEntryType.SNAPSHOT.id()] =
SnapshotEntry.SERIALIZER;
serializers[MarshallableEntryType.RENAME_INDEX.id()] =
RenameIndexEntry.SERIALIZER;
serializers[MarshallableEntryType.SET_DEFAULT_ZONE.id()] =
SetDefaultZoneEntry.SERIALIZER;
+ serializers[MarshallableEntryType.NEW_SCHEMA.id()] =
NewSchemaEntry.SERIALIZER;
//noinspection ThisEscapedInObjectConstruction
serializers[MarshallableEntryType.VERSIONED_UPDATE.id()] = new
VersionedUpdateSerializer(this);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
index 5306fc7b59..8cdb7230d0 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/serialization/MarshallableEntryType.java
@@ -40,7 +40,8 @@ public enum MarshallableEntryType {
SNAPSHOT(16),
VERSIONED_UPDATE(17),
RENAME_INDEX(18),
- SET_DEFAULT_ZONE(19);
+ SET_DEFAULT_ZONE(19),
+ NEW_SCHEMA(20);
/** Type ID. */
private final int id;
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
index 5d0eb96f11..dbc0185482 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
@@ -66,12 +66,12 @@ public class CatalogManagerDescriptorCausalityTokenTest
extends BaseCatalogManag
@Test
public void testEmptyCatalog() {
- CatalogSchemaDescriptor defaultSchema =
manager.schema(DEFAULT_SCHEMA_NAME, 0);
+ CatalogSchemaDescriptor defaultSchema =
manager.schema(DEFAULT_SCHEMA_NAME, 1);
assertNotNull(defaultSchema);
assertNull(manager.catalog(0).defaultZone());
assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME,
clock.nowLong()));
- assertSame(defaultSchema, manager.schema(0));
+ assertSame(defaultSchema, manager.schema(1));
assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
Catalog catalogWithDefaultZone = manager.catalog(1);
@@ -89,7 +89,7 @@ public class CatalogManagerDescriptorCausalityTokenTest
extends BaseCatalogManag
assertThrows(IllegalStateException.class, () ->
manager.activeSchema(-1L));
// Validate default schema.
- assertEquals(INITIAL_CAUSALITY_TOKEN, defaultSchema.updateToken());
+ assertEquals(1, defaultSchema.updateToken());
}
@Test
@@ -108,8 +108,7 @@ public class CatalogManagerDescriptorCausalityTokenTest
extends BaseCatalogManag
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, manager.activeSchema(123L));
- assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+ assertEquals(1, schema.updateToken());
assertNull(schema.table(TABLE_NAME));
assertNull(manager.table(TABLE_NAME, 123L));
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index e0a34fbe0e..d15c5ff889 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -108,6 +108,7 @@ import
org.apache.ignite.internal.catalog.commands.AlterZoneSetDefaultCommand;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
+import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.commands.DropIndexCommand;
@@ -173,12 +174,12 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
@Test
public void testEmptyCatalog() {
- CatalogSchemaDescriptor defaultSchema =
manager.schema(DEFAULT_SCHEMA_NAME, 0);
+ CatalogSchemaDescriptor defaultSchema =
manager.schema(DEFAULT_SCHEMA_NAME, 1);
assertNotNull(defaultSchema);
assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME,
clock.nowLong()));
- assertSame(defaultSchema, manager.schema(0));
- assertSame(defaultSchema, manager.schema(defaultSchema.id(), 0));
+ assertSame(defaultSchema, manager.schema(1));
+ assertSame(defaultSchema, manager.schema(defaultSchema.id(), 1));
assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
int nonExistingVersion = manager.latestCatalogVersion() + 1;
@@ -189,7 +190,7 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
// Validate default schema.
assertEquals(DEFAULT_SCHEMA_NAME, defaultSchema.name());
- assertEquals(0, defaultSchema.id());
+ assertEquals(1, defaultSchema.id());
assertEquals(0, defaultSchema.tables().length);
assertEquals(0, defaultSchema.indexes().length);
@@ -206,15 +207,15 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
// System schema should exist.
- CatalogSchemaDescriptor systemSchema =
manager.schema(SYSTEM_SCHEMA_NAME, 0);
+ CatalogSchemaDescriptor systemSchema =
manager.schema(SYSTEM_SCHEMA_NAME, 1);
assertNotNull(systemSchema, "system schema");
assertSame(systemSchema, manager.activeSchema(SYSTEM_SCHEMA_NAME,
clock.nowLong()));
- assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 0));
- assertSame(systemSchema, manager.schema(systemSchema.id(), 0));
+ assertSame(systemSchema, manager.schema(SYSTEM_SCHEMA_NAME, 1));
+ assertSame(systemSchema, manager.schema(systemSchema.id(), 1));
// Validate system schema.
assertEquals(SYSTEM_SCHEMA_NAME, systemSchema.name());
- assertEquals(1, systemSchema.id());
+ assertEquals(2, systemSchema.id());
assertEquals(0, systemSchema.tables().length);
assertEquals(0, systemSchema.indexes().length);
@@ -260,6 +261,8 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
@Test
public void testCreateTable() {
+ long timePriorToTableCreation = clock.nowLong();
+
int tableCreationVersion = await(
manager.execute(createTableCommand(
TABLE_NAME,
@@ -274,8 +277,7 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, manager.activeSchema(0L));
- assertSame(schema, manager.activeSchema(123L));
+ assertSame(schema, manager.activeSchema(timePriorToTableCreation));
assertNull(schema.table(TABLE_NAME));
assertNull(manager.table(TABLE_NAME, 123L));
@@ -1136,18 +1138,12 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
return falseCompletedFuture();
});
- CompletableFuture<?> createTableFut = manager.execute(List.of(
- CreateZoneCommand.builder()
- .zoneName("TEST_ZONE")
-
.storageProfilesParams(List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build()))
- .build(),
- AlterZoneSetDefaultCommand.builder()
- .zoneName("TEST_ZONE")
- .build(),
- simpleTable("T")
- ));
+ // It should not matter what a command does
+ CatalogCommand catalogCommand = catalog -> List.of(new
ObjectIdGenUpdateEntry(1));
- assertThat(createTableFut, willThrow(IgniteInternalException.class,
"Max retry limit exceeded"));
+ CompletableFuture<?> fut = manager.execute(List.of(catalogCommand));
+
+ assertThat(fut, willThrow(IgniteInternalException.class, "Max retry
limit exceeded"));
// retry limit is hardcoded at
org.apache.ignite.internal.catalog.CatalogServiceImpl.MAX_RETRY_COUNT
verify(updateLogMock, times(10)).append(any());
@@ -1203,7 +1199,7 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertFalse(createTableFuture2.isDone());
- verify(clockWaiter, timeout(10_000).times(2)).waitFor(any());
+ verify(clockWaiter, timeout(10_000).times(3)).waitFor(any());
Catalog catalog0 = manager.catalog(manager.latestCatalogVersion());
@@ -1832,7 +1828,6 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
StoppingIndexEventParameters stoppingEventParameters =
stoppingCaptor.getValue();
assertEquals(indexId, stoppingEventParameters.indexId());
- assertEquals(tableId, stoppingEventParameters.tableId());
// Let's drop the table.
assertThat(manager.execute(dropTableCommand(TABLE_NAME)),
willCompleteSuccessfully());
@@ -2703,6 +2698,24 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
);
}
+ @Test
+ public void testCreateSchema() {
+ String schemaName = "S1";
+
+
assertThat(manager.execute(CreateSchemaCommand.builder().name(schemaName).build()),
willCompleteSuccessfully());
+
+ Catalog latestCatalog =
manager.catalog(manager.activeCatalogVersion(clock.nowLong()));
+
+ assertNotNull(latestCatalog);
+ assertNotNull(latestCatalog.schema(schemaName));
+ assertNotNull(latestCatalog.schema(DEFAULT_SCHEMA_NAME));
+
+ assertThat(
+
manager.execute(CreateSchemaCommand.builder().name(schemaName).build()),
+ willThrowFast(CatalogValidationException.class, "Schema with
name 'S1' already exists")
+ );
+ }
+
@Test
public void testCatalogCompaction() throws Exception {
assertThat(manager.execute(simpleTable(TABLE_NAME)),
willCompleteSuccessfully());
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
index 4d06b4a724..0474a24cad 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.catalog;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.CatalogService.SYSTEM_SCHEMA_NAME;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -119,7 +118,7 @@ public class CatalogSystemViewTest extends
BaseCatalogManagerTest {
CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
- assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+ assertEquals(1, schema.updateToken());
assertThat(manager.execute(command), willCompleteSuccessfully());
@@ -131,7 +130,7 @@ public class CatalogSystemViewTest extends
BaseCatalogManagerTest {
schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
long schemaCausalityToken = schema.updateToken();
- assertEquals(INITIAL_CAUSALITY_TOKEN, schemaCausalityToken);
+ assertEquals(1, schemaCausalityToken);
// Assert that creation of the system view updates token for the
descriptor.
assertTrue(systemSchema.updateToken() > schemaCausalityToken);
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
index a1e24c4bb8..751b5373a0 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
@@ -389,7 +389,7 @@ public class CatalogUtilsTest extends
BaseIgniteAbstractTest {
Exception e = assertThrows(CatalogValidationException.class, () ->
replaceIndex(schema, index));
- assertThat(e.getMessage(), is(String.format("Index with ID %d has not
been found in schema with ID %d", index.id(), 0)));
+ assertThat(e.getMessage(), is(String.format("Index with ID %d has not
been found in schema with ID %d", index.id(), 1)));
}
@Test
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
new file mode 100644
index 0000000000..ff85120de0
--- /dev/null
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CreateSchemaCommandValidationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.commands;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogValidationException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify validation of {@link CreateSchemaCommand}.
+ */
+public class CreateSchemaCommandValidationTest extends
AbstractCommandValidationTest {
+
+ @ParameterizedTest(name = "[{index}] ''{argumentsWithNames}''")
+ @MethodSource("nullAndBlankStrings")
+ void schemaNameMustNotBeNullOrBlank(String name) {
+ CreateSchemaCommandBuilder builder =
CreateSchemaCommand.builder().name(name);
+
+ assertThrows(
+ CatalogValidationException.class,
+ builder::build,
+ "Name of the schema can't be null or blank"
+ );
+ }
+
+ @Test
+ void commandFailsWhenSchemaAlreadyExists() {
+ String schemaName = "TEST";
+
+ CreateSchemaCommandBuilder builder =
CreateSchemaCommand.builder().name(schemaName);
+
+ Catalog catalog = catalogWithSchema(schemaName);
+
+ assertThrows(
+ CatalogValidationException.class,
+ () -> builder.build().get(catalog),
+ "Schema with name 'TEST' already exists"
+ );
+ }
+
+ private static Catalog catalogWithSchema(String schemaName) {
+ return catalog(CreateSchemaCommand.builder().name(schemaName).build());
+ }
+}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
index 41ed56c46d..92879a1169 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptorTest.java
@@ -30,6 +30,7 @@ class CatalogSystemViewDescriptorTest {
void toStringContainsTypeAndFields() {
var descriptor = new CatalogSystemViewDescriptor(
1,
+ 2,
"view1",
List.of(),
SystemViewType.NODE
@@ -39,6 +40,7 @@ class CatalogSystemViewDescriptorTest {
assertThat(toString, startsWith("CatalogSystemViewDescriptor ["));
assertThat(toString, containsString("id=1"));
+ assertThat(toString, containsString("schemaId=2"));
assertThat(toString, containsString("name=view1"));
assertThat(toString, containsString("systemViewType=NODE"));
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
index ac0e23b8ed..44804040eb 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/CatalogEntrySerializationTest.java
@@ -106,15 +106,15 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
break;
case DROP_COLUMN:
- checkSerialization(new DropColumnsEntry(1, Set.of("C1", "C2"),
"PUBLIC"));
+ checkSerialization(new DropColumnsEntry(1, Set.of("C1",
"C2")));
break;
case DROP_INDEX:
- checkSerialization(new DropIndexEntry(231, 23), new
DropIndexEntry(231, 1));
+ checkSerialization(new DropIndexEntry(231), new
DropIndexEntry(231));
break;
case DROP_TABLE:
- checkSerialization(new DropTableEntry(23, "PUBLIC"), new
DropTableEntry(3, "SYSTEM"));
+ checkSerialization(new DropTableEntry(23), new
DropTableEntry(3));
break;
case DROP_ZONE:
@@ -169,6 +169,11 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
checkSerialization(new SetDefaultZoneEntry(1), new
SetDefaultZoneEntry(Integer.MAX_VALUE));
break;
+ case NEW_SCHEMA:
+ checkSerialization(new NewSchemaEntry(new
CatalogSchemaDescriptor(
+ 0, "S", new CatalogTableDescriptor[0], new
CatalogIndexDescriptor[0], new CatalogSystemViewDescriptor[0], 0)));
+ break;
+
default:
throw new UnsupportedOperationException("Test not implemented
" + type);
}
@@ -282,10 +287,10 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
newCatalogTableColumnDescriptor("c2",
DefaultValue.functionCall("function"));
CatalogTableColumnDescriptor desc4 =
newCatalogTableColumnDescriptor("c3", DefaultValue.constant(null));
- UpdateEntry entry1 = new AlterColumnEntry(1, desc1, "public");
- UpdateEntry entry2 = new AlterColumnEntry(1, desc2, "public");
- UpdateEntry entry3 = new AlterColumnEntry(1, desc3, "public");
- UpdateEntry entry4 = new AlterColumnEntry(1, desc4, "public");
+ UpdateEntry entry1 = new AlterColumnEntry(1, desc1);
+ UpdateEntry entry2 = new AlterColumnEntry(1, desc2);
+ UpdateEntry entry3 = new AlterColumnEntry(1, desc3);
+ UpdateEntry entry4 = new AlterColumnEntry(1, desc4);
VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3,
entry4);
@@ -296,7 +301,7 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
CatalogTableColumnDescriptor columnDescriptor1 =
newCatalogTableColumnDescriptor("c1", DefaultValue.constant(null));
CatalogTableColumnDescriptor columnDescriptor2 =
newCatalogTableColumnDescriptor("c2", DefaultValue.functionCall("func"));
- NewColumnsEntry entry = new NewColumnsEntry(11,
List.of(columnDescriptor1, columnDescriptor2), "PUBLIC");
+ NewColumnsEntry entry = new NewColumnsEntry(11,
List.of(columnDescriptor1, columnDescriptor2));
VersionedUpdate update = newVersionedUpdate(entry);
@@ -307,8 +312,8 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
CatalogSortedIndexDescriptor sortedIndexDescriptor =
newSortedIndexDescriptor("idx1");
CatalogHashIndexDescriptor hashIndexDescriptor =
newHashIndexDescriptor("idx2");
- NewIndexEntry sortedIdxEntry = new
NewIndexEntry(sortedIndexDescriptor, "PUBLIC");
- NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor,
"PUBLIC");
+ NewIndexEntry sortedIdxEntry = new
NewIndexEntry(sortedIndexDescriptor);
+ NewIndexEntry hashIdxEntry = new NewIndexEntry(hashIndexDescriptor);
VersionedUpdate update = newVersionedUpdate(sortedIdxEntry,
hashIdxEntry);
@@ -323,10 +328,10 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
List<CatalogTableColumnDescriptor> columns = List.of(col1, col2, col3,
col4);
- NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), null), "PUBLIC");
- NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), List.of()), "PUBLIC");
- NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), List.of("c2")), "PUBLIC");
- NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), List.of("c1")), "PUBLIC");
+ NewTableEntry entry1 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), null));
+ NewTableEntry entry2 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), List.of()));
+ NewTableEntry entry3 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), List.of("c2")));
+ NewTableEntry entry4 = new NewTableEntry(newTableDescriptor("Table1",
columns, List.of("c1", "c2"), List.of("c1")));
VersionedUpdate update = newVersionedUpdate(entry1, entry2, entry3,
entry4);
VersionedUpdate deserialized = serialize(update);
@@ -342,12 +347,12 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
CatalogTableColumnDescriptor col2 =
newCatalogTableColumnDescriptor("c2", null);
CatalogSystemViewDescriptor nodeDesc =
- new CatalogSystemViewDescriptor(1, "view1", List.of(col1,
col2), SystemViewType.NODE);
+ new CatalogSystemViewDescriptor(1, 2, "view1", List.of(col1,
col2), SystemViewType.NODE);
CatalogSystemViewDescriptor clusterDesc =
- new CatalogSystemViewDescriptor(1, "view1", List.of(col1,
col2), SystemViewType.CLUSTER);
+ new CatalogSystemViewDescriptor(1, 2, "view1", List.of(col1,
col2), SystemViewType.CLUSTER);
- NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc,
"PUBLIC");
- NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc,
"PUBLIC");
+ NewSystemViewEntry nodeEntry = new NewSystemViewEntry(nodeDesc);
+ NewSystemViewEntry clusterEntry = new NewSystemViewEntry(clusterDesc);
VersionedUpdate update = newVersionedUpdate(nodeEntry, clusterEntry);
@@ -371,8 +376,8 @@ public class CatalogEntrySerializationTest extends
BaseIgniteAbstractTest {
};
CatalogSystemViewDescriptor[] views = {
- new CatalogSystemViewDescriptor(1, "view1", columns,
SystemViewType.NODE),
- new CatalogSystemViewDescriptor(1, "view2", columns,
SystemViewType.CLUSTER)
+ new CatalogSystemViewDescriptor(1, 2, "view1", columns,
SystemViewType.NODE),
+ new CatalogSystemViewDescriptor(1, 2, "view2", columns,
SystemViewType.CLUSTER)
};
CatalogStorageProfilesDescriptor profiles =
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
index d6491b6bb7..6e9722f695 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakeCatalogService.java
@@ -158,7 +158,12 @@ public class FakeCatalogService implements CatalogService {
@Override
public CompletableFuture<Void> catalogReadyFuture(int version) {
- return null;
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> catalogInitializationFuture() {
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 5a4bfc0148..47be8726c5 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -69,7 +69,6 @@ import org.jetbrains.annotations.Nullable;
* <p>To avoid errors when using indexes while applying replication log during
node recovery, the registration of indexes was moved to the
* start of the tables.</p>
*/
-// TODO: IGNITE-19082 Delete this class
public class IndexManager implements IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(IndexManager.class);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c90b014454..8503e0f529 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1114,6 +1114,7 @@ public class IgniteImpl implements Ignite {
return cmgMgr.onJoinReady();
}, startupExecutor)
.thenComposeAsync(ignored ->
awaitSelfInLocalLogicalTopology(), startupExecutor)
+ .thenCompose(ignored ->
catalogManager.catalogInitializationFuture())
.thenRunAsync(() -> {
try {
// Enable watermark events.
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index ec8875f171..9f044df7f5 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -266,6 +266,7 @@ public class Cluster {
return TestIgnitionManager.start(nodeName, config,
workDir.resolve(nodeName))
.thenApply(IgniteImpl.class::cast)
+ .thenCompose(ignite ->
ignite.catalogManager().catalogInitializationFuture().thenApply(ignored ->
ignite))
.thenApply(ignite -> {
synchronized (nodes) {
while (nodes.size() < nodeIndex) {
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index c30388f89e..58101d6a0a 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.schema.registry;
-import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
@@ -38,7 +37,6 @@ import
org.apache.ignite.internal.schema.mapping.ColumnMapping;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
/**
* Caching registry of actual schema descriptors for a table.
@@ -245,7 +243,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
* @param rowSchema Row schema.
* @return Column mapper for target schema.
*/
- ColumnMapper resolveMapping(SchemaDescriptor curSchema, SchemaDescriptor
rowSchema) {
+ private ColumnMapper resolveMapping(SchemaDescriptor curSchema,
SchemaDescriptor rowSchema) {
assert curSchema.version() > rowSchema.version();
if (curSchema.version() == rowSchema.version() + 1) {
@@ -254,9 +252,9 @@ public class SchemaRegistryImpl implements SchemaRegistry {
long mappingKey = (((long) curSchema.version()) << 32) |
(rowSchema.version());
- ColumnMapper mapping;
+ ColumnMapper mapping = mappingCache.get(mappingKey);
- if ((mapping = mappingCache.get(mappingKey)) != null) {
+ if (mapping != null) {
return mapping;
}
@@ -292,34 +290,6 @@ public class SchemaRegistryImpl implements SchemaRegistry {
makeSchemaVersionAvailable(desc);
}
- /**
- * Cleanup given schema version from history.
- *
- * @param ver Schema version to remove.
- * @throws SchemaRegistryException If incorrect schema version provided.
- */
- public void onSchemaDropped(int ver) {
- int lastVer = schemaCache.lastKey();
-
- if (ver <= 0 || ver >= lastVer || ver > schemaCache.keySet().first()) {
- throw new SchemaRegistryException("Incorrect schema version to
clean up to: " + ver);
- }
-
- if (schemaCache.remove(ver) != null) {
- mappingCache.keySet().removeIf(k -> (k & 0xFFFF_FFFFL) == ver);
- }
- }
-
- /**
- * For test purposes only.
- *
- * @return ColumnMapping cache.
- */
- @TestOnly
- Map<Long, ColumnMapper> mappingCache() {
- return unmodifiableMap(mappingCache);
- }
-
private CompletableFuture<SchemaDescriptor> tableSchemaAsync(int
schemaVer) {
if (schemaVer < lastKnownSchemaVersion()) {
return completedFuture(loadStoredSchemaByVersion(schemaVer));
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 25e7a7bd01..cecab95cbc 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -21,7 +21,6 @@ import static
java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor.INITIAL_TABLE_VERSION;
import static
org.apache.ignite.internal.schema.mapping.ColumnMapping.createMapper;
-import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
@@ -30,7 +29,6 @@ import static
org.apache.ignite.internal.type.NativeTypes.BYTES;
import static org.apache.ignite.internal.type.NativeTypes.INT64;
import static org.apache.ignite.internal.type.NativeTypes.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -45,7 +43,6 @@ import java.util.function.Function;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
-import org.apache.ignite.internal.schema.mapping.ColumnMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -205,109 +202,6 @@ public class SchemaRegistryImplTest {
assertSameSchema(schemaV2, reg.schema(2));
}
- /**
- * Check schema cleanup.
- */
- @Test
- public void testSchemaCleanup() {
- final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valStringCol", STRING, true)
- });
-
- final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valBytesCol", BYTES, true),
- new Column("valStringCol", STRING, true)
- });
-
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null,
schemaV1);
-
- assertEquals(INITIAL_TABLE_VERSION, reg.lastKnownSchemaVersion());
-
- // Fail to cleanup initial schema
- assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(INITIAL_TABLE_VERSION));
- assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(0));
-
- // Register schema with very first version.
- assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(schemaV1));
-
- assertEquals(1, reg.lastKnownSchemaVersion());
- assertNotNull(reg.lastKnownSchema());
- assertNotNull(reg.schema(1));
-
- // Try to remove latest schema.
- assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(1));
-
- assertEquals(1, reg.lastKnownSchemaVersion());
- assertNotNull(reg.lastKnownSchema());
- assertNotNull(reg.schema(1));
-
- // Register new schema with next version.
- reg.onSchemaRegistered(schemaV2);
- reg.onSchemaRegistered(schemaV3);
-
- assertEquals(3, reg.lastKnownSchemaVersion());
- assertNotNull(reg.schema(1));
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
-
- // Remove outdated schema 1.
- reg.onSchemaDropped(1);
-
- assertEquals(3, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
-
- // Remove non-existed schemas.
- reg.onSchemaDropped(1);
-
- assertEquals(3, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
-
- // Register new schema with next version.
- reg.onSchemaRegistered(schemaV4);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
- assertNotNull(reg.schema(4));
-
- // Remove non-existed schemas.
- reg.onSchemaDropped(1);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertSameSchema(schemaV4, reg.lastKnownSchema());
- assertSameSchema(schemaV2, reg.schema(2));
- assertSameSchema(schemaV3, reg.schema(3));
- assertSameSchema(schemaV4, reg.schema(4));
-
- // Out of order remove.
- assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(3));
-
- // Correct removal order.
- reg.onSchemaDropped(2);
- reg.onSchemaDropped(3);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
- assertSameSchema(schemaV4, reg.lastKnownSchema());
- assertSameSchema(schemaV4, reg.schema(4));
-
- // Try to remove latest schema.
- assertThrows(SchemaRegistryException.class, () ->
reg.onSchemaDropped(4));
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertSameSchema(schemaV4, reg.schema(4));
- }
-
/**
* Check schema registration with full history.
*/
@@ -420,105 +314,6 @@ public class SchemaRegistryImplTest {
assertSameSchema(schemaV4, reg.schema(4));
}
- /**
- * Check schema cleanup.
- */
- @Test
- public void testSchemaWithHistoryCleanup() {
- final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valStringCol", STRING, true)
- });
-
- final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valBytesCol", BYTES, true),
- new Column("valStringCol", STRING, true)
- });
-
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2,
schemaV3, schemaV4);
-
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get,
schemaV4);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertSameSchema(schemaV4, reg.lastKnownSchema());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(1));
- assertSameSchema(schemaV2, reg.schema(2));
- assertSameSchema(schemaV3, reg.schema(3));
- assertSameSchema(schemaV4, reg.schema(4));
-
- history.remove(1);
- reg.onSchemaDropped(1);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertNotNull(reg.schema(2));
- assertNotNull(reg.schema(3));
- assertNotNull(reg.schema(4));
-
- history.remove(2);
- history.remove(3);
- reg.onSchemaDropped(2);
- reg.onSchemaDropped(3);
-
- assertEquals(4, reg.lastKnownSchemaVersion());
- assertThrows(SchemaRegistryException.class, () -> reg.schema(2));
- assertThrows(SchemaRegistryException.class, () -> reg.schema(3));
- assertNotNull(reg.schema(4));
- }
-
- /**
- * Check schema cache cleanup.
- */
- @Test
- public void testSchemaCacheCleanup() {
- final SchemaDescriptor schemaV3 = new SchemaDescriptor(3,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valStringCol", STRING, true)
- });
-
- schemaV3.columnMapping(createMapper(schemaV3).add(
- schemaV3.column("valStringCol").positionInRow(),
- schemaV2.column("valStringCol").positionInRow())
- );
-
- final SchemaDescriptor schemaV4 = new SchemaDescriptor(4,
- new Column[]{new Column("keyLongCol", INT64, false)},
- new Column[]{
- new Column("valBytesCol", BYTES, true),
- new Column("valStringCol", STRING, true)
- });
-
-
schemaV4.columnMapping(createMapper(schemaV4).add(schemaV4.column("valBytesCol")));
-
- final SchemaRegistryImpl reg = new SchemaRegistryImpl(v -> null,
schemaV1);
-
- final Map<Long, ColumnMapper> cache = reg.mappingCache();
-
- assertThrows(SchemaRegistrationConflictException.class, () ->
reg.onSchemaRegistered(schemaV1));
- reg.onSchemaRegistered(schemaV2);
- reg.onSchemaRegistered(schemaV3);
- reg.onSchemaRegistered(schemaV4);
-
- assertEquals(0, cache.size());
-
- reg.resolveMapping(schemaV4, schemaV1);
- reg.resolveMapping(schemaV3, schemaV1);
- reg.resolveMapping(schemaV4, schemaV2);
-
- assertEquals(3, cache.size());
-
- reg.onSchemaDropped(schemaV1.version());
-
- assertEquals(1, cache.size());
-
- reg.onSchemaDropped(schemaV2.version());
-
- assertEquals(0, cache.size());
- }
-
@Test
void schemaAsyncReturnsExpectedResults() {
Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1);
@@ -536,19 +331,6 @@ public class SchemaRegistryImplTest {
assertThat(schema2Future, willBe(schemaV2));
}
- @Test
- void schemaAsyncReturnsExceptionForCompactedAwayVersion() {
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1,
schemaV2);
-
- SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get,
schemaV2);
-
- history.remove(1);
- reg.onSchemaDropped(1);
-
- SchemaRegistryException ex = assertWillThrowFast(reg.schemaAsync(1),
SchemaRegistryException.class);
- assertThat(ex.getMessage(), is("Failed to find schema (was it
compacted away?) [version=1]"));
- }
-
/**
* SchemaHistory.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
index 8733e3835a..a2aaff1ed2 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCreateTableDdlTest.java
@@ -22,7 +22,6 @@ import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.SYSTEM_SCHEMAS;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
-import static org.apache.ignite.internal.table.TableTestUtils.getTableStrict;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_PARSE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -300,25 +299,6 @@ public class ItCreateTableDdlTest extends
BaseSqlIntegrationTest {
.check();
}
- /**
- * Checks that schema version is updated even if column names are
intersected.
- */
- // Need to be removed after
https://issues.apache.org/jira/browse/IGNITE-19082
- @Test
- public void checkSchemaUpdatedWithEqAlterColumn() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- IgniteImpl node = CLUSTER.aliveNode();
-
- int tableVersionBefore = getTableStrict(node.catalogManager(), "TEST",
node.clock().nowLong()).tableVersion();
-
- sql("ALTER TABLE TEST ADD COLUMN (VAL1 INT)");
-
- int tableVersionAfter = getTableStrict(node.catalogManager(), "TEST",
node.clock().nowLong()).tableVersion();
-
- assertEquals(tableVersionBefore + 1, tableVersionAfter);
- }
-
/**
* Check explicit colocation columns configuration.
*/
diff --git
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
index 75667d9cac..1a37242ec4 100644
---
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
+++
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
@@ -117,8 +117,7 @@ public class SystemViewManagerImpl implements
SystemViewManager, NodeAttributesP
.map(SystemViewUtils::toSystemViewCreateCommand)
.collect(Collectors.toList());
- catalogManager.execute(commands).whenComplete(
- (r, t) -> {
+ catalogManager.catalogReadyFuture(1).thenCompose((x) ->
catalogManager.execute(commands)).whenComplete((r, t) -> {
viewsRegistrationFuture.complete(null);
if (t != null) {
diff --git
a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
index 2f8c9cf823..1ee56d5217 100644
---
a/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
+++
b/modules/system-view/src/test/java/org/apache/ignite/internal/systemview/SystemViewManagerTest.java
@@ -39,10 +39,12 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.BitSet;
@@ -75,7 +77,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -86,8 +87,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
public class SystemViewManagerTest extends BaseIgniteAbstractTest {
private static final String LOCAL_NODE_NAME = "LOCAL_NODE_NAME";
- @Mock
- private CatalogManager catalog;
+ private final CatalogManager catalog = Mockito.mock(CatalogManager.class);
private SystemViewManagerImpl viewMgr;
@@ -119,13 +119,16 @@ public class SystemViewManagerTest extends
BaseIgniteAbstractTest {
@Test
public void startAfterStartFails() {
-
Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
viewMgr.register(() -> List.of(dummyView("test")));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
+ verify(catalog, times(1)).execute(anyList());
+ reset(catalog);
assertThrows(IllegalStateException.class, viewMgr::startAsync);
@@ -148,12 +151,14 @@ public class SystemViewManagerTest extends
BaseIgniteAbstractTest {
public void registerAllColumnTypes(NativeTypeSpec typeSpec) {
NativeType type = SchemaTestUtils.specToType(typeSpec);
-
Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
viewMgr.register(() -> List.of(dummyView("test", type)));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
+ verify(catalog, times(1)).execute(anyList());
assertTrue(viewMgr.completeRegistration().isDone());
}
@@ -161,20 +166,24 @@ public class SystemViewManagerTest extends
BaseIgniteAbstractTest {
public void managerStartsSuccessfullyEvenIfCatalogRespondsWithError() {
CatalogValidationException expected = new
CatalogValidationException("Expected exception.");
-
Mockito.when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
+
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(failedFuture(expected));
viewMgr.register(() -> List.of(dummyView("test")));
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
+ verify(catalog, times(1)).execute(anyList());
assertThat(viewMgr.completeRegistration(), willBe(nullValue()));
}
@Test
public void nodeAttributesUpdatedAfterStart() {
-
Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
String name1 = "view1";
String name2 = "view2";
@@ -185,9 +194,6 @@ public class SystemViewManagerTest extends
BaseIgniteAbstractTest {
assertThat(viewMgr.startAsync(), willCompleteSuccessfully());
- verify(catalog, only()).execute(anyList());
- verifyNoMoreInteractions(catalog);
-
assertThat(viewMgr.nodeAttributes(), is(Map.of(NODE_ATTRIBUTES_KEY,
String.join(NODE_ATTRIBUTES_LIST_SEPARATOR, name1.toUpperCase(
Locale.ROOT), name2.toUpperCase(Locale.ROOT)))));
}
@@ -251,7 +257,9 @@ public class SystemViewManagerTest extends
BaseIgniteAbstractTest {
@Test
void viewScanTest() {
-
Mockito.when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
+
when(catalog.catalogInitializationFuture()).thenReturn(nullCompletedFuture());
+ when(catalog.catalogReadyFuture(1)).thenReturn(nullCompletedFuture());
+ when(catalog.execute(anyList())).thenReturn(nullCompletedFuture());
String nodeView = "NODE_VIEW";
String clusterView = "CLUSTER_VIEW";