This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 85f4fe5564 IGNITE-19276 Implement a mechanism to build indices 
distributively (#2676)
85f4fe5564 is described below

commit 85f4fe5564643723adffbfa5e19fb0e84d1c4a22
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 16 12:01:22 2023 +0300

    IGNITE-19276 Implement a mechanism to build indices distributively (#2676)
---
 modules/index/build.gradle                         |   1 +
 .../index/IndexAvailabilityController.java         | 361 ++++++++++++++++++
 .../index/IndexAvailabilityControllerTest.java     | 403 +++++++++++++++++++++
 .../ignite/internal/table/TableTestUtils.java      |  31 +-
 4 files changed, 792 insertions(+), 4 deletions(-)

diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 264ef39637..367f7661c4 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -41,6 +41,7 @@ dependencies {
     testImplementation(testFixtures(project(':ignite-table')))
     testImplementation(testFixtures(project(':ignite-catalog')))
     testImplementation project(':ignite-placement-driver')
+    testImplementation project(':ignite-replicator')
     testImplementation libs.mockito.core
     testImplementation libs.mockito.junit
     testImplementation libs.hamcrest.core
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
new file mode 100644
index 0000000000..633d0cf133
--- /dev/null
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Predicate.not;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.CollectionUtils.concat;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import 
org.apache.ignite.internal.catalog.IndexAlreadyAvailableValidationException;
+import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
+import 
org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import 
org.apache.ignite.internal.table.distributed.index.IndexBuildCompletionListener;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * This component is responsible for ensuring that an index, upon completion 
of a distributed index building for all partitions, becomes
+ * available for read-write.
+ *
+ * <p>An approximate algorithm for making an index available for 
read-write:</p>
+ * <ul>
+ *     <li>On {@link CatalogEvent#INDEX_CREATE}, keys are created in the 
metastore: {@code indexBuild.inProgress.<indexId>} and
+ *     {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code 
indexBuild.partition.<indexId>.<partitionId_N>}.</li>
+ *     <li>Then it is expected that the distributed index building event will 
be triggered for all partitions via
+ *     {@link IndexBuildCompletionListener} (from {@link 
IndexBuilder#listen}); as a result of each of these events, the corresponding 
key
+ *     {@code indexBuild.partition.<indexId>.<partitionId>} will be deleted 
from metastore.</li>
+ *     <li>When all the {@code indexBuild.partition.<indexId>.<partitionId>} 
keys in the metastore are deleted,
+ *     {@link MakeIndexAvailableCommand} will be executed for the 
corresponding index.</li>
+ *     <li>At {@link CatalogEvent#INDEX_AVAILABLE}, key {@code 
indexBuild.inProgress.<indexId>} in the metastore will be deleted.</li>
+ * </ul>
+ *
+ * <p>Notes:</p>
+ * <ul>
+ *     <li>At {@link CatalogEvent#INDEX_DROP}, the keys in the metastore are 
deleted: {@code indexBuild.inProgress.<indexId>} and
+ *     {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code 
indexBuild.partition.<indexId>.<partitionId_N>}.</li>
+ *     <li>Handling of {@link CatalogEvent#INDEX_CREATE}, {@link 
CatalogEvent#INDEX_DROP}, {@link CatalogEvent#INDEX_AVAILABLE} and watch
+ *     prefix {@link #PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole 
cluster (and only one node makes a write to the metastore) as
+ *     these events are global, but only one node (a primary replica owning a 
partition) handles
+     *     {@link IndexBuildCompletionListener#onBuildCompletion} (from {@link 
IndexBuilder#listen}) event.</li>
+ * </ul>
+ *
+ * @see CatalogIndexDescriptor#writeOnly()
+ */
+// TODO: IGNITE-20637 Recovery needs to be implemented
+// TODO: IGNITE-20637 Need integration with the IgniteImpl
+public class IndexAvailabilityController implements ManuallyCloseable {
+    private static final IgniteLogger LOG = 
Loggers.forClass(IndexAvailabilityController.class);
+
+    private static final String IN_PROGRESS_BUILD_INDEX_KEY_PREFIX = 
"indexBuild.inProgress.";
+
+    private static final String PARTITION_BUILD_INDEX_KEY_PREFIX = 
"indexBuild.partition.";
+
+    private final CatalogManager catalogManager;
+
+    private final MetaStorageManager metaStorageManager;
+
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Constructor. */
+    public IndexAvailabilityController(CatalogManager catalogManager, 
MetaStorageManager metaStorageManager, IndexBuilder indexBuilder) {
+        this.catalogManager = catalogManager;
+        this.metaStorageManager = metaStorageManager;
+
+        addListeners(catalogManager, metaStorageManager, indexBuilder);
+    }
+
+    @Override
+    public void close() {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+    }
+
+    private void addListeners(CatalogService catalogService, 
MetaStorageManager metaStorageManager, IndexBuilder indexBuilder) {
+        catalogService.listen(CatalogEvent.INDEX_CREATE, (parameters, 
exception) -> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onIndexCreate((CreateIndexEventParameters) 
parameters).thenApply(unused -> false);
+        });
+
+        catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) 
-> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onIndexDrop((DropIndexEventParameters) 
parameters).thenApply(unused -> false);
+        });
+
+        catalogService.listen(CatalogEvent.INDEX_AVAILABLE, (parameters, 
exception) -> {
+            if (exception != null) {
+                return failedFuture(exception);
+            }
+
+            return onIndexAvailable((MakeIndexAvailableEventParameters) 
parameters).thenApply(unused -> false);
+        });
+
+        
metaStorageManager.registerPrefixWatch(ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX),
 new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                return onUpdatePartitionBuildIndexKey(event).thenApply(unused 
-> null);
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                LOG.error("Error on handle partition build index key", e);
+            }
+        });
+
+        indexBuilder.listen((indexId, tableId, partitionId) -> 
onIndexBuildCompletionForPartition(indexId, partitionId));
+    }
+
+    private CompletableFuture<?> onIndexCreate(CreateIndexEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexDescriptor().id();
+
+            int partitions = getPartitionCountFromCatalog(indexId, 
parameters.catalogVersion());
+
+            ByteArray inProgressBuildIndexKey = 
inProgressBuildIndexKey(indexId);
+
+            return metaStorageManager.invoke(
+                    notExists(inProgressBuildIndexKey),
+                    concat(
+                            List.of(put(inProgressBuildIndexKey, 
BYTE_EMPTY_ARRAY)),
+                            putPartitionBuildIndexOperations(indexId, 
partitions)
+                    ),
+                    List.of(noop())
+            );
+        });
+    }
+
+    private CompletableFuture<?> onIndexDrop(DropIndexEventParameters 
parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            int indexId = parameters.indexId();
+
+            int partitions = getPartitionCountFromCatalog(indexId, 
parameters.catalogVersion() - 1);
+
+            ByteArray inProgressBuildIndexKey = 
inProgressBuildIndexKey(indexId);
+
+            return metaStorageManager.invoke(
+                    exists(inProgressBuildIndexKey),
+                    concat(
+                            List.of(remove(inProgressBuildIndexKey)),
+                            removePartitionBuildIndexOperations(indexId, 
partitions)
+                    ),
+                    List.of(noop())
+            );
+        });
+    }
+
+    private CompletableFuture<?> 
onIndexAvailable(MakeIndexAvailableEventParameters parameters) {
+        return inBusyLockAsync(busyLock, () -> {
+            ByteArray inProgressBuildIndexKey = 
inProgressBuildIndexKey(parameters.indexId());
+
+            return metaStorageManager.invoke(exists(inProgressBuildIndexKey), 
remove(inProgressBuildIndexKey), noop());
+        });
+    }
+
+    private CompletableFuture<?> onUpdatePartitionBuildIndexKey(WatchEvent 
event) {
+        return inBusyLockAsync(busyLock, () -> {
+            if (!event.single()) {
+                // We don't need to handle keys on index creation or deletion.
+                return completedFuture(null);
+            }
+
+            Entry entry = event.entryEvent().newEntry();
+
+            if (!entry.tombstone()) {
+                // In case an index was created when there was only one 
partition.
+                return completedFuture(null);
+            }
+
+            String partitionBuildIndexKey = new String(entry.key(), UTF_8);
+
+            int indexId = 
parseIndexIdFromPartitionBuildIndexKey(partitionBuildIndexKey);
+
+            ByteArray inProgressBuildIndexKey = 
inProgressBuildIndexKey(indexId);
+
+            long metastoreRevision = entry.revision();
+
+            if (isRemainingPartitionBuildIndexKeys(indexId, metastoreRevision)
+                    || isMetastoreKeyAbsent(inProgressBuildIndexKey, 
metastoreRevision)) {
+                return completedFuture(null);
+            }
+
+            // We can use the latest version of the catalog since we are on 
the metastore thread.
+            CatalogIndexDescriptor indexDescriptor = 
getIndexDescriptorStrict(indexId, catalogManager.latestCatalogVersion());
+
+            // We do not wait for the command to complete: waiting here would lead to a deadlock, because we would not be able
+            // to free the metastore thread.
+            catalogManager
+                    .execute(buildMakeIndexAvailableCommand(indexDescriptor))
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable != null) {
+                            Throwable unwrapCause = unwrapCause(throwable);
+
+                            if (!(unwrapCause instanceof 
IndexNotFoundValidationException)
+                                    && !(unwrapCause instanceof 
IndexAlreadyAvailableValidationException)
+                                    && !(unwrapCause instanceof 
NodeStoppingException)) {
+                                LOG.error("Error processing the command to 
make the index available: {}", unwrapCause, indexId);
+                            }
+                        }
+                    });
+
+            return completedFuture(null);
+        });
+    }
+
+    private void onIndexBuildCompletionForPartition(int indexId, int 
partitionId) {
+        inBusyLock(busyLock, () -> {
+            ByteArray partitionBuildIndexKey = partitionBuildIndexKey(indexId, 
partitionId);
+
+            // Intentionally not waiting for the operation to complete or 
returning the future because it is not necessary.
+            metaStorageManager
+                    .invoke(exists(partitionBuildIndexKey), 
remove(partitionBuildIndexKey), noop())
+                    .whenComplete((operationResult, throwable) -> {
+                        if (throwable != null && !(unwrapCause(throwable) 
instanceof NodeStoppingException)) {
+                            LOG.error(
+                                    "Error processing the operation to delete 
the partition index building key: "
+                                            + "[indexId={}, partitionId={}]",
+                                    throwable,
+                                    indexId, partitionId
+                            );
+                        }
+                    });
+        });
+    }
+
+    private int getPartitionCountFromCatalog(int indexId, int catalogVersion) {
+        CatalogIndexDescriptor indexDescriptor = 
getIndexDescriptorStrict(indexId, catalogVersion);
+
+        CatalogTableDescriptor tableDescriptor = 
catalogManager.table(indexDescriptor.tableId(), catalogVersion);
+
+        assert tableDescriptor != null : "tableId=" + 
indexDescriptor.tableId() + ", catalogVersion=" + catalogVersion;
+
+        CatalogZoneDescriptor zoneDescriptor = 
catalogManager.zone(tableDescriptor.zoneId(), catalogVersion);
+
+        assert zoneDescriptor != null : "zoneId=" + tableDescriptor.zoneId() + 
", catalogVersion=" + catalogVersion;
+
+        return zoneDescriptor.partitions();
+    }
+
+    private CatalogIndexDescriptor getIndexDescriptorStrict(int indexId, int 
catalogVersion) {
+        CatalogIndexDescriptor indexDescriptor = catalogManager.index(indexId, 
catalogVersion);
+
+        assert indexDescriptor != null : "indexId=" + indexId + ", 
catalogVersion=" + catalogVersion;
+
+        return indexDescriptor;
+    }
+
+    private boolean isRemainingPartitionBuildIndexKeys(int indexId, long 
metastoreRevision) {
+        try (Cursor<Entry> cursor = 
metaStorageManager.prefixLocally(partitionBuildIndexKeyPrefix(indexId), 
metastoreRevision)) {
+            return cursor.stream().anyMatch(not(Entry::tombstone));
+        }
+    }
+
+    private boolean isMetastoreKeyAbsent(ByteArray key, long 
metastoreRevision) {
+        return metaStorageManager.getLocally(key, metastoreRevision).value() 
== null;
+    }
+
+    private static ByteArray inProgressBuildIndexKey(int indexId) {
+        return ByteArray.fromString(IN_PROGRESS_BUILD_INDEX_KEY_PREFIX + 
indexId);
+    }
+
+    /**
+     * Returns the metastore key prefix shared by all partition index building keys of the given index, i.e.
+     * {@code indexBuild.partition.<indexId>.}.
+     *
+     * <p>The prefix ends with the {@code '.'} separator so that a prefix scan matches keys of this index only: without it the
+     * prefix for index {@code 1} would also match the keys of indexes {@code 10}, {@code 11}, etc.
+     *
+     * @param indexId Index ID.
+     */
+    private static ByteArray partitionBuildIndexKeyPrefix(int indexId) {
+        return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId + '.');
+    }
+
+    private static ByteArray partitionBuildIndexKey(int indexId, int 
partitionId) {
+        return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId 
+ '.' + partitionId);
+    }
+
+    private static Collection<Operation> putPartitionBuildIndexOperations(int 
indexId, int partitions) {
+        return IntStream.range(0, partitions)
+                .mapToObj(partitionId -> partitionBuildIndexKey(indexId, 
partitionId))
+                .map(key -> put(key, BYTE_EMPTY_ARRAY))
+                .collect(toList());
+    }
+
+    private static Collection<Operation> 
removePartitionBuildIndexOperations(int indexId, int partitions) {
+        return IntStream.range(0, partitions)
+                .mapToObj(partitionId -> partitionBuildIndexKey(indexId, 
partitionId))
+                .map(Operations::remove)
+                .collect(toList());
+    }
+
+    /**
+     * Extracts the index ID from a key of the form {@code indexBuild.partition.<indexId>.<partitionId>}.
+     *
+     * @param key Partition index building key as a string.
+     * @return Index ID encoded in the key.
+     */
+    private static int parseIndexIdFromPartitionBuildIndexKey(String key) {
+        assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key;
+
+        int indexIdFromIndex = PARTITION_BUILD_INDEX_KEY_PREFIX.length();
+
+        int indexIdToIndex = key.indexOf('.', indexIdFromIndex);
+
+        // A key without a non-empty index ID followed by the separator is malformed; fail with the key in the message
+        // instead of an opaque index or number format error from the calls below.
+        assert indexIdToIndex > indexIdFromIndex : key;
+
+        return Integer.parseInt(key.substring(indexIdFromIndex, indexIdToIndex));
+    }
+
+    private static CatalogCommand 
buildMakeIndexAvailableCommand(CatalogIndexDescriptor indexDescriptor) {
+        // TODO: IGNITE-20636 Use only indexId
+        return 
MakeIndexAvailableCommand.builder().schemaName(DEFAULT_SCHEMA_NAME).indexName(indexDescriptor.name()).build();
+    }
+}
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
new file mode 100644
index 0000000000..805692aa7e
--- /dev/null
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.catalog.commands.AlterZoneParams;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** For {@link IndexAvailabilityController} testing. */
+public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
+    private static final String NODE_NAME = "test-node";
+
+    private static final String TABLE_NAME = "test-table";
+
+    private static final String COLUMN_NAME = "test-column";
+
+    private static final String INDEX_NAME = "test-index";
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private int partitions;
+
+    private final VaultManager vaultManager = new VaultManager(new 
InMemoryVaultService());
+
+    private final MetaStorageManagerImpl metaStorageManager = 
StandaloneMetaStorageManager.create(vaultManager);
+
+    private final CatalogManager catalogManager = 
CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, metaStorageManager);
+
+    private final IndexBuilder indexBuilder = new IndexBuilder(
+            NODE_NAME,
+            1,
+            mock(ReplicaService.class, invocation -> completedFuture(null))
+    );
+
+    private final IndexAvailabilityController indexAvailabilityController = 
new IndexAvailabilityController(
+            catalogManager,
+            metaStorageManager,
+            indexBuilder
+    );
+
+    @BeforeEach
+    void setUp() {
+        Stream.of(vaultManager, metaStorageManager, 
catalogManager).forEach(IgniteComponent::start);
+
+        assertThat(metaStorageManager.deployWatches(), 
willCompleteSuccessfully());
+
+        CatalogZoneDescriptor zoneDescriptor = 
catalogManager.zone(DEFAULT_ZONE_NAME, clock.nowLong());
+
+        assertNotNull(zoneDescriptor);
+
+        partitions = zoneDescriptor.partitions();
+
+        assertThat(partitions, greaterThan(4));
+
+        TableTestUtils.createTable(
+                catalogManager,
+                DEFAULT_SCHEMA_NAME,
+                DEFAULT_ZONE_NAME,
+                TABLE_NAME,
+                
List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()),
+                List.of(COLUMN_NAME)
+        );
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(
+                indexAvailabilityController::close,
+                indexBuilder::close,
+                catalogManager::stop,
+                metaStorageManager::stop,
+                vaultManager::stop
+        );
+    }
+
+    @Test
+    void testMetastoreKeysAfterIndexCreate() throws Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyExists(indexId);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyExists(indexId, partitionId);
+        }
+    }
+
+    @Test
+    void testMetastoreKeysAfterIndexCreateForOnlyOnePartition() throws 
Exception {
+        changePartitionCountInCatalog(1);
+
+        testMetastoreKeysAfterIndexCreate();
+    }
+
+    @Test
+    void testMetastoreKeysAfterFinishBuildIndexForOnePartition() throws 
Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        finishBuildingIndexForPartition(indexId, 0);
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyExists(indexId);
+
+        assertPartitionBuildIndexKeyAbsent(indexId, 0);
+
+        for (int partitionId = 1; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyExists(indexId, partitionId);
+        }
+
+        assertTrue(indexDescriptor(INDEX_NAME).writeOnly());
+    }
+
+    @Test
+    void testMetastoreKeysAfterFinishBuildIndexForAllPartitions() throws 
Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertThat(
+                    
metaStorageManager.remove(ByteArray.fromString(partitionBuildIndexKey(indexId, 
partitionId))),
+                    willCompleteSuccessfully()
+            );
+        }
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyAbsent(indexId);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+        }
+
+        assertFalse(indexDescriptor(INDEX_NAME).writeOnly());
+    }
+
+    @Test
+    void testMetastoreKeysAfterDropIndex() throws Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        dropIndex(INDEX_NAME);
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyAbsent(indexId);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+        }
+    }
+
+    @Test
+    void testMetastoreKeysAfterDropIndexForOnlyOnePartition() throws Exception 
{
+        changePartitionCountInCatalog(1);
+
+        testMetastoreKeysAfterDropIndex();
+    }
+
+    @Test
+    void testMetastoreKeysAfterFinishBuildingOnePartitionAndDropIndex() throws 
Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        finishBuildingIndexForPartition(indexId, 0);
+
+        dropIndex(INDEX_NAME);
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyAbsent(indexId);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+        }
+    }
+
+    @Test
+    void testMetastoreKeysAfterDropIndexWithLastRemainingPartition() throws 
Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        for (int partitionId = 1; partitionId < partitions; partitionId++) {
+            finishBuildingIndexForPartition(indexId, partitionId);
+        }
+
+        dropIndex(INDEX_NAME);
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyAbsent(indexId);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+        }
+    }
+
+    @Test
+    void testMetastoreKeysAfterDropIndexWithTwoRemainingPartitions() throws 
Exception {
+        createIndex(INDEX_NAME);
+
+        int indexId = indexId(INDEX_NAME);
+
+        for (int partitionId = 2; partitionId < partitions; partitionId++) {
+            finishBuildingIndexForPartition(indexId, partitionId);
+        }
+
+        dropIndex(INDEX_NAME);
+
+        awaitTillGlobalMetastoreRevisionIsApplied();
+
+        assertInProgressBuildIndexKeyAbsent(indexId);
+
+        for (int partitionId = 0; partitionId < partitions; partitionId++) {
+            assertPartitionBuildIndexKeyAbsent(indexId, partitionId);
+        }
+    }
+
+    private void awaitTillGlobalMetastoreRevisionIsApplied() throws Exception {
+        assertTrue(
+                waitForCondition(() -> {
+                    CompletableFuture<Long> currentRevisionFuture = 
metaStorageManager.getService().currentRevision();
+
+                    assertThat(currentRevisionFuture, 
willCompleteSuccessfully());
+
+                    return currentRevisionFuture.join() == 
metaStorageManager.appliedRevision();
+                }, 1_000)
+        );
+    }
+
+    private void createIndex(String indexName) {
+        TableTestUtils.createHashIndex(catalogManager, DEFAULT_SCHEMA_NAME, 
TABLE_NAME, indexName, List.of(COLUMN_NAME), false);
+    }
+
+    private void dropIndex(String indexName) {
+        TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, 
indexName);
+    }
+
+    private int indexId(String indexName) {
+        return TableTestUtils.getIndexIdStrict(catalogManager, indexName, 
clock.nowLong());
+    }
+
+    private int tableId(String tableName) {
+        return TableTestUtils.getTableIdStrict(catalogManager, tableName, 
clock.nowLong());
+    }
+
+    private CatalogIndexDescriptor indexDescriptor(String indexName) {
+        return TableTestUtils.getIndexStrict(catalogManager, indexName, 
clock.nowLong());
+    }
+
+    private void changePartitionCountInCatalog(int newPartitions) {
+        assertThat(
+                
catalogManager.alterZone(AlterZoneParams.builder().zoneName(DEFAULT_ZONE_NAME).partitions(newPartitions).build()),
+                willCompleteSuccessfully()
+        );
+
+        partitions = newPartitions;
+    }
+
+    /**
+     * Schedules index building for a single partition with mocked storages and waits until {@link IndexBuilder} notifies its
+     * listeners that building for this index and partition has completed.
+     *
+     * @param indexId Index ID.
+     * @param partitionId Partition ID.
+     */
+    private void finishBuildingIndexForPartition(int indexId, int partitionId) {
+        // It may look complicated, but the other method through mocking IndexBuilder seems messier.
+        IndexStorage indexStorage = mock(IndexStorage.class);
+
+        // One row to build, then nothing left.
+        when(indexStorage.getNextRowIdToBuild())
+                .thenReturn(new RowId(partitionId))
+                .thenReturn(null);
+
+        CompletableFuture<Void> finishBuildIndexFuture = new CompletableFuture<>();
+
+        // The listener must be registered before the build is scheduled: otherwise the build may complete before the
+        // listener is attached, the completion event is missed and the future below never completes.
+        indexBuilder.listen((indexId1, tableId, partitionId1) -> {
+            if (indexId1 == indexId && partitionId1 == partitionId) {
+                finishBuildIndexFuture.complete(null);
+            }
+        });
+
+        indexBuilder.scheduleBuildIndex(
+                tableId(TABLE_NAME),
+                partitionId,
+                indexId,
+                indexStorage,
+                mock(MvPartitionStorage.class),
+                mock(ClusterNode.class)
+        );
+
+        assertThat(finishBuildIndexFuture, willCompleteSuccessfully());
+    }
+
+    private void assertInProgressBuildIndexKeyExists(int indexId) {
+        String startBuildIndexKey = inProgressBuildIndexKey(indexId);
+
+        assertThat(
+                startBuildIndexKey,
+                
metaStorageManager.get(ByteArray.fromString(startBuildIndexKey)).thenApply(Entry::value),
+                willBe(BYTE_EMPTY_ARRAY)
+        );
+    }
+
+    private void assertInProgressBuildIndexKeyAbsent(int indexId) {
+        String startBuildIndexKey = inProgressBuildIndexKey(indexId);
+
+        assertThat(
+                startBuildIndexKey,
+                
metaStorageManager.get(ByteArray.fromString(startBuildIndexKey)).thenApply(Entry::value),
+                willBe(nullValue())
+        );
+    }
+
+    private void assertPartitionBuildIndexKeyExists(int indexId, int 
partitionId) {
+        String partitionBuildIndexKey = partitionBuildIndexKey(indexId, 
partitionId);
+
+        assertThat(
+                partitionBuildIndexKey,
+                
metaStorageManager.get(ByteArray.fromString(partitionBuildIndexKey)).thenApply(Entry::value),
+                willBe(BYTE_EMPTY_ARRAY)
+        );
+    }
+
+    private void assertPartitionBuildIndexKeyAbsent(int indexId, int 
partitionId) {
+        String partitionBuildIndexKey = partitionBuildIndexKey(indexId, 
partitionId);
+
+        assertThat(
+                partitionBuildIndexKey,
+                
metaStorageManager.get(ByteArray.fromString(partitionBuildIndexKey)).thenApply(Entry::value),
+                willBe(nullValue())
+        );
+    }
+
+    private static String inProgressBuildIndexKey(int indexId) {
+        return "indexBuild.inProgress." + indexId;
+    }
+
+    private static String partitionBuildIndexKey(int indexId, int partitionId) 
{
+        return "indexBuild.partition." + indexId + "." + partitionId;
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
index dcc78df7cd..a358fd10d7 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java
@@ -184,7 +184,7 @@ public class TableTestUtils {
      * @param timestamp Timestamp.
      */
     public static @Nullable Integer getIndexId(CatalogService catalogService, 
String indexName, long timestamp) {
-        CatalogIndexDescriptor index = catalogService.index(indexName, 
timestamp);
+        CatalogIndexDescriptor index = getIndex(catalogService, indexName, 
timestamp);
 
         return index == null ? null : index.id();
     }
@@ -198,10 +198,33 @@ public class TableTestUtils {
      * @throws AssertionError If table is absent.
      */
     public static int getIndexIdStrict(CatalogService catalogService, String 
indexName, long timestamp) {
-        Integer indexId = getIndexId(catalogService, indexName, timestamp);
+        return getIndexStrict(catalogService, indexName, timestamp).id();
+    }
+
+    /**
+     * Returns index descriptor from the catalog, {@code null} if the index is 
absent.
+     *
+     * @param catalogService Catalog service.
+     * @param indexName Index name.
+     * @param timestamp Timestamp.
+     */
+    public static @Nullable CatalogIndexDescriptor getIndex(CatalogService 
catalogService, String indexName, long timestamp) {
+        return catalogService.index(indexName, timestamp);
+    }
+
+    /**
+     * Returns index descriptor from the catalog.
+     *
+     * @param catalogService Catalog service.
+     * @param indexName Index name.
+     * @param timestamp Timestamp.
+     * @throws AssertionError If index descriptor is absent.
+     */
+    public static CatalogIndexDescriptor getIndexStrict(CatalogService 
catalogService, String indexName, long timestamp) {
+        CatalogIndexDescriptor index = catalogService.index(indexName, 
timestamp);
 
-        assertNotNull(indexId, "indexName=" + indexName + ", timestamp=" + 
timestamp);
+        assertNotNull(index, "indexName=" + indexName + ", timestamp=" + 
timestamp);
 
-        return indexId;
+        return index;
     }
 }


Reply via email to