This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c92d16a250 IGNITE-20884 Registering all indexes for active tables on
node recovery (#2871)
c92d16a250 is described below
commit c92d16a2507002a728906626d0570556a170398c
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 28 06:41:03 2023 +0300
IGNITE-20884 Registering all indexes for active tables on node recovery
(#2871)
---
.../internal/catalog/commands/CatalogUtils.java | 43 +++
.../descriptors/CatalogObjectDescriptor.java | 2 -
.../catalog/commands/CatalogUtilsTest.java | 299 ++++++++++++++++++++-
.../ignite/internal/catalog/CatalogTestUtils.java | 39 +++
.../internal/index/ItBuildIndexOneNodeTest.java | 20 +-
.../ignite/internal/index/ItIndexManagerTest.java | 123 +++++++++
.../apache/ignite/internal/index/IndexManager.java | 51 +++-
.../ignite/internal/index/IndexManagerTest.java | 292 +++++++++++++++++---
.../internal/index/TestIndexManagementUtils.java | 4 +-
.../internal/ClusterPerClassIntegrationTest.java | 40 +++
.../internal/table/distributed/TableManager.java | 37 ++-
11 files changed, 864 insertions(+), 86 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
index ff9828f5b0..670c8eebb9 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.catalog.commands;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
@@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
import org.apache.ignite.internal.catalog.TableNotFoundValidationException;
@@ -399,4 +401,45 @@ public class CatalogUtils {
return index;
}
+
+ /**
+ * Collects all table indexes (including dropped) that the table has in
the requested catalog version range.
+ *
+ * <p>It is expected that the table has at least one index in the
requested catalog version range.</p>
+ *
+ * @param catalogService Catalog service.
+ * @param tableId Table ID for which indexes will be collected.
+ * @param catalogVersionFrom Catalog version from which indexes will be
collected (including).
+ * @param catalogVersionTo Catalog version up to which indexes will be
collected (including).
+ * @return Table indexes.
+ */
+ public static Collection<CatalogIndexDescriptor> collectIndexes(
+ CatalogService catalogService,
+ int tableId,
+ int catalogVersionFrom,
+ int catalogVersionTo
+ ) {
+ assert catalogVersionFrom <= catalogVersionTo : "from=" +
catalogVersionFrom + ", to=" + catalogVersionTo;
+
+ if (catalogVersionFrom == catalogVersionTo) {
+ List<CatalogIndexDescriptor> indexes =
catalogService.indexes(catalogVersionFrom, tableId);
+
+ assert !indexes.isEmpty() : "catalogVersion=" + catalogVersionFrom
+ ", tableId=" + tableId;
+
+ return indexes;
+ }
+
+ var indexByIdMap = new Int2ObjectOpenHashMap<CatalogIndexDescriptor>();
+
+ for (int catalogVersion = catalogVersionFrom; catalogVersion <=
catalogVersionTo; catalogVersion++) {
+ for (CatalogIndexDescriptor index :
catalogService.indexes(catalogVersion, tableId)) {
+ indexByIdMap.put(index.id(), index);
+ }
+ }
+
+ assert !indexByIdMap.isEmpty()
+ : String.format("catalogVersionFrom=%s, catalogVersionTo=%s,
tableId=%s", catalogVersionFrom, catalogVersionTo, tableId);
+
+ return indexByIdMap.values();
+ }
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
index b22cd6d55a..ead38f600e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
@@ -57,7 +57,6 @@ public abstract class CatalogObjectDescriptor implements
Serializable {
return type;
}
-
/**
* Token of the update of the descriptor.
* Updated when {@link
UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)} is
called for the
@@ -84,7 +83,6 @@ public abstract class CatalogObjectDescriptor implements
Serializable {
this.updateToken = updateToken;
}
- /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
index 44fdb895db..c00ae71ffe 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/CatalogUtilsTest.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.catalog.commands;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.index;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
@@ -24,18 +28,42 @@ import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_R
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_STORAGE_ENGINE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.collectIndexes;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParams;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.fromParamsAndPreviousValue;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
import static org.junit.jupiter.api.Assertions.assertEquals;
-
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/** For {@link CatalogUtils} testing. */
-public class CatalogUtilsTest {
+public class CatalogUtilsTest extends BaseIgniteAbstractTest {
private static final String ZONE_NAME = "test_zone";
+ private static final String TABLE_NAME = "test_table";
+
+ private static final String INDEX_NAME = "test_index";
+
+ private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);
+
+ private static final String COLUMN_NAME = "key";
+
@Test
void testFromParamsCreateZoneWithoutAutoAdjustFields() {
CreateZoneParams params = CreateZoneParams.builder()
@@ -189,6 +217,206 @@ public class CatalogUtilsTest {
);
}
+ @Test
+ void testCollectIndexesAfterCreateTable() throws Exception {
+ withCatalogManager(catalogManager -> {
+ createTable(catalogManager, TABLE_NAME);
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+ int earliestCatalogVersion =
catalogManager.earliestCatalogVersion();
+
+ int tableId = tableId(catalogManager, latestCatalogVersion,
TABLE_NAME);
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
latestCatalogVersion, latestCatalogVersion),
+ hasItems(index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME))
+ );
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
earliestCatalogVersion, latestCatalogVersion),
+ hasItems(index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME))
+ );
+ });
+ }
+
+ @Test
+ void testCollectIndexesAfterCreateIndex() throws Exception {
+ withCatalogManager(catalogManager -> {
+ createTable(catalogManager, TABLE_NAME);
+ createIndex(catalogManager, TABLE_NAME, INDEX_NAME);
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+ int earliestCatalogVersion =
catalogManager.earliestCatalogVersion();
+
+ int tableId = tableId(catalogManager, latestCatalogVersion,
TABLE_NAME);
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
latestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager, latestCatalogVersion,
INDEX_NAME)
+ )
+ );
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
earliestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager, latestCatalogVersion,
INDEX_NAME)
+ )
+ );
+ });
+ }
+
+ @Test
+ void testCollectIndexesAfterCreateIndexAndMakeAvailableIndex() throws
Exception {
+ withCatalogManager(catalogManager -> {
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+
+ createTable(catalogManager, TABLE_NAME);
+ createIndex(catalogManager, TABLE_NAME, indexName0);
+ createIndex(catalogManager, TABLE_NAME, indexName1);
+
+ makeIndexAvailable(catalogManager, indexName1);
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+ int earliestCatalogVersion =
catalogManager.earliestCatalogVersion();
+
+ int tableId = tableId(catalogManager, latestCatalogVersion,
TABLE_NAME);
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
latestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager, latestCatalogVersion,
indexName0),
+ index(catalogManager, latestCatalogVersion,
indexName1)
+ )
+ );
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
earliestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager, latestCatalogVersion,
indexName0),
+ index(catalogManager, latestCatalogVersion,
indexName1)
+ )
+ );
+ });
+ }
+
+ @Test
+ void testCollectIndexesAfterDropIndexes() throws Exception {
+ withCatalogManager(catalogManager -> {
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+
+ createTable(catalogManager, TABLE_NAME);
+ createIndex(catalogManager, TABLE_NAME, indexName0);
+ createIndex(catalogManager, TABLE_NAME, indexName1);
+
+ makeIndexAvailable(catalogManager, indexName1);
+
+ int catalogVersionBeforeDropIndex0 =
catalogManager.latestCatalogVersion();
+
+ dropIndex(catalogManager, indexName0);
+
+ int catalogVersionBeforeDropIndex1 =
catalogManager.latestCatalogVersion();
+
+ dropIndex(catalogManager, indexName1);
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+ int earliestCatalogVersion =
catalogManager.earliestCatalogVersion();
+
+ int tableId = tableId(catalogManager, latestCatalogVersion,
TABLE_NAME);
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
latestCatalogVersion, latestCatalogVersion),
+ hasItems(index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME))
+ );
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
earliestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager,
catalogVersionBeforeDropIndex0, indexName0),
+ index(catalogManager,
catalogVersionBeforeDropIndex1, indexName1)
+ )
+ );
+ });
+ }
+
+ /**
+ * Tests the more complex case of getting indexes.
+ *
+ * <p>Consider the following catalog versions and their
contents:</p>
+ * <pre>
+ * Catalog versions and entity IDs have been simplified.
+ *
+ * 0 : T0 Ipk(A)
+ * 1 : T0 Ipk(A) I0(R) I1(R) I2(R) I3(R)
+ * 2 : T0 Ipk(A) I0(A) I1(R) I2(A) I3(R)
+ * 3 : T0 Ipk(A) I1(R) I2(A)
+ * </pre>
+ *
+ * <p>Expected indexes for each version range:</p>
+ * <pre>
+ * 3 -> 3 : Ipk(A) I1(R) I2(A)
+ * 0 -> 3 : Ipk(A) I0(A) I1(R) I2(A) I3(R)
+ * </pre>
+ */
+ @Test
+ void testCollectIndexesComplexCase() throws Exception {
+ withCatalogManager(catalogManager -> {
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+ String indexName2 = INDEX_NAME + 2;
+ String indexName3 = INDEX_NAME + 3;
+
+ createTable(catalogManager, TABLE_NAME);
+ createIndex(catalogManager, TABLE_NAME, indexName0);
+ createIndex(catalogManager, TABLE_NAME, indexName1);
+ createIndex(catalogManager, TABLE_NAME, indexName2);
+ createIndex(catalogManager, TABLE_NAME, indexName3);
+
+ makeIndexAvailable(catalogManager, indexName0);
+ makeIndexAvailable(catalogManager, indexName2);
+
+ int catalogVersionBeforeDropIndex0 =
catalogManager.latestCatalogVersion();
+
+ dropIndex(catalogManager, indexName0);
+
+ int catalogVersionBeforeDropIndex3 =
catalogManager.latestCatalogVersion();
+
+ dropIndex(catalogManager, indexName3);
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+ int earliestCatalogVersion =
catalogManager.earliestCatalogVersion();
+
+ int tableId = tableId(catalogManager, latestCatalogVersion,
TABLE_NAME);
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
latestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager, latestCatalogVersion,
indexName1),
+ index(catalogManager, latestCatalogVersion,
indexName2)
+ )
+ );
+
+ assertThat(
+ collectIndexes(catalogManager, tableId,
earliestCatalogVersion, latestCatalogVersion),
+ hasItems(
+ index(catalogManager, latestCatalogVersion,
PK_INDEX_NAME),
+ index(catalogManager,
catalogVersionBeforeDropIndex0, indexName0),
+ index(catalogManager, latestCatalogVersion,
indexName1),
+ index(catalogManager, latestCatalogVersion,
indexName2),
+ index(catalogManager,
catalogVersionBeforeDropIndex3, indexName3)
+ )
+ );
+ });
+ }
+
private static CreateZoneParams createZoneParams(@Nullable Integer
autoAdjust, @Nullable Integer scaleUp, @Nullable Integer scaleDown) {
return CreateZoneParams.builder()
.zoneName(ZONE_NAME)
@@ -216,4 +444,71 @@ public class CatalogUtilsTest {
private static CatalogZoneDescriptor createPreviousZoneWithDefaults() {
return fromParams(1,
CreateZoneParams.builder().zoneName(ZONE_NAME).build());
}
+
+ private static void withCatalogManager(Consumer<CatalogManager> fun)
throws Exception {
+ CatalogManager catalogManager = createTestCatalogManager("test", new
HybridClockImpl());
+
+ try {
+ catalogManager.start();
+
+ fun.accept(catalogManager);
+ } finally {
+ catalogManager.stop();
+ }
+ }
+
+ private static void createTable(CatalogManager catalogManager, String
tableName) {
+ CatalogCommand catalogCommand = CreateTableCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .zone(DEFAULT_ZONE_NAME)
+ .tableName(tableName)
+
.columns(List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()))
+ .primaryKeyColumns(List.of(COLUMN_NAME))
+ .colocationColumns(List.of(COLUMN_NAME))
+ .build();
+
+ assertThat(catalogManager.execute(catalogCommand),
willCompleteSuccessfully());
+ }
+
+ private static void createIndex(CatalogManager catalogManager, String
tableName, String indexName) {
+ CatalogCommand catalogCommand = CreateHashIndexCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .tableName(tableName)
+ .indexName(indexName)
+ .columns(List.of(COLUMN_NAME))
+ .unique(false)
+ .build();
+
+ assertThat(catalogManager.execute(catalogCommand),
willCompleteSuccessfully());
+ }
+
+ private static void makeIndexAvailable(CatalogManager catalogManager,
String indexName) {
+ CatalogIndexDescriptor index = index(catalogManager,
catalogManager.latestCatalogVersion(), indexName);
+
+ CatalogCommand catalogCommand = MakeIndexAvailableCommand.builder()
+ .indexId(index.id())
+ .build();
+
+ assertThat(catalogManager.execute(catalogCommand),
willCompleteSuccessfully());
+ }
+
+ private static void dropIndex(CatalogManager catalogManager, String
indexName) {
+ CatalogCommand catalogCommand = DropIndexCommand.builder()
+ .schemaName(DEFAULT_SCHEMA_NAME)
+ .indexName(indexName)
+ .build();
+
+ assertThat(catalogManager.execute(catalogCommand),
willCompleteSuccessfully());
+ }
+
+ private static int tableId(CatalogService catalogService, int
catalogVersion, String tableName) {
+ CatalogTableDescriptor tableDescriptor =
catalogService.tables(catalogVersion).stream()
+ .filter(table -> tableName.equals(table.name()))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(tableDescriptor, "catalogVersion=" + catalogVersion + ",
tableName=" + tableName);
+
+ return tableDescriptor.id();
+ }
}
diff --git
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
index 141cbbb148..d621ba1eb8 100644
---
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
+++
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/CatalogTestUtils.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.List;
import java.util.Set;
@@ -30,6 +31,8 @@ import
org.apache.ignite.internal.catalog.commands.AlterTableDropColumnCommand;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.ColumnParams.Builder;
import org.apache.ignite.internal.catalog.commands.DropTableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
@@ -298,4 +301,40 @@ public class CatalogTestUtils {
}
}
+
+ /**
+ * Searches for a table by name in the requested version of the catalog.
+ *
+ * @param catalogService Catalog service.
+ * @param catalogVersion Catalog version in which to find the table.
+ * @param tableName Table name.
+ */
+ public static CatalogTableDescriptor table(CatalogService catalogService,
int catalogVersion, String tableName) {
+ CatalogTableDescriptor tableDescriptor =
catalogService.tables(catalogVersion).stream()
+ .filter(table -> tableName.equals(table.name()))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(tableDescriptor, "catalogVersion=" + catalogVersion + ",
tableName=" + tableName);
+
+ return tableDescriptor;
+ }
+
+ /**
+ * Searches for an index by name in the requested version of the catalog.
+ *
+ * @param catalogService Catalog service.
+ * @param catalogVersion Catalog version in which to find the index.
+ * @param indexName Index name.
+ */
+ public static CatalogIndexDescriptor index(CatalogService catalogService,
int catalogVersion, String indexName) {
+ CatalogIndexDescriptor indexDescriptor =
catalogService.indexes(catalogVersion).stream()
+ .filter(index -> indexName.equals(index.name()))
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(indexDescriptor, "catalogVersion=" + catalogVersion + ",
indexName=" + indexName);
+
+ return indexDescriptor;
+ }
}
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index 1584494e2b..a265c4a334 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -22,14 +22,12 @@ import static
java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -40,7 +38,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import
org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
-import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
@@ -102,7 +99,7 @@ public class ItBuildIndexOneNodeTest extends
BaseSqlIntegrationTest {
CLUSTER.stopNode(0);
CLUSTER.startNode(0);
- awaitIndexBecomeAvailable(node(), INDEX_NAME);
+ awaitIndexesBecomeAvailable(node(), INDEX_NAME);
}
@Test
@@ -243,10 +240,6 @@ public class ItBuildIndexOneNodeTest extends
BaseSqlIntegrationTest {
return CLUSTER.node(0);
}
- private static void awaitIndexBecomeAvailable(IgniteImpl ignite, String
indexName) throws Exception {
- assertTrue(waitForCondition(() -> isIndexAvailable(ignite, indexName),
100, 5_000));
- }
-
private static CompletableFuture<Void>
awaitIndexBecomeAvailableEventAsync(IgniteImpl ignite, String indexName) {
var future = new CompletableFuture<Void>();
@@ -284,15 +277,6 @@ public class ItBuildIndexOneNodeTest extends
BaseSqlIntegrationTest {
return future;
}
- private static boolean isIndexAvailable(IgniteImpl ignite, String
indexName) {
- CatalogManager catalogManager = ignite.catalogManager();
- HybridClock clock = ignite.clock();
-
- CatalogIndexDescriptor indexDescriptor =
catalogManager.index(indexName, clock.nowLong());
-
- return indexDescriptor != null && indexDescriptor.available();
- }
-
private static void createTableAndInsertManyPeople(AtomicInteger
nextPersonId) {
createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
@@ -303,7 +287,7 @@ public class ItBuildIndexOneNodeTest extends
BaseSqlIntegrationTest {
createIndex(TABLE_NAME, INDEX_NAME, "SALARY");
// Hack so that we can wait for the index to be added to the sql
planner.
- awaitIndexBecomeAvailable(node(), INDEX_NAME);
+ awaitIndexesBecomeAvailable(node(), INDEX_NAME);
waitForReadTimestampThatObservesMostRecentCatalog();
}
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
new file mode 100644
index 0000000000..5339808b74
--- /dev/null
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.index.IndexManager.collectIndexesForRecovery;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/** For {@link IndexManager} testing. */
+public class ItIndexManagerTest extends ClusterPerClassIntegrationTest {
+ private static final String ZONE_NAME = "ZONE_TABLE";
+
+ private static final String TABLE_NAME = "TEST_TABLE";
+
+ private static final String INDEX_NAME = "TEST_INDEX";
+
+ @AfterEach
+ void tearDown() {
+ if (node() != null) {
+ sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ sql("DROP ZONE IF EXISTS " + ZONE_NAME);
+ }
+ }
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @Test
+ void testStartActiveAndDroppedIndexOnNodeRecovery() throws Exception {
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
+
+ insertPeople(TABLE_NAME, createPeopleBatch(100));
+
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+
+ createIndex(TABLE_NAME, indexName0, "NAME");
+ createIndex(TABLE_NAME, indexName1, "SALARY");
+
+ awaitIndexesBecomeAvailable(node(), indexName0, indexName1);
+
+ dropIndex(indexName1);
+
+ CLUSTER.restartNode(0);
+
+ TableImpl tableImpl = getTableImpl(node(), TABLE_NAME);
+
+ assertThat(
+ collectIndexIdsFromTable(tableImpl, 0),
+ equalTo(collectIndexIdsFromCatalogForRecovery(node(),
tableImpl))
+ );
+ }
+
+ private static IgniteImpl node() {
+ return CLUSTER.node(0);
+ }
+
+ private static Person[] createPeopleBatch(int batchSize) {
+ return IntStream.range(0, batchSize)
+ .mapToObj(personId -> new Person(personId, "person" +
personId, 10.0 + personId))
+ .toArray(Person[]::new);
+ }
+
+ private static TableImpl getTableImpl(IgniteImpl ignite, String tableName)
{
+ // IgniteTables#table is not used because it calls CompletableFuture#join
under the hood; the async call is used so that the test does NOT freeze.
+ CompletableFuture<Table> tableFuture =
ignite.tables().tableAsync(tableName);
+
+ assertThat(tableFuture, willCompleteSuccessfully());
+
+ return (TableImpl) tableFuture.join();
+ }
+
+ private static List<Integer> collectIndexIdsFromTable(TableImpl table, int
partitionId) {
+ // Under the hood, TableIndexStoragesSupplier#get uses
CompletableFuture#join for all indexes, so it is called asynchronously
+ // to NOT freeze the test.
+ CompletableFuture<List<Integer>> future = runAsync(() ->
table.indexStorageAdapters(partitionId).get())
+ .thenApply(indexStorageByIndexId ->
indexStorageByIndexId.keySet().stream().sorted().collect(toList()));
+
+ assertThat(future, willCompleteSuccessfully());
+
+ return future.join();
+ }
+
+ private static List<Integer>
collectIndexIdsFromCatalogForRecovery(IgniteImpl ignite, TableImpl table) {
+ return
collectIndexesForRecovery(ignite.catalogManager()).entrySet().stream()
+ .filter(e -> e.getKey().id() == table.tableId())
+ .flatMap(e -> e.getValue().stream())
+ .map(CatalogObjectDescriptor::id)
+ .sorted()
+ .collect(toList());
+ }
+}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index c5a10da1e1..1fbc57e70a 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -27,7 +28,10 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
@@ -309,19 +313,16 @@ public class IndexManager implements IgniteComponent {
assert recoveryFinishedFuture.isDone();
- int catalogVersion = catalogService.latestCatalogVersion();
long causalityToken = recoveryFinishedFuture.join();
List<CompletableFuture<?>> startIndexFutures = new ArrayList<>();
- for (CatalogIndexDescriptor index :
catalogService.indexes(catalogVersion)) {
- int tableId = index.tableId();
-
- CatalogTableDescriptor table = catalogService.table(tableId,
catalogVersion);
+ for (Entry<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>>
e : collectIndexesForRecovery(catalogService).entrySet()) {
+ CatalogTableDescriptor table = e.getKey();
- assert table != null : "tableId=" + tableId + ", indexId=" +
index.id();
-
- startIndexFutures.add(startIndexAsync(table, index,
causalityToken));
+ for (CatalogIndexDescriptor index : e.getValue()) {
+ startIndexFutures.add(startIndexAsync(table, index,
causalityToken));
+ }
}
// Forces to wait until recovery is complete before the metastore
watches are deployed to avoid races with other components.
@@ -444,4 +445,38 @@ public class IndexManager implements IgniteComponent {
return updateFunction.apply(t);
};
}
+
+ /**
+ * Collects the indexes that need to be started on node recovery, grouped by table.
+ *
+ * <p>Only tables present in the latest catalog version are considered. For those tables every catalog version from the earliest
+ * to the latest is scanned, so indexes that exist only in older versions (i.e. have since been dropped) are included as well.
+ * Indexes whose table is absent from the latest catalog version are skipped. If the same index ID is seen in several versions,
+ * the descriptor from the most recent of those versions is the one that is kept.
+ *
+ * @param catalogService Catalog service.
+ * @return Indexes to start, keyed by the table descriptor taken from the latest catalog version.
+ */
+ static Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>>
collectIndexesForRecovery(CatalogService catalogService) {
+ int earliestCatalogVersion = catalogService.earliestCatalogVersion();
+ int latestCatalogVersion = catalogService.latestCatalogVersion();
+
+ var indexesByTableId = new
Int2ObjectOpenHashMap<Int2ObjectMap<CatalogIndexDescriptor>>();
+
+ for (CatalogTableDescriptor table :
catalogService.tables(latestCatalogVersion)) {
+ indexesByTableId.put(table.id(), new Int2ObjectOpenHashMap<>());
+ }
+
+ for (int catalogVersion = earliestCatalogVersion; catalogVersion <=
latestCatalogVersion; catalogVersion++) {
+ for (CatalogIndexDescriptor index :
catalogService.indexes(catalogVersion)) {
+ Int2ObjectMap<CatalogIndexDescriptor> indexById =
indexesByTableId.get(index.tableId());
+
+ if (indexById != null) {
+ indexById.put(index.id(), index);
+ }
+ }
+ }
+
+ return indexesByTableId.int2ObjectEntrySet()
+ .stream()
+ .collect(toMap(
+ entry -> catalogService.table(entry.getIntKey(),
latestCatalogVersion),
+ entry -> entry.getValue().values()
+ ));
+ }
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 101d8ba00b..3a7d7c5b04 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -18,62 +18,77 @@
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
import static
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
import static
org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.PK_INDEX_NAME;
import static
org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
-import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createIndex;
-import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
-import static
org.apache.ignite.internal.index.TestIndexManagementUtils.dropIndex;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
-import static org.apache.ignite.internal.table.TableTestUtils.dropTable;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogManagerImpl;
-import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageService;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
-import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
@@ -81,17 +96,29 @@ import
org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
/** Test class to verify {@link IndexManager}. */
+@ExtendWith(WorkDirectoryExtension.class)
public class IndexManagerTest extends BaseIgniteAbstractTest {
+ private static final String OTHER_TABLE_NAME = TABLE_NAME + "-other";
+
+ private static final String PK_INDEX_NAME_OTHER_TABLE =
pkIndexName(OTHER_TABLE_NAME);
+
private final HybridClock clock = new HybridClockImpl();
+ @WorkDirectory
+ private Path workDir;
+
+ private TableManager mockTableManager;
+
+ private SchemaManager mockSchemaManager;
+
private VaultManager vaultManager;
private MetaStorageManagerImpl metaStorageManager;
- private ClockWaiter clockWaiter;
-
private CatalogManager catalogManager;
private IndexManager indexManager;
@@ -100,46 +127,26 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
@BeforeEach
public void setUp() {
- TableManager tableManagerMock = mock(TableManager.class);
-
- when(tableManagerMock.tableAsync(anyLong(), anyInt())).thenAnswer(inv
-> completedFuture(mockTable(inv.getArgument(1))));
+ mockTableManager = mock(TableManager.class);
- when(tableManagerMock.getTable(anyInt())).thenAnswer(inv ->
mockTable(inv.getArgument(0)));
+ when(mockTableManager.tableAsync(anyLong(), anyInt())).thenAnswer(inv
-> completedFuture(mockTable(inv.getArgument(1))));
- when(tableManagerMock.localPartitionSetAsync(anyLong(),
anyInt())).thenReturn(completedFuture(PartitionSet.EMPTY_SET));
+ when(mockTableManager.getTable(anyInt())).thenAnswer(inv ->
mockTable(inv.getArgument(0)));
- SchemaManager schManager = mock(SchemaManager.class);
+ when(mockTableManager.localPartitionSetAsync(anyLong(),
anyInt())).thenReturn(completedFuture(PartitionSet.EMPTY_SET));
- when(schManager.schemaRegistry(anyLong(),
anyInt())).thenReturn(completedFuture(null));
-
- vaultManager = new VaultManager(new InMemoryVaultService());
-
- metaStorageManager = StandaloneMetaStorageManager.create(vaultManager,
new SimpleInMemoryKeyValueStorage(NODE_NAME));
-
- clockWaiter = new ClockWaiter(NODE_NAME, clock);
-
- catalogManager = new CatalogManagerImpl(new
UpdateLogImpl(metaStorageManager), clockWaiter);
-
- indexManager = new IndexManager(
- schManager,
- tableManagerMock,
- catalogManager,
- metaStorageManager,
- (LongFunction<CompletableFuture<?>> function) ->
metaStorageManager.registerRevisionUpdateListener(function::apply)
- );
+ mockSchemaManager = mock(SchemaManager.class);
- List.of(vaultManager, metaStorageManager, clockWaiter, catalogManager,
indexManager).forEach(IgniteComponent::start);
+ when(mockSchemaManager.schemaRegistry(anyLong(),
anyInt())).thenReturn(completedFuture(null));
- assertThat(metaStorageManager.recoveryFinishedFuture(),
willCompleteSuccessfully());
- assertThat(metaStorageManager.notifyRevisionUpdateListenerOnStart(),
willCompleteSuccessfully());
- assertThat(metaStorageManager.deployWatches(),
willCompleteSuccessfully());
+ createAndStartComponents();
- createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
+ createTable(TABLE_NAME);
}
@AfterEach
void tearDown() throws Exception {
- IgniteUtils.stopAll(vaultManager, metaStorageManager, clockWaiter,
catalogManager, indexManager);
+ IgniteUtils.stopAll(vaultManager, metaStorageManager, catalogManager,
indexManager);
}
@Test
@@ -154,7 +161,7 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
@Test
void testGetMvTableStorageForDroppedTable() {
- dropTable(catalogManager, DEFAULT_SCHEMA_NAME, TABLE_NAME);
+ dropTable(TABLE_NAME);
assertThat(getMvTableStorageLatestRevision(Integer.MAX_VALUE),
willBe(nullValue()));
}
@@ -200,15 +207,134 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
}
@Test
- void testDontUnregisterIndexOnCatalogEventIndexDrop() throws Exception {
- createIndex(catalogManager, TABLE_NAME, INDEX_NAME, COLUMN_NAME);
- dropIndex(catalogManager, INDEX_NAME);
+ void testDontUnregisterIndexOnCatalogEventIndexDrop() {
+ createIndex(TABLE_NAME, INDEX_NAME);
+ dropIndex(INDEX_NAME);
TableViewInternal tableViewInternal =
tableViewInternalByTableId.get(tableId());
verify(tableViewInternal, never()).unregisterIndex(anyInt());
}
+ @Test
+ void testCollectIndexesForRecoveryForCreatedTables() { // Every live table must be collected together with its PK index.
+ createTable(OTHER_TABLE_NAME); // TABLE_NAME itself is already created in setUp().
+
+ Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>>
collectedIndexes = collectIndexesForRecovery();
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+
+ CatalogTableDescriptor table = table(latestCatalogVersion, TABLE_NAME);
+ CatalogTableDescriptor otherTable = table(latestCatalogVersion,
OTHER_TABLE_NAME);
+
+ assertThat(collectedIndexes, hasKey(table));
+ assertThat(collectedIndexes, hasKey(otherTable));
+
+ assertThat(collectedIndexes.get(table),
hasItems(index(latestCatalogVersion, PK_INDEX_NAME)));
+ assertThat(collectedIndexes.get(otherTable),
hasItems(index(latestCatalogVersion, PK_INDEX_NAME_OTHER_TABLE)));
+ }
+
+ @Test
+ void testCollectIndexesForRecoveryForCreatedAndDroppedIndexes() { // Dropped indexes must be collected alongside live ones.
+ createTable(OTHER_TABLE_NAME);
+
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+ String indexName2 = INDEX_NAME + 2;
+ String indexName3 = INDEX_NAME + 3;
+
+ String indexNameOtherTable0 = INDEX_NAME + "-other" + 0;
+ String indexNameOtherTable1 = INDEX_NAME + "-other" + 1;
+ String indexNameOtherTable2 = INDEX_NAME + "-other" + 2;
+ String indexNameOtherTable3 = INDEX_NAME + "-other" + 3;
+
+ createIndexes(TABLE_NAME, indexName0, indexName1, indexName2,
indexName3);
+ createIndexes(OTHER_TABLE_NAME, indexNameOtherTable0,
indexNameOtherTable1, indexNameOtherTable2, indexNameOtherTable3);
+
+ makeIndexesAvailable(indexName0, indexName2, indexNameOtherTable1,
indexNameOtherTable3); // Per table: one available index stays, one available index is dropped below.
+
+ List<CatalogIndexDescriptor> droppedIndexes = dropIndexes(indexName1,
indexName2); // Returned descriptors are the ones captured just before each drop.
+ List<CatalogIndexDescriptor> droppedIndexesOtherTable =
dropIndexes(indexNameOtherTable0, indexNameOtherTable1);
+
+ Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>>
collectedIndexes = collectIndexesForRecovery();
+
+ int latestCatalogVersion = catalogManager.latestCatalogVersion();
+
+ CatalogTableDescriptor table = table(latestCatalogVersion, TABLE_NAME);
+ CatalogTableDescriptor otherTable = table(latestCatalogVersion,
OTHER_TABLE_NAME);
+
+ assertThat(collectedIndexes, hasKey(table));
+ assertThat(collectedIndexes, hasKey(otherTable));
+
+ assertThat(
+ collectedIndexes.get(table),
+ hasItems( // NOTE: hasItems checks containment only, it does not reject extra elements.
+ index(latestCatalogVersion, PK_INDEX_NAME),
+ index(latestCatalogVersion, indexName0),
+ index(latestCatalogVersion, indexName3),
+ droppedIndexes.get(0), // indexName1
+ droppedIndexes.get(1) // indexName2
+ )
+ );
+
+ assertThat(
+ collectedIndexes.get(otherTable),
+ hasItems(
+ index(latestCatalogVersion, PK_INDEX_NAME_OTHER_TABLE),
+ index(latestCatalogVersion, indexNameOtherTable2),
+ index(latestCatalogVersion, indexNameOtherTable3),
+ droppedIndexesOtherTable.get(0), // indexNameOtherTable0
+ droppedIndexesOtherTable.get(1) // indexNameOtherTable1
+ )
+ );
+ }
+
+ @Test
+ void testCollectIndexesForDroppedTable() { // A table missing from the latest catalog version must not be collected.
+ createTable(OTHER_TABLE_NAME);
+ dropTable(OTHER_TABLE_NAME);
+
+ Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>>
collectedIndexes = collectIndexesForRecovery();
+
+ CatalogTableDescriptor table =
table(catalogManager.latestCatalogVersion(), TABLE_NAME);
+
+ assertThat(collectedIndexes, aMapWithSize(1)); // Only TABLE_NAME (created in setUp()) remains.
+ assertThat(collectedIndexes, hasKey(table));
+ }
+
+ @Test
+ void testStartAllIndexesOnNodeRecovery() throws Exception { // After a restart, dropped indexes must be registered too.
+ String indexName0 = INDEX_NAME + 0;
+ String indexName1 = INDEX_NAME + 1;
+ String indexName2 = INDEX_NAME + 2;
+ String indexName3 = INDEX_NAME + 3;
+
+ createIndexes(TABLE_NAME, indexName0, indexName1, indexName2,
indexName3);
+
+ makeIndexesAvailable(indexName0, indexName2);
+
+ dropIndexes(indexName1, indexName2);
+
+ TableViewInternal tableViewInternal =
tableViewInternalByTableId.get(tableId());
+
+ clearInvocations(tableViewInternal); // Count only the registrations made after the restart below.
+
+ IgniteUtils.stopAll(indexManager, catalogManager, metaStorageManager,
vaultManager);
+
+ createAndStartComponents(); // Simulates node recovery: new component instances, same mocks.
+
+ ArgumentCaptor<StorageHashIndexDescriptor> captor =
ArgumentCaptor.forClass(StorageHashIndexDescriptor.class);
+
+ verify(tableViewInternal,
times(5)).registerHashIndex(captor.capture(), anyBoolean(), any(), any()); // 5 = PK index + the 4 created indexes, 2 of which were dropped.
+
+ CatalogTableDescriptor table =
table(catalogManager.latestCatalogVersion(), TABLE_NAME);
+
+ assertThat(
+
captor.getAllValues().stream().map(StorageHashIndexDescriptor::id).sorted().collect(toList()),
+
equalTo(collectIndexesForRecovery().get(table).stream().map(CatalogObjectDescriptor::id).sorted().collect(toList()))
+ );
+ }
+
private TableViewInternal mockTable(int tableId) {
return tableViewInternalByTableId.computeIfAbsent(tableId,
this::newMockTable);
}
@@ -245,4 +371,84 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
private int tableId() {
return getTableIdStrict(catalogManager, TABLE_NAME, clock.nowLong());
}
+
+ private void createAndStartComponents() { // (Re)creates and starts all components; the table/schema manager mocks are reused.
+ vaultManager = new VaultManager(new InMemoryVaultService());
+
+ metaStorageManager = StandaloneMetaStorageManager.create(vaultManager,
new TestRocksDbKeyValueStorage(NODE_NAME, workDir)); // Storage lives under workDir - presumably so the catalog survives a restart; TODO confirm.
+
+ catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME,
clock, metaStorageManager);
+
+ indexManager = new IndexManager(
+ mockSchemaManager,
+ mockTableManager,
+ catalogManager,
+ metaStorageManager,
+ (LongFunction<CompletableFuture<?>> function) ->
metaStorageManager.registerRevisionUpdateListener(function::apply)
+ );
+
+ List.of(vaultManager, metaStorageManager, catalogManager,
indexManager).forEach(IgniteComponent::start); // Each component is started after the ones it was constructed from.
+
+ assertThat(metaStorageManager.recoveryFinishedFuture(),
willCompleteSuccessfully());
+ assertThat(metaStorageManager.notifyRevisionUpdateListenerOnStart(),
willCompleteSuccessfully());
+ assertThat(metaStorageManager.deployWatches(),
willCompleteSuccessfully());
+ }
+
+ private CatalogTableDescriptor table(int catalogVersion, String tableName)
{
+ return CatalogTestUtils.table(catalogManager, catalogVersion,
tableName);
+ }
+
+ private CatalogIndexDescriptor index(int catalogVersion, String indexName)
{
+ return CatalogTestUtils.index(catalogManager, catalogVersion,
indexName);
+ }
+
+ private void createTable(String tableName) {
+ TestIndexManagementUtils.createTable(catalogManager, tableName,
COLUMN_NAME);
+ }
+
+ private void dropTable(String tableName) {
+ TableTestUtils.dropTable(catalogManager, DEFAULT_SCHEMA_NAME,
tableName);
+ }
+
+ private void createIndex(String tableName, String indexName) {
+ createHashIndex(catalogManager, DEFAULT_SCHEMA_NAME, tableName,
indexName, List.of(COLUMN_NAME), false);
+ }
+
+ private void makeIndexAvailable(String indexName) {
+ CatalogIndexDescriptor index =
index(catalogManager.latestCatalogVersion(), indexName);
+
+ TestIndexManagementUtils.makeIndexAvailable(catalogManager,
index.id());
+ }
+
+ private void dropIndex(String indexName) {
+ TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME,
indexName);
+ }
+
+ private void createIndexes(String tableName, String... indexNames) {
+ for (String indexName : indexNames) {
+ createIndex(tableName, indexName);
+ }
+ }
+
+ private void makeIndexesAvailable(String... indexNames) {
+ for (String indexName : indexNames) {
+ makeIndexAvailable(indexName);
+ }
+ }
+
+ private List<CatalogIndexDescriptor> dropIndexes(String... indexNames) { // Drops the indexes; returns their descriptors in argument order.
+ var res = new ArrayList<CatalogIndexDescriptor>(indexNames.length);
+
+ for (String indexName : indexNames) {
+ res.add(index(catalogManager.latestCatalogVersion(), indexName)); // Capture the descriptor before the index is dropped.
+
+ dropIndex(indexName);
+ }
+
+ return res;
+ }
+
+ private Map<CatalogTableDescriptor, Collection<CatalogIndexDescriptor>>
collectIndexesForRecovery() {
+ return IndexManager.collectIndexesForRecovery(catalogManager);
+ }
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
index 4581daeaa8..619935d81a 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
@@ -91,8 +91,8 @@ class TestIndexManagementUtils {
return TableTestUtils.getIndexIdStrict(catalogService, indexName,
clock.nowLong());
}
- static CatalogIndexDescriptor indexDescriptor(CatalogService
catalogService, String indexId, HybridClock clock) {
- return TableTestUtils.getIndexStrict(catalogService, indexId,
clock.nowLong());
+ static CatalogIndexDescriptor indexDescriptor(CatalogService
catalogService, String indexName, HybridClock clock) {
+ return TableTestUtils.getIndexStrict(catalogService, indexName,
clock.nowLong());
}
static int tableId(CatalogService catalogService, String tableName,
HybridClock clock) {
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
index 854f1cffba..1a8413b4cb 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -232,6 +234,15 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
sql(format("CREATE INDEX {} ON {} ({})", indexName, tableName,
columnName));
}
+ /**
+ * Drops an index by executing a {@code DROP INDEX} SQL statement. Intended for indexes of the table created by
+ * {@link #createZoneAndTable(String, String, int, int)}.
+ *
+ * <p>The name is substituted into the statement as is, without any quoting.
+ *
+ * @param indexName Index name.
+ */
+ protected static void dropIndex(String indexName) {
+ sql(format("DROP INDEX {}", indexName));
+ }
+
/**
* Sets whether to wait for indexes to become available.
*
@@ -370,4 +381,33 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
throw new IllegalStateException(e);
}
}
+
+ /**
+ * Checks whether the index exists and is available on the given node.
+ *
+ * <p>The index is looked up in the node's catalog by name, using the current value of the node's clock as the timestamp.
+ *
+ * @param ignite Node.
+ * @param indexName Index name that is being checked.
+ * @return {@code true} if the index was found and is available, {@code false} if it was not found or is not available.
+ */
+ protected static boolean isIndexAvailable(IgniteImpl ignite, String
indexName) {
+ CatalogManager catalogManager = ignite.catalogManager();
+ HybridClock clock = ignite.clock();
+
+ CatalogIndexDescriptor indexDescriptor =
catalogManager.index(indexName, clock.nowLong());
+
+ return indexDescriptor != null && indexDescriptor.available();
+ }
+
+ /**
+ * Waits until all requested indexes are reported as available by {@link #isIndexAvailable(IgniteImpl, String)}; fails with an
+ * assertion error if the wait ends without the condition being met.
+ *
+ * <p>NOTE(review): the {@code 10} and {@code 30_000L} arguments are presumably the polling interval and the timeout in
+ * milliseconds - confirm against {@code IgniteTestUtils.waitForCondition}.
+ *
+ * @param ignite Node.
+ * @param indexNames Names of indexes that are of interest.
+ * @throws Exception If waiting for the condition fails.
+ */
+ protected static void awaitIndexesBecomeAvailable(IgniteImpl ignite,
String... indexNames) throws Exception {
+ assertTrue(waitForCondition(
+ () -> Arrays.stream(indexNames).allMatch(indexName ->
isIndexAvailable(ignite, indexName)),
+ 10,
+ 30_000L
+ ));
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d18daacb87..bfb9630e71 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -77,8 +77,9 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import
org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
@@ -557,7 +558,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
private CompletableFuture<?> onTableCreate(CreateTableEventParameters
parameters) {
- return createTableLocally(parameters.causalityToken(),
parameters.catalogVersion(), parameters.tableDescriptor());
+ return createTableLocally(parameters.causalityToken(),
parameters.catalogVersion(), parameters.tableDescriptor(), false);
}
private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(
@@ -1044,9 +1045,16 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param causalityToken Causality token.
* @param catalogVersion Catalog version on which the table was created.
* @param tableDescriptor Catalog table descriptor.
+ * @param onNodeRecovery {@code true} when called during node recovery,
{@code false} otherwise.
* @return Future that will be completed when local changes related to the
table creation are applied.
*/
- private CompletableFuture<?> createTableLocally(long causalityToken, int
catalogVersion, CatalogTableDescriptor tableDescriptor) {
+ private CompletableFuture<?> createTableLocally(
+ long causalityToken,
+ int catalogVersion,
+ CatalogTableDescriptor tableDescriptor,
+ // TODO: IGNITE-18595 We need to do something different to wait
for indexes before full rebalancing
+ boolean onNodeRecovery
+ ) {
return inBusyLockAsync(busyLock, () -> {
int tableId = tableDescriptor.id();
int zoneId = tableDescriptor.zoneId();
@@ -1074,7 +1082,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
tableDescriptor,
zoneDescriptor,
assignmentsFuture,
- catalogVersion
+ catalogVersion,
+ onNodeRecovery
).thenCompose(ignored -> writeTableAssignmentsToMetastore(tableId,
assignmentsFuture));
});
}
@@ -1087,6 +1096,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param zoneDescriptor Catalog distributed zone descriptor.
* @param assignmentsFuture Future with assignments.
* @param catalogVersion Catalog version on which the table was created.
+ * @param onNodeRecovery {@code true} when called during node recovery,
{@code false} otherwise.
* @return Future that will be completed when local changes related to the
table creation are applied.
*/
private CompletableFuture<Void> createTableLocally(
@@ -1094,7 +1104,9 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
CompletableFuture<List<Set<Assignment>>> assignmentsFuture,
- int catalogVersion
+ int catalogVersion,
+ // TODO: IGNITE-18595 We need to do something different to wait
for indexes before full rebalancing
+ boolean onNodeRecovery
) {
String tableName = tableDescriptor.name();
int tableId = tableDescriptor.id();
@@ -1115,8 +1127,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
var table = new TableImpl(internalTable, lockMgr, schemaVersions);
- // TODO: IGNITE-19082 Need another way to wait for indexes
- table.addIndexesToWait(collectTableIndexIds(tableId, catalogVersion));
+ // TODO: IGNITE-18595 We need to do something different to wait for
indexes before full rebalancing
+ table.addIndexesToWait(collectTableIndexIds(tableId, catalogVersion,
onNodeRecovery));
tablesByIdVv.update(causalityToken, (previous, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
@@ -2085,9 +2097,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
);
}
- private int[] collectTableIndexIds(int tableId, int catalogVersion) {
- return catalogService.indexes(catalogVersion, tableId).stream()
- .mapToInt(CatalogIndexDescriptor::id)
+ private int[] collectTableIndexIds(int tableId, int catalogVersion,
boolean onNodeRecovery) {
+ // If the method is called on CatalogEvent#TABLE_CREATE, then we only
need the catalogVersion in which this table was created.
+ int catalogVersionFrom = onNodeRecovery ?
catalogService.earliestCatalogVersion() : catalogVersion;
+
+ return CatalogUtils.collectIndexes(catalogService, tableId,
catalogVersionFrom, catalogVersion).stream()
+ .mapToInt(CatalogObjectDescriptor::id)
.toArray();
}
@@ -2175,7 +2190,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
// TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from volt and metastore
for (CatalogTableDescriptor tableDescriptor :
catalogService.tables(catalogVersion)) {
- startTableFutures.add(createTableLocally(recoveryRevision,
catalogVersion, tableDescriptor));
+ startTableFutures.add(createTableLocally(recoveryRevision,
catalogVersion, tableDescriptor, true));
}
// Forces you to wait until recovery is complete before the metastore
watches is deployed to avoid races with catalog listeners.