This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 85f4fe5564 IGNITE-19276 Implement a mechanism to build indices
distributively (#2676)
85f4fe5564 is described below
commit 85f4fe5564643723adffbfa5e19fb0e84d1c4a22
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 16 12:01:22 2023 +0300
IGNITE-19276 Implement a mechanism to build indices distributively (#2676)
---
modules/index/build.gradle | 1 +
.../index/IndexAvailabilityController.java | 361 ++++++++++++++++++
.../index/IndexAvailabilityControllerTest.java | 403 +++++++++++++++++++++
.../ignite/internal/table/TableTestUtils.java | 31 +-
4 files changed, 792 insertions(+), 4 deletions(-)
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 264ef39637..367f7661c4 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -41,6 +41,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-table')))
testImplementation(testFixtures(project(':ignite-catalog')))
testImplementation project(':ignite-placement-driver')
+ testImplementation project(':ignite-replicator')
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
testImplementation libs.hamcrest.core
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
new file mode 100644
index 0000000000..633d0cf133
--- /dev/null
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Predicate.not;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.CollectionUtils.concat;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import
org.apache.ignite.internal.catalog.IndexAlreadyAvailableValidationException;
+import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import
org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import
org.apache.ignite.internal.table.distributed.index.IndexBuildCompletionListener;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * This component is responsible for ensuring that an index, upon completion
of a distributed index building for all partitions, becomes
+ * available for read-write.
+ *
+ * <p>An approximate algorithm for making an index available for
read-write:</p>
+ * <ul>
+ * <li>On {@link CatalogEvent#INDEX_CREATE}, keys are created in the
metastore: {@code indexBuild.inProgress.<indexId>} and
+ * {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code
indexBuild.partition.<indexId>.<partitionId_N>}.</li>
+ * <li>Then it is expected that the distributed index building event will
be triggered for all partitions via
+ * {@link IndexBuildCompletionListener} (from {@link
IndexBuilder#listen}); as a result of each of these events, the corresponding
key
+ * {@code indexBuild.partition.<indexId>.<partitionId>} will be deleted
from metastore.</li>
+ * <li>When all the {@code indexBuild.partition.<indexId>.<partitionId>}
keys in the metastore are deleted,
+ * {@link MakeIndexAvailableCommand} will be executed for the
corresponding index.</li>
+ * <li>At {@link CatalogEvent#INDEX_AVAILABLE}, key {@code
indexBuild.inProgress.<indexId>} in the metastore will be deleted.</li>
+ * </ul>
+ *
+ * <p>Notes:</p>
+ * <ul>
+ * <li>At {@link CatalogEvent#INDEX_DROP}, the keys in the metastore are
deleted: {@code indexBuild.inProgress.<indexId>} and
+ * {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code
indexBuild.partition.<indexId>.<partitionId_N>}.</li>
+ * <li>Handling of {@link CatalogEvent#INDEX_CREATE}, {@link
CatalogEvent#INDEX_DROP}, {@link CatalogEvent#INDEX_AVAILABLE} and watch
+ * prefix {@link #PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole
cluster (and only one node makes a write to the metastore) as
+ * these events are global, but only one node (a primary replica owning a
partition) handles
+ * {@link IndexBuildCompletionListener#onBuildCompletion} (form {@link
IndexBuilder#listen}) event.</li>
+ * </ul>
+ *
+ * @see CatalogIndexDescriptor#writeOnly()
+ */
+// TODO: IGNITE-20637 Recovery needs to be implemented
+// TODO: IGNITE-20637 Need integration with the IgniteImpl
+public class IndexAvailabilityController implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(IndexAvailabilityController.class);
+
+ private static final String IN_PROGRESS_BUILD_INDEX_KEY_PREFIX =
"indexBuild.inProgress.";
+
+ private static final String PARTITION_BUILD_INDEX_KEY_PREFIX =
"indexBuild.partition.";
+
+ private final CatalogManager catalogManager;
+
+ private final MetaStorageManager metaStorageManager;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /** Constructor. */
+ public IndexAvailabilityController(CatalogManager catalogManager,
MetaStorageManager metaStorageManager, IndexBuilder indexBuilder) {
+ this.catalogManager = catalogManager;
+ this.metaStorageManager = metaStorageManager;
+
+ addListeners(catalogManager, metaStorageManager, indexBuilder);
+ }
+
+ @Override
+ public void close() {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+ }
+
+ private void addListeners(CatalogService catalogService,
MetaStorageManager metaStorageManager, IndexBuilder indexBuilder) {
+ catalogService.listen(CatalogEvent.INDEX_CREATE, (parameters,
exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ return onIndexCreate((CreateIndexEventParameters)
parameters).thenApply(unused -> false);
+ });
+
+ catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception)
-> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ return onIndexDrop((DropIndexEventParameters)
parameters).thenApply(unused -> false);
+ });
+
+ catalogService.listen(CatalogEvent.INDEX_AVAILABLE, (parameters,
exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ return onIndexAvailable((MakeIndexAvailableEventParameters)
parameters).thenApply(unused -> false);
+ });
+
+
metaStorageManager.registerPrefixWatch(ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX),
new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ return onUpdatePartitionBuildIndexKey(event).thenApply(unused
-> null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ LOG.error("Error on handle partition build index key", e);
+ }
+ });
+
+ indexBuilder.listen((indexId, tableId, partitionId) ->
onIndexBuildCompletionForPartition(indexId, partitionId));
+ }
+
+ private CompletableFuture<?> onIndexCreate(CreateIndexEventParameters
parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexDescriptor().id();
+
+ int partitions = getPartitionCountFromCatalog(indexId,
parameters.catalogVersion());
+
+ ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(indexId);
+
+ return metaStorageManager.invoke(
+ notExists(inProgressBuildIndexKey),
+ concat(
+ List.of(put(inProgressBuildIndexKey,
BYTE_EMPTY_ARRAY)),
+ putPartitionBuildIndexOperations(indexId,
partitions)
+ ),
+ List.of(noop())
+ );
+ });
+ }
+
+ private CompletableFuture<?> onIndexDrop(DropIndexEventParameters
parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ int indexId = parameters.indexId();
+
+ int partitions = getPartitionCountFromCatalog(indexId,
parameters.catalogVersion() - 1);
+
+ ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(indexId);
+
+ return metaStorageManager.invoke(
+ exists(inProgressBuildIndexKey),
+ concat(
+ List.of(remove(inProgressBuildIndexKey)),
+ removePartitionBuildIndexOperations(indexId,
partitions)
+ ),
+ List.of(noop())
+ );
+ });
+ }
+
+ private CompletableFuture<?>
onIndexAvailable(MakeIndexAvailableEventParameters parameters) {
+ return inBusyLockAsync(busyLock, () -> {
+ ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(parameters.indexId());
+
+ return metaStorageManager.invoke(exists(inProgressBuildIndexKey),
remove(inProgressBuildIndexKey), noop());
+ });
+ }
+
+ private CompletableFuture<?> onUpdatePartitionBuildIndexKey(WatchEvent
event) {
+ return inBusyLockAsync(busyLock, () -> {
+ if (!event.single()) {
+ // We don't need to handle keys on index creation or deletion.
+ return completedFuture(null);
+ }
+
+ Entry entry = event.entryEvent().newEntry();
+
+ if (!entry.tombstone()) {
+ // In case an index was created when there was only one
partition.
+ return completedFuture(null);
+ }
+
+ String partitionBuildIndexKey = new String(entry.key(), UTF_8);
+
+ int indexId =
parseIndexIdFromPartitionBuildIndexKey(partitionBuildIndexKey);
+
+ ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(indexId);
+
+ long metastoreRevision = entry.revision();
+
+ if (isRemainingPartitionBuildIndexKeys(indexId, metastoreRevision)
+ || isMetastoreKeyAbsent(inProgressBuildIndexKey,
metastoreRevision)) {
+ return completedFuture(null);
+ }
+
+ // We can use the latest version of the catalog since we are on
the metastore thread.
+ CatalogIndexDescriptor indexDescriptor =
getIndexDescriptorStrict(indexId, catalogManager.latestCatalogVersion());
+
+ // We will not wait for the command to be executed, since we will
then find ourselves in a dead lock since we will not be able
+ // to free the metastore thread.
+ catalogManager
+ .execute(buildMakeIndexAvailableCommand(indexDescriptor))
+ .whenComplete((unused, throwable) -> {
+ if (throwable != null) {
+ Throwable unwrapCause = unwrapCause(throwable);
+
+ if (!(unwrapCause instanceof
IndexNotFoundValidationException)
+ && !(unwrapCause instanceof
IndexAlreadyAvailableValidationException)
+ && !(unwrapCause instanceof
NodeStoppingException)) {
+ LOG.error("Error processing the command to
make the index available: {}", unwrapCause, indexId);
+ }
+ }
+ });
+
+ return completedFuture(null);
+ });
+ }
+
+ private void onIndexBuildCompletionForPartition(int indexId, int
partitionId) {
+ inBusyLock(busyLock, () -> {
+ ByteArray partitionBuildIndexKey = partitionBuildIndexKey(indexId,
partitionId);
+
+ // Intentionally not waiting for the operation to complete or
returning the future because it is not necessary.
+ metaStorageManager
+ .invoke(exists(partitionBuildIndexKey),
remove(partitionBuildIndexKey), noop())
+ .whenComplete((operationResult, throwable) -> {
+ if (throwable != null && !(unwrapCause(throwable)
instanceof NodeStoppingException)) {
+ LOG.error(
+ "Error processing the operation to delete
the partition index building key: "
+ + "[indexId={}, partitionId={}]",
+ throwable,
+ indexId, partitionId
+ );
+ }
+ });
+ });
+ }
+
+ private int getPartitionCountFromCatalog(int indexId, int catalogVersion) {
+ CatalogIndexDescriptor indexDescriptor =
getIndexDescriptorStrict(indexId, catalogVersion);
+
+ CatalogTableDescriptor tableDescriptor =
catalogManager.table(indexDescriptor.tableId(), catalogVersion);
+
+ assert tableDescriptor != null : "tableId=" +
indexDescriptor.tableId() + ", catalogVersion=" + catalogVersion;
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogManager.zone(tableDescriptor.zoneId(), catalogVersion);
+
+ assert zoneDescriptor != null : "zoneId=" + tableDescriptor.zoneId() +
", catalogVersion=" + catalogVersion;
+
+ return zoneDescriptor.partitions();
+ }
+
+ private CatalogIndexDescriptor getIndexDescriptorStrict(int indexId, int
catalogVersion) {
+ CatalogIndexDescriptor indexDescriptor = catalogManager.index(indexId,
catalogVersion);
+
+ assert indexDescriptor != null : "indexId=" + indexId + ",
catalogVersion=" + catalogVersion;
+
+ return indexDescriptor;
+ }
+
+ private boolean isRemainingPartitionBuildIndexKeys(int indexId, long
metastoreRevision) {
+ try (Cursor<Entry> cursor =
metaStorageManager.prefixLocally(partitionBuildIndexKeyPrefix(indexId),
metastoreRevision)) {
+ return cursor.stream().anyMatch(not(Entry::tombstone));
+ }
+ }
+
+ private boolean isMetastoreKeyAbsent(ByteArray key, long
metastoreRevision) {
+ return metaStorageManager.getLocally(key, metastoreRevision).value()
== null;
+ }
+
+ private static ByteArray inProgressBuildIndexKey(int indexId) {
+ return ByteArray.fromString(IN_PROGRESS_BUILD_INDEX_KEY_PREFIX +
indexId);
+ }
+
+ private static ByteArray partitionBuildIndexKeyPrefix(int indexId) {
+ return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX +
indexId);
+ }
+
+ private static ByteArray partitionBuildIndexKey(int indexId, int
partitionId) {
+ return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId
+ '.' + partitionId);
+ }
+
+ private static Collection<Operation> putPartitionBuildIndexOperations(int
indexId, int partitions) {
+ return IntStream.range(0, partitions)
+ .mapToObj(partitionId -> partitionBuildIndexKey(indexId,
partitionId))
+ .map(key -> put(key, BYTE_EMPTY_ARRAY))
+ .collect(toList());
+ }
+
+ private static Collection<Operation>
removePartitionBuildIndexOperations(int indexId, int partitions) {
+ return IntStream.range(0, partitions)
+ .mapToObj(partitionId -> partitionBuildIndexKey(indexId,
partitionId))
+ .map(Operations::remove)
+ .collect(toList());
+ }
+
+ private static int parseIndexIdFromPartitionBuildIndexKey(String key) {
+ assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key;
+
+ int indexIdFromIndex = PARTITION_BUILD_INDEX_KEY_PREFIX.length();
+
+ int indexIdToIndex = key.indexOf('.', indexIdFromIndex);
+
+ return Integer.parseInt(key.substring(indexIdFromIndex,
indexIdToIndex));
+ }
+
+ private static CatalogCommand
buildMakeIndexAvailableCommand(CatalogIndexDescriptor indexDescriptor) {
+ // TODO: IGNITE-20636 Use only indexId
+ return
MakeIndexAvailableCommand.builder().schemaName(DEFAULT_SCHEMA_NAME).indexName(indexDescriptor.name()).build();
+ }
+}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
new file mode 100644
index 0000000000..805692aa7e
--- /dev/null
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.catalog.commands.AlterZoneParams;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** For {@link IndexAvailabilityController} testing. */
+public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
+ private static final String NODE_NAME = "test-node";
+
+ private static final String TABLE_NAME = "test-table";
+
+ private static final String COLUMN_NAME = "test-column";
+
+ private static final String INDEX_NAME = "test-index";
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ private int partitions;
+
+ private final VaultManager vaultManager = new VaultManager(new
InMemoryVaultService());
+
+ private final MetaStorageManagerImpl metaStorageManager =
StandaloneMetaStorageManager.create(vaultManager);
+
+ private final CatalogManager catalogManager =
CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, metaStorageManager);
+
+ private final IndexBuilder indexBuilder = new IndexBuilder(
+ NODE_NAME,
+ 1,
+ mock(ReplicaService.class, invocation -> completedFuture(null))
+ );
+
+ private final IndexAvailabilityController indexAvailabilityController =
new IndexAvailabilityController(
+ catalogManager,
+ metaStorageManager,
+ indexBuilder
+ );
+
+ @BeforeEach
+ void setUp() {
+ Stream.of(vaultManager, metaStorageManager,
catalogManager).forEach(IgniteComponent::start);
+
+ assertThat(metaStorageManager.deployWatches(),
willCompleteSuccessfully());
+
+ CatalogZoneDescriptor zoneDescriptor =
catalogManager.zone(DEFAULT_ZONE_NAME, clock.nowLong());
+
+ assertNotNull(zoneDescriptor);
+
+ partitions = zoneDescriptor.partitions();
+
+ assertThat(partitions, greaterThan(4));
+
+ TableTestUtils.createTable(
+ catalogManager,
+ DEFAULT_SCHEMA_NAME,
+ DEFAULT_ZONE_NAME,
+ TABLE_NAME,
+
List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()),
+ List.of(COLUMN_NAME)
+ );
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(
+ indexAvailabilityController::close,
+ indexBuilder::close,
+ catalogManager::stop,
+ metaStorageManager::stop,
+ vaultManager::stop
+ );
+ }
+
+ @Test
+ void testMetastoreKeysAfterIndexCreate() throws Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyExists(indexId);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyExists(indexId, partitionId);
+ }
+ }
+
+ @Test
+ void testMetastoreKeysAfterIndexCreateForOnlyOnePartition() throws
Exception {
+ changePartitionCountInCatalog(1);
+
+ testMetastoreKeysAfterIndexCreate();
+ }
+
+ @Test
+ void testMetastoreKeysAfterFinishBuildIndexForOnePartition() throws
Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ finishBuildingIndexForPartition(indexId, 0);
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyExists(indexId);
+
+ assertPartitionBuildIndexKeyAbsent(indexId, 0);
+
+ for (int partitionId = 1; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyExists(indexId, partitionId);
+ }
+
+ assertTrue(indexDescriptor(INDEX_NAME).writeOnly());
+ }
+
+ @Test
+ void testMetastoreKeysAfterFinishBuildIndexForAllPartitions() throws
Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertThat(
+
metaStorageManager.remove(ByteArray.fromString(partitionBuildIndexKey(indexId,
partitionId))),
+ willCompleteSuccessfully()
+ );
+ }
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyAbsent(indexId);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+ }
+
+ assertFalse(indexDescriptor(INDEX_NAME).writeOnly());
+ }
+
+ @Test
+ void testMetastoreKeysAfterDropIndex() throws Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ dropIndex(INDEX_NAME);
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyAbsent(indexId);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+ }
+ }
+
+ @Test
+ void testMetastoreKeysAfterDropIndexForOnlyOnePartition() throws Exception
{
+ changePartitionCountInCatalog(1);
+
+ testMetastoreKeysAfterDropIndex();
+ }
+
+ @Test
+ void testMetastoreKeysAfterFinishBuildingOnePartitionAndDropIndex() throws
Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ finishBuildingIndexForPartition(indexId, 0);
+
+ dropIndex(INDEX_NAME);
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyAbsent(indexId);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+ }
+ }
+
+ @Test
+ void testMetastoreKeysAfterDropIndexWithLastRemainingPartition() throws
Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ for (int partitionId = 1; partitionId < partitions; partitionId++) {
+ finishBuildingIndexForPartition(indexId, partitionId);
+ }
+
+ dropIndex(INDEX_NAME);
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyAbsent(indexId);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+ }
+ }
+
+ @Test
+ void testMetastoreKeysAfterDropIndexWithTwoRemainingPartitions() throws
Exception {
+ createIndex(INDEX_NAME);
+
+ int indexId = indexId(INDEX_NAME);
+
+ for (int partitionId = 2; partitionId < partitions; partitionId++) {
+ finishBuildingIndexForPartition(indexId, partitionId);
+ }
+
+ dropIndex(INDEX_NAME);
+
+ awaitTillGlobalMetastoreRevisionIsApplied();
+
+ assertInProgressBuildIndexKeyAbsent(indexId);
+
+ for (int partitionId = 0; partitionId < partitions; partitionId++) {
+ assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+ }
+ }
+
+ private void awaitTillGlobalMetastoreRevisionIsApplied() throws Exception {
+ assertTrue(
+ waitForCondition(() -> {
+ CompletableFuture<Long> currentRevisionFuture =
metaStorageManager.getService().currentRevision();
+
+ assertThat(currentRevisionFuture,
willCompleteSuccessfully());
+
+ return currentRevisionFuture.join() ==
metaStorageManager.appliedRevision();
+ }, 1_000)
+ );
+ }
+
+ private void createIndex(String indexName) {
+ TableTestUtils.createHashIndex(catalogManager, DEFAULT_SCHEMA_NAME,
TABLE_NAME, indexName, List.of(COLUMN_NAME), false);
+ }
+
+ private void dropIndex(String indexName) {
+ TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME,
indexName);
+ }
+
+ private int indexId(String indexName) {
+ return TableTestUtils.getIndexIdStrict(catalogManager, indexName,
clock.nowLong());
+ }
+
+ private int tableId(String tableName) {
+ return TableTestUtils.getTableIdStrict(catalogManager, tableName,
clock.nowLong());
+ }
+
+ private CatalogIndexDescriptor indexDescriptor(String indexName) {
+ return TableTestUtils.getIndexStrict(catalogManager, indexName,
clock.nowLong());
+ }
+
+ private void changePartitionCountInCatalog(int newPartitions) {
+ assertThat(
+
catalogManager.alterZone(AlterZoneParams.builder().zoneName(DEFAULT_ZONE_NAME).partitions(newPartitions).build()),
+ willCompleteSuccessfully()
+ );
+
+ partitions = newPartitions;
+ }
+
+ private void finishBuildingIndexForPartition(int indexId, int partitionId)
{
+ // It may look complicated, but the other method through mocking
IndexBuilder seems messier.
+ IndexStorage indexStorage = mock(IndexStorage.class);
+
+ when(indexStorage.getNextRowIdToBuild())
+ .thenReturn(new RowId(partitionId))
+ .thenReturn(null);
+
+ indexBuilder.scheduleBuildIndex(
+ tableId(TABLE_NAME),
+ partitionId,
+ indexId,
+ indexStorage,
+ mock(MvPartitionStorage.class),
+ mock(ClusterNode.class)
+ );
+
+ CompletableFuture<Void> finishBuildIndexFuture = new
CompletableFuture<>();
+
+ indexBuilder.listen((indexId1, tableId, partitionId1) -> {
+ if (indexId1 == indexId && partitionId1 == partitionId) {
+ finishBuildIndexFuture.complete(null);
+ }
+ });
+
+ assertThat(finishBuildIndexFuture, willCompleteSuccessfully());
+ }
+
+ private void assertInProgressBuildIndexKeyExists(int indexId) {
+ String startBuildIndexKey = inProgressBuildIndexKey(indexId);
+
+ assertThat(
+ startBuildIndexKey,
+
metaStorageManager.get(ByteArray.fromString(startBuildIndexKey)).thenApply(Entry::value),
+ willBe(BYTE_EMPTY_ARRAY)
+ );
+ }
+
+ private void assertInProgressBuildIndexKeyAbsent(int indexId) {
+ String startBuildIndexKey = inProgressBuildIndexKey(indexId);
+
+ assertThat(
+ startBuildIndexKey,
+
metaStorageManager.get(ByteArray.fromString(startBuildIndexKey)).thenApply(Entry::value),
+ willBe(nullValue())
+ );
+ }
+
+ private void assertPartitionBuildIndexKeyExists(int indexId, int
partitionId) {
+ String partitionBuildIndexKey = partitionBuildIndexKey(indexId,
partitionId);
+
+ assertThat(
+ partitionBuildIndexKey,
+
metaStorageManager.get(ByteArray.fromString(partitionBuildIndexKey)).thenApply(Entry::value),
+ willBe(BYTE_EMPTY_ARRAY)
+ );
+ }
+
+ private void assertPartitionBuildIndexKeyAbsent(int indexId, int
partitionId) {
+ String partitionBuildIndexKey = partitionBuildIndexKey(indexId,
partitionId);
+
+ assertThat(
+ partitionBuildIndexKey,
+
metaStorageManager.get(ByteArray.fromString(partitionBuildIndexKey)).thenApply(Entry::value),
+ willBe(nullValue())
+ );
+ }
+
+ private static String inProgressBuildIndexKey(int indexId) {
+ return "indexBuild.inProgress." + indexId;
+ }
+
+ private static String partitionBuildIndexKey(int indexId, int partitionId)
{
+ return "indexBuild.partition." + indexId + "." + partitionId;
+ }
+}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index dcc78df7cd..a358fd10d7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -184,7 +184,7 @@ public class TableTestUtils {
* @param timestamp Timestamp.
*/
public static @Nullable Integer getIndexId(CatalogService catalogService,
String indexName, long timestamp) {
- CatalogIndexDescriptor index = catalogService.index(indexName,
timestamp);
+ CatalogIndexDescriptor index = getIndex(catalogService, indexName,
timestamp);
return index == null ? null : index.id();
}
@@ -198,10 +198,33 @@ public class TableTestUtils {
* @throws AssertionError If table is absent.
*/
public static int getIndexIdStrict(CatalogService catalogService, String
indexName, long timestamp) {
- Integer indexId = getIndexId(catalogService, indexName, timestamp);
+ return getIndexStrict(catalogService, indexName, timestamp).id();
+ }
+
+ /**
+ * Returns index descriptor from the catalog, {@code null} if the table is
absent.
+ *
+ * @param catalogService Catalog service.
+ * @param indexName Index name.
+ * @param timestamp Timestamp.
+ */
+ public static @Nullable CatalogIndexDescriptor getIndex(CatalogService
catalogService, String indexName, long timestamp) {
+ return catalogService.index(indexName, timestamp);
+ }
+
+ /**
+ * Returns index descriptor from the catalog.
+ *
+ * @param catalogService Catalog service.
+ * @param indexName Index name.
+ * @param timestamp Timestamp.
+ * @throws AssertionError If table descriptor is absent.
+ */
+ public static CatalogIndexDescriptor getIndexStrict(CatalogService
catalogService, String indexName, long timestamp) {
+ CatalogIndexDescriptor index = catalogService.index(indexName,
timestamp);
- assertNotNull(indexId, "indexName=" + indexName + ", timestamp=" +
timestamp);
+ assertNotNull(index, "indexName=" + indexName + ", timestamp=" +
timestamp);
- return indexId;
+ return index;
}
}