This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5a826a0f01 IGNITE-20637 Implement recovery of distributed index
building (#2756)
5a826a0f01 is described below
commit 5a826a0f01c368e327a666fe05e89cb62adbbb22
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Oct 31 19:25:02 2023 +0300
IGNITE-20637 Implement recovery of distributed index building (#2756)
---
.../index/IndexAvailabilityController.java | 187 +++--------
.../index/IndexAvailabilityControllerRestorer.java | 254 ++++++++++++++
.../internal/index/IndexManagementUtils.java | 266 +++++++++++++++
.../apache/ignite/internal/index/IndexManager.java | 3 +
.../IndexAvailabilityControllerRestorerTest.java | 374 +++++++++++++++++++++
.../index/IndexAvailabilityControllerTest.java | 35 +-
.../internal/index/IndexBuildControllerTest.java | 46 +--
.../internal/index/IndexManagementUtilsTest.java | 123 +++++++
.../ignite/internal/index/IndexManagerTest.java | 26 +-
.../internal/index/TestIndexManagementUtils.java | 126 +++++++
.../index/IndexBuildCompletionListener.java | 2 +-
.../table/distributed/index/IndexBuilder.java | 15 +-
.../table/distributed/index/IndexBuilderTest.java | 2 +-
13 files changed, 1236 insertions(+), 223 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
index 8f135af85a..0c790f63f8 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -17,36 +17,37 @@
package org.apache.ignite.internal.index;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.PARTITION_BUILD_INDEX_KEY_PREFIX;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.extractIndexIdFromPartitionBuildIndexKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.getPartitionCountFromCatalog;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isAnyMetastoreKeyPresentLocally;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isMetastoreKeyAbsentLocally;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.makeIndexAvailableInCatalogWithoutFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKeyPrefix;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.putBuildIndexMetastoreKeysIfAbsent;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.removeMetastoreKeyIfPresent;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.toPartitionBuildIndexMetastoreKeyString;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
-import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.CollectionUtils.concat;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
-import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogService;
-import
org.apache.ignite.internal.catalog.IndexAlreadyAvailableValidationException;
-import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
@@ -64,7 +65,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import
org.apache.ignite.internal.table.distributed.index.IndexBuildCompletionListener;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
-import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/**
@@ -73,37 +73,38 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
*
* <p>An approximate algorithm for making an index available:</p>
* <ul>
- * <li>On {@link CatalogEvent#INDEX_CREATE}, keys are created in the
metastore: {@code indexBuild.inProgress.<indexId>} and
- * {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code
indexBuild.partition.<indexId>.<partitionId_N>}.</li>
+ * <li>On {@link CatalogEvent#INDEX_CREATE},
+ * {@link
IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager,
int, int) index building keys} are created in the
+ * metastore.</li>
* <li>Then it is expected that the distributed index building event will
be triggered for all partitions via
- * {@link IndexBuildCompletionListener} (from {@link
IndexBuilder#listen}); as a result of each of these events, the corresponding
key
- * {@code indexBuild.partition.<indexId>.<partitionId>} will be deleted
from metastore.</li>
- * <li>When all the {@code indexBuild.partition.<indexId>.<partitionId>}
keys in the metastore are deleted,
- * {@link MakeIndexAvailableCommand} will be executed for the
corresponding index.</li>
- * <li>At {@link CatalogEvent#INDEX_AVAILABLE}, key {@code
indexBuild.inProgress.<indexId>} in the metastore will be deleted.</li>
+ * {@link IndexBuildCompletionListener} (from {@link
IndexBuilder#listen}); as a result of each of these events, the corresponding
+ * {@link IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int)
partition index building key} will be deleted from
+ * metastore.</li>
+ * <li>When <b>all</b> the {@link
IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index
building keys} in the
+ * metastore are deleted, {@link MakeIndexAvailableCommand} will be
executed for the corresponding index.</li>
+ * <li>At {@link CatalogEvent#INDEX_AVAILABLE},
+ * {@link IndexManagementUtils#inProgressBuildIndexMetastoreKey(int) in
progress index building key} in the metastore will be
+ * deleted.</li>
* </ul>
*
* <p>Notes:</p>
* <ul>
- * <li>At {@link CatalogEvent#INDEX_DROP}, the keys in the metastore are
deleted: {@code indexBuild.inProgress.<indexId>} and
- * {@code indexBuild.partition.<indexId>.<partitionId_0>}...{@code
indexBuild.partition.<indexId>.<partitionId_N>}.</li>
+ * <li>At {@link CatalogEvent#INDEX_DROP},
+ * {@link
IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager,
int, int) index building keys} in the metastore
+ * are deleted.</li>
* <li>Handling of {@link CatalogEvent#INDEX_CREATE}, {@link
CatalogEvent#INDEX_DROP}, {@link CatalogEvent#INDEX_AVAILABLE} and watch
- * prefix {@link #PARTITION_BUILD_INDEX_KEY_PREFIX} is made by the whole
cluster (and only one node makes a write to the metastore) as
- * these events are global, but only one node (a primary replica owning a
partition) handles
+ * prefix {@link IndexManagementUtils#PARTITION_BUILD_INDEX_KEY_PREFIX} is
made by the whole cluster (and only one node makes a write to
+ * the metastore) as these events are global, but only one node (a primary
replica owning a partition) handles
* {@link IndexBuildCompletionListener#onBuildCompletion} (form {@link
IndexBuilder#listen}) event.</li>
+ * <li>Restoring index availability occurs in {@link
IndexAvailabilityControllerRestorer}.</li>
* </ul>
*
* @see CatalogIndexDescriptor#available()
*/
-// TODO: IGNITE-20637 Recovery needs to be implemented
-// TODO: IGNITE-20637 Need integration with the IgniteImpl
+// TODO: IGNITE-20638 Need integration with the IgniteImpl
public class IndexAvailabilityController implements ManuallyCloseable {
private static final IgniteLogger LOG =
Loggers.forClass(IndexAvailabilityController.class);
- private static final String IN_PROGRESS_BUILD_INDEX_KEY_PREFIX =
"indexBuild.inProgress.";
-
- private static final String PARTITION_BUILD_INDEX_KEY_PREFIX =
"indexBuild.partition.";
-
private final CatalogManager catalogManager;
private final MetaStorageManager metaStorageManager;
@@ -173,18 +174,9 @@ public class IndexAvailabilityController implements
ManuallyCloseable {
return inBusyLockAsync(busyLock, () -> {
int indexId = parameters.indexDescriptor().id();
- int partitions = getPartitionCountFromCatalog(indexId,
parameters.catalogVersion());
-
- ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(indexId);
+ int partitions = getPartitionCountFromCatalog(catalogManager,
indexId, parameters.catalogVersion());
- return metaStorageManager.invoke(
- notExists(inProgressBuildIndexKey),
- concat(
- List.of(put(inProgressBuildIndexKey,
BYTE_EMPTY_ARRAY)),
- putPartitionBuildIndexOperations(indexId,
partitions)
- ),
- List.of(noop())
- );
+ return putBuildIndexMetastoreKeysIfAbsent(metaStorageManager,
indexId, partitions);
});
}
@@ -192,15 +184,20 @@ public class IndexAvailabilityController implements
ManuallyCloseable {
return inBusyLockAsync(busyLock, () -> {
int indexId = parameters.indexId();
- int partitions = getPartitionCountFromCatalog(indexId,
parameters.catalogVersion() - 1);
+ int partitions = getPartitionCountFromCatalog(catalogManager,
indexId, parameters.catalogVersion() - 1);
+
+ ByteArray inProgressBuildIndexKey =
inProgressBuildIndexMetastoreKey(indexId);
- ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(indexId);
+ List<Operation> removePartitionBuildIndexMetastoreKeyOperations =
IntStream.range(0, partitions)
+ .mapToObj(partitionId ->
partitionBuildIndexMetastoreKey(indexId, partitionId))
+ .map(Operations::remove)
+ .collect(toList());
return metaStorageManager.invoke(
exists(inProgressBuildIndexKey),
concat(
List.of(remove(inProgressBuildIndexKey)),
- removePartitionBuildIndexOperations(indexId,
partitions)
+ removePartitionBuildIndexMetastoreKeyOperations
),
List.of(noop())
);
@@ -209,9 +206,9 @@ public class IndexAvailabilityController implements
ManuallyCloseable {
private CompletableFuture<?>
onIndexAvailable(MakeIndexAvailableEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> {
- ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(parameters.indexId());
+ ByteArray inProgressBuildIndexMetastoreKey =
inProgressBuildIndexMetastoreKey(parameters.indexId());
- return metaStorageManager.invoke(exists(inProgressBuildIndexKey),
remove(inProgressBuildIndexKey), noop());
+ return removeMetastoreKeyIfPresent(metaStorageManager,
inProgressBuildIndexMetastoreKey);
});
}
@@ -224,39 +221,25 @@ public class IndexAvailabilityController implements
ManuallyCloseable {
Entry entry = event.entryEvent().newEntry();
- if (!entry.tombstone()) {
+ if (entry.value() != null) {
// In case an index was created when there was only one
partition.
return completedFuture(null);
}
- String partitionBuildIndexKey = new String(entry.key(), UTF_8);
+ String partitionBuildIndexKey =
toPartitionBuildIndexMetastoreKeyString(entry.key());
- int indexId =
parseIndexIdFromPartitionBuildIndexKey(partitionBuildIndexKey);
-
- ByteArray inProgressBuildIndexKey =
inProgressBuildIndexKey(indexId);
+ int indexId =
extractIndexIdFromPartitionBuildIndexKey(partitionBuildIndexKey);
long metastoreRevision = entry.revision();
- if (isRemainingPartitionBuildIndexKeys(indexId, metastoreRevision)
- || isMetastoreKeyAbsent(inProgressBuildIndexKey,
metastoreRevision)) {
+ if (isAnyMetastoreKeyPresentLocally(metaStorageManager,
partitionBuildIndexMetastoreKeyPrefix(indexId), metastoreRevision)
+ || isMetastoreKeyAbsentLocally(metaStorageManager,
inProgressBuildIndexMetastoreKey(indexId), metastoreRevision)) {
return completedFuture(null);
}
// We will not wait for the command to be executed, since we will
then find ourselves in a dead lock since we will not be able
// to free the metastore thread.
- catalogManager
- .execute(buildMakeIndexAvailableCommand(indexId))
- .whenComplete((unused, throwable) -> {
- if (throwable != null) {
- Throwable unwrapCause = unwrapCause(throwable);
-
- if (!(unwrapCause instanceof
IndexNotFoundValidationException)
- && !(unwrapCause instanceof
IndexAlreadyAvailableValidationException)
- && !(unwrapCause instanceof
NodeStoppingException)) {
- LOG.error("Error processing the command to
make the index available: {}", unwrapCause, indexId);
- }
- }
- });
+ makeIndexAvailableInCatalogWithoutFuture(catalogManager, indexId,
LOG);
return completedFuture(null);
});
@@ -264,7 +247,7 @@ public class IndexAvailabilityController implements
ManuallyCloseable {
private void onIndexBuildCompletionForPartition(int indexId, int
partitionId) {
inBusyLock(busyLock, () -> {
- ByteArray partitionBuildIndexKey = partitionBuildIndexKey(indexId,
partitionId);
+ ByteArray partitionBuildIndexKey =
partitionBuildIndexMetastoreKey(indexId, partitionId);
// Intentionally not waiting for the operation to complete or
returning the future because it is not necessary.
metaStorageManager
@@ -281,76 +264,4 @@ public class IndexAvailabilityController implements
ManuallyCloseable {
});
});
}
-
- private int getPartitionCountFromCatalog(int indexId, int catalogVersion) {
- CatalogIndexDescriptor indexDescriptor =
getIndexDescriptorStrict(indexId, catalogVersion);
-
- CatalogTableDescriptor tableDescriptor =
catalogManager.table(indexDescriptor.tableId(), catalogVersion);
-
- assert tableDescriptor != null : "tableId=" +
indexDescriptor.tableId() + ", catalogVersion=" + catalogVersion;
-
- CatalogZoneDescriptor zoneDescriptor =
catalogManager.zone(tableDescriptor.zoneId(), catalogVersion);
-
- assert zoneDescriptor != null : "zoneId=" + tableDescriptor.zoneId() +
", catalogVersion=" + catalogVersion;
-
- return zoneDescriptor.partitions();
- }
-
- private CatalogIndexDescriptor getIndexDescriptorStrict(int indexId, int
catalogVersion) {
- CatalogIndexDescriptor indexDescriptor = catalogManager.index(indexId,
catalogVersion);
-
- assert indexDescriptor != null : "indexId=" + indexId + ",
catalogVersion=" + catalogVersion;
-
- return indexDescriptor;
- }
-
- private boolean isRemainingPartitionBuildIndexKeys(int indexId, long
metastoreRevision) {
- try (Cursor<Entry> cursor =
metaStorageManager.prefixLocally(partitionBuildIndexKeyPrefix(indexId),
metastoreRevision)) {
- return cursor.stream().anyMatch(not(Entry::tombstone));
- }
- }
-
- private boolean isMetastoreKeyAbsent(ByteArray key, long
metastoreRevision) {
- return metaStorageManager.getLocally(key, metastoreRevision).value()
== null;
- }
-
- private static ByteArray inProgressBuildIndexKey(int indexId) {
- return ByteArray.fromString(IN_PROGRESS_BUILD_INDEX_KEY_PREFIX +
indexId);
- }
-
- private static ByteArray partitionBuildIndexKeyPrefix(int indexId) {
- return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX +
indexId);
- }
-
- private static ByteArray partitionBuildIndexKey(int indexId, int
partitionId) {
- return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId
+ '.' + partitionId);
- }
-
- private static Collection<Operation> putPartitionBuildIndexOperations(int
indexId, int partitions) {
- return IntStream.range(0, partitions)
- .mapToObj(partitionId -> partitionBuildIndexKey(indexId,
partitionId))
- .map(key -> put(key, BYTE_EMPTY_ARRAY))
- .collect(toList());
- }
-
- private static Collection<Operation>
removePartitionBuildIndexOperations(int indexId, int partitions) {
- return IntStream.range(0, partitions)
- .mapToObj(partitionId -> partitionBuildIndexKey(indexId,
partitionId))
- .map(Operations::remove)
- .collect(toList());
- }
-
- private static int parseIndexIdFromPartitionBuildIndexKey(String key) {
- assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key;
-
- int indexIdFromIndex = PARTITION_BUILD_INDEX_KEY_PREFIX.length();
-
- int indexIdToIndex = key.indexOf('.', indexIdFromIndex);
-
- return Integer.parseInt(key.substring(indexIdFromIndex,
indexIdToIndex));
- }
-
- private static CatalogCommand buildMakeIndexAvailableCommand(int indexId) {
- return MakeIndexAvailableCommand.builder().indexId(indexId).build();
- }
}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorer.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorer.java
new file mode 100644
index 0000000000..429369fe04
--- /dev/null
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorer.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.getPartitionCountFromCatalog;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isAnyMetastoreKeyPresentLocally;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isMetastoreKeyAbsentLocally;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isPrimaryReplica;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.makeIndexAvailableInCatalogWithoutFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKeyPrefix;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.putBuildIndexMetastoreKeysIfAbsent;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.removeMetastoreKeyIfPresent;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Component responsible for restoring the algorithm from {@link
IndexAvailabilityController} if a node fails at some step.
+ *
+ * <p>Approximate recovery algorithm:</p>
+ * <ul>
+ * <li>For registered indexes: <ul>
+ * <li>If the new index did not have time to add
+ * {@link
IndexManagementUtils#putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager,
int, int) index building keys}, then add them
+ * to the metastore if they are <b>absent</b>.</li>
+ * <li>If there are no {@link
IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index
building keys} left for
+ * the index in the metastore, then we {@link
MakeIndexAvailableCommand make the index available} in the catalog.</li>
+ * <li>For partitions for which index building has not completed, we
will wait until the primary replica is elected (which will make
+ * sure it has applied all the commands from the replication log). If
after this we find out that the index has been built, we will
+ * remove the {@link
IndexManagementUtils#partitionBuildIndexMetastoreKey(int, int) partition index
building key} from the metastore
+ * if it is <b>present</b>.</li>
+ * </ul></li>
+ * <li>For available indexes: <ul>
+ * <li>Delete the {@link
IndexManagementUtils#inProgressBuildIndexMetastoreKey(int) “index building
in progress” key} in the
+ * metastore if it is <b>present</b>.</li>
+ * </ul></li>
+ * </ul>
+ */
+public class IndexAvailabilityControllerRestorer implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(IndexAvailabilityControllerRestorer.class);
+
+ private final CatalogManager catalogManager;
+
+ private final MetaStorageManager metaStorageManager;
+
+ private final IndexManager indexManager;
+
+ private final PlacementDriver placementDriver;
+
+ private final ClusterService clusterService;
+
+ private final HybridClock clock;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Constructor. */
+ public IndexAvailabilityControllerRestorer(
+ CatalogManager catalogManager,
+ MetaStorageManager metaStorageManager,
+ IndexManager indexManager,
+ PlacementDriver placementDriver,
+ ClusterService clusterService,
+ HybridClock clock
+ ) {
+ this.catalogManager = catalogManager;
+ this.metaStorageManager = metaStorageManager;
+ this.indexManager = indexManager;
+ this.placementDriver = placementDriver;
+ this.clusterService = clusterService;
+ this.clock = clock;
+ }
+
+ /**
+ * Recovers index availability.
+ *
+ * <p>NOTE: Should only be executed on node recovery.</p>
+ *
+ * @param recoveryRevision Metastore revision on recovery.
+ * @return Future of recovery execution.
+ */
+ public CompletableFuture<Void> recover(long recoveryRevision) {
+ return inBusyLockAsync(busyLock, () -> {
+ // It is expected that the method will only be called on recovery,
when the deploy of metastore watches has not yet occurred.
+ int catalogVersion = catalogManager.latestCatalogVersion();
+
+ List<CompletableFuture<?>> futures =
catalogManager.indexes(catalogVersion).stream()
+ .map(indexDescriptor -> {
+ if (indexDescriptor.available()) {
+ return
recoveryForAvailableIndexBusy(indexDescriptor, recoveryRevision);
+ } else {
+ return
recoveryForRegisteredIndexBusy(indexDescriptor, recoveryRevision,
catalogVersion);
+ }
+ })
+ .collect(toList());
+
+ return allOf(futures.toArray(CompletableFuture[]::new));
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+ }
+
+ private CompletableFuture<?>
recoveryForAvailableIndexBusy(CatalogIndexDescriptor indexDescriptor, long
recoveryRevision) {
+ assert indexDescriptor.available() : indexDescriptor.id();
+
+ int indexId = indexDescriptor.id();
+
+ ByteArray inProgressBuildIndexMetastoreKey =
inProgressBuildIndexMetastoreKey(indexId);
+
+ if (isMetastoreKeyAbsentLocally(metaStorageManager,
inProgressBuildIndexMetastoreKey, recoveryRevision)) {
+ return completedFuture(null);
+ }
+
+ return removeMetastoreKeyIfPresent(metaStorageManager,
inProgressBuildIndexMetastoreKey);
+ }
+
+ private CompletableFuture<?> recoveryForRegisteredIndexBusy(
+ CatalogIndexDescriptor indexDescriptor,
+ long recoveryRevision,
+ int catalogVersion
+ ) {
+ assert !indexDescriptor.available() : indexDescriptor.id();
+
+ int indexId = indexDescriptor.id();
+
+ if (isMetastoreKeyAbsentLocally(metaStorageManager,
inProgressBuildIndexMetastoreKey(indexId), recoveryRevision)) {
+ // After creating the index, we did not have time to create the
keys for building the index in the metastore.
+ return putBuildIndexMetastoreKeysIfAbsent(
+ metaStorageManager,
+ indexId,
+ getPartitionCountFromCatalog(catalogManager, indexId,
catalogVersion)
+ );
+ }
+
+ if (!isAnyMetastoreKeyPresentLocally(metaStorageManager,
partitionBuildIndexMetastoreKeyPrefix(indexId), recoveryRevision)) {
+ // Do not wait here, since the metastore watches are deployed
only after the start of the components is completed, so waiting
+ // would cause a deadlock.
+ makeIndexAvailableInCatalogWithoutFuture(catalogManager, indexId,
LOG);
+
+ return completedFuture(null);
+ }
+
+ return
recoveryForRemainingPartitionsOfRegisteredIndexBusy(indexDescriptor,
recoveryRevision);
+ }
+
+ private CompletableFuture<?>
recoveryForRemainingPartitionsOfRegisteredIndexBusy(
+ CatalogIndexDescriptor indexDescriptor,
+ long recoveryRevision
+ ) {
+ assert !indexDescriptor.available() : indexDescriptor.id();
+
+ int indexId = indexDescriptor.id();
+
+ try (Cursor<Entry> cursor =
metaStorageManager.prefixLocally(partitionBuildIndexMetastoreKeyPrefix(indexId),
recoveryRevision)) {
+ CompletableFuture<?>[] futures = cursor.stream()
+ .filter(entry -> entry.value() != null)
+ .map(Entry::key)
+
.map(IndexManagementUtils::toPartitionBuildIndexMetastoreKeyString)
+
.mapToInt(IndexManagementUtils::extractPartitionIdFromPartitionBuildIndexKey)
+ .mapToObj(partitionId ->
recoveryForPartitionOfRegisteredIndexBusy(indexDescriptor, partitionId,
recoveryRevision))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures);
+ }
+ }
+
+ private CompletableFuture<?> recoveryForPartitionOfRegisteredIndexBusy(
+ CatalogIndexDescriptor indexDescriptor,
+ int partitionId,
+ long recoveryRevision
+ ) {
+ int indexId = indexDescriptor.id();
+ int tableId = indexDescriptor.tableId();
+
+ return indexManager.getMvTableStorage(recoveryRevision, tableId)
+ .thenCompose(mvTableStorage -> inBusyLockAsync(busyLock, () ->
{
+ var replicationGroupId = new TablePartitionId(tableId,
partitionId);
+
+ return
placementDriver.getPrimaryReplica(replicationGroupId, clock.now())
+ .thenCompose(primaryReplicaMeta ->
inBusyLockAsync(busyLock, () -> {
+ ClusterNode localNode =
clusterService.topologyService().localMember();
+
+ if (primaryReplicaMeta == null ||
!isPrimaryReplica(primaryReplicaMeta, localNode, clock.now())) {
+ // Local node is not the primary replica,
so we expect a primary replica to be elected (which will make
+ // sure it has applied all the commands
from the replication log). If a local node is elected, then
+ // IndexAvailabilityController will get
rid of the partitionBuildIndexMetastoreKey from the metastore on
+ // its own by
IndexBuildCompletionListener.onBuildCompletion event.
+ return completedFuture(null);
+ }
+
+ IndexStorage indexStorage =
mvTableStorage.getIndex(partitionId, indexId);
+
+ assert indexStorage != null : "indexId=" +
indexId + ", partitionId=" + partitionId;
+
+ if (indexStorage.getNextRowIdToBuild() !=
null) {
+ // Building of the index has not yet been
completed, so we have nothing to do yet.
+ return completedFuture(null);
+ }
+
+ return removeMetastoreKeyIfPresent(
+ metaStorageManager,
+
partitionBuildIndexMetastoreKey(indexId, partitionId)
+ );
+ }));
+ }));
+ }
+}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java
new file mode 100644
index 0000000000..7d0af76413
--- /dev/null
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManagementUtils.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.CollectionUtils.concat;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import
org.apache.ignite.internal.catalog.IndexAlreadyAvailableValidationException;
+import org.apache.ignite.internal.catalog.IndexNotFoundValidationException;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.network.ClusterNode;
+
+/** Helper class for index management. */
+class IndexManagementUtils {
+ /** Metastore key prefix for the "index in the process of building" in the
format: {@code "indexBuild.inProgress.<indexId>"}. */
+ static final String IN_PROGRESS_BUILD_INDEX_KEY_PREFIX =
"indexBuild.inProgress.";
+
+ /**
+ * Metastore key prefix for the "index in the process of building for
partition" in the format:
+ * {@code "indexBuild.partition.<indexId>.<partitionId>"}.
+ */
+ static final String PARTITION_BUILD_INDEX_KEY_PREFIX =
"indexBuild.partition.";
+
+    /**
+     * Checks that the {@code key} is <b>absent</b> in the metastore locally, i.e. the entry read locally has no value.
+     *
+     * @param metastore Metastore manager.
+     * @param key Key to check.
+     * @param revUpperBound Upper bound of metastore revision.
+     * @return {@code true} if the value of the locally read entry is {@code null}.
+     */
+    static boolean isMetastoreKeyAbsentLocally(MetaStorageManager metastore, ByteArray key, long revUpperBound) {
+        Entry localEntry = metastore.getLocally(key, revUpperBound);
+
+        return localEntry.value() == null;
+    }
+
+    /**
+     * Checks that at least one key with the given prefix is <b>present</b> (has a non-{@code null} value) in the metastore locally.
+     *
+     * @param metastore Metastore manager.
+     * @param keyPrefix Key prefix to check.
+     * @param revUpperBound Upper bound of metastore revision.
+     * @return {@code true} if any entry found by the prefix has a value.
+     */
+    static boolean isAnyMetastoreKeyPresentLocally(MetaStorageManager metastore, ByteArray keyPrefix, long revUpperBound) {
+        // The cursor is closed as soon as the scan is finished.
+        try (Cursor<Entry> entries = metastore.prefixLocally(keyPrefix, revUpperBound)) {
+            return entries.stream().anyMatch(entry -> entry.value() != null);
+        }
+    }
+
+    /**
+     * Removes the {@code key} from the metastore, but only if it is <b>present</b>; otherwise performs a no-op.
+     *
+     * @param metaStorageManager Metastore manager.
+     * @param key Key to remove.
+     * @return Future result {@code true} if actual removal happened, otherwise {@code false}.
+     */
+    static CompletableFuture<Boolean> removeMetastoreKeyIfPresent(MetaStorageManager metaStorageManager, ByteArray key) {
+        Operation removeKey = remove(key);
+        Operation doNothing = noop();
+
+        return metaStorageManager.invoke(exists(key), removeKey, doNothing);
+    }
+
+    /**
+     * Puts all index building keys into the metastore in a single conditional invoke, if they are absent.
+     *
+     * <p>NOTES: Presence of keys is determined by {@value #IN_PROGRESS_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>"} only.</p>
+     *
+     * <p>Keys that are written: </p>
+     * <ul>
+     *     <li>{@value #IN_PROGRESS_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>"}.</li>
+     *     <li>{@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.0"} ...
+     *     {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitions-1>"}.</li>
+     * </ul>
+     *
+     * @param metastore Metastore manager.
+     * @param indexId Index ID.
+     * @param partitions Partition count.
+     * @return Future result {@code true} if success update was applied, otherwise {@code false}.
+     */
+    static CompletableFuture<Boolean> putBuildIndexMetastoreKeysIfAbsent(MetaStorageManager metastore, int indexId, int partitions) {
+        ByteArray inProgressKey = inProgressBuildIndexMetastoreKey(indexId);
+
+        Operation putInProgressKey = put(inProgressKey, BYTE_EMPTY_ARRAY);
+
+        // One "put" per partition: partition IDs are 0 (inclusive) .. partitions (exclusive).
+        List<Operation> putPartitionKeys = IntStream.range(0, partitions)
+                .mapToObj(partitionId -> partitionBuildIndexMetastoreKey(indexId, partitionId))
+                .map(partitionKey -> put(partitionKey, BYTE_EMPTY_ARRAY))
+                .collect(toList());
+
+        return metastore.invoke(
+                notExists(inProgressKey),
+                concat(List.of(putInProgressKey), putPartitionKeys),
+                List.of(noop())
+        );
+    }
+
+    /**
+     * Builds the "index in the process of building" metastore key, format:
+     * {@value #IN_PROGRESS_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>"}.
+     *
+     * @param indexId Index ID.
+     * @return Metastore key for the index.
+     */
+    static ByteArray inProgressBuildIndexMetastoreKey(int indexId) {
+        String key = IN_PROGRESS_BUILD_INDEX_KEY_PREFIX + indexId;
+
+        return ByteArray.fromString(key);
+    }
+
+    /**
+     * Returns the "building an index for the partition" metastore prefix key, format:
+     * {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>."}.
+     *
+     * <p>The trailing {@code '.'} separator is a deliberate part of the prefix: without it the prefix of index {@code 1} would also
+     * match the partition keys of indexes {@code 10}, {@code 11}, {@code 100} and so on when the result is used for a prefix scan.
+     * Every key produced by {@link #partitionBuildIndexMetastoreKey(int, int)} for the same index still starts with this prefix.</p>
+     *
+     * @param indexId Index ID.
+     * @return Metastore key prefix that matches the partition keys of exactly this index.
+     */
+    static ByteArray partitionBuildIndexMetastoreKeyPrefix(int indexId) {
+        return ByteArray.fromString(PARTITION_BUILD_INDEX_KEY_PREFIX + indexId + '.');
+    }
+
+    /**
+     * Builds the "building an index for the partition" metastore key, format:
+     * {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}.
+     *
+     * @param indexId Index ID.
+     * @param partitionId Partition ID.
+     * @return Metastore key for the partition of the index.
+     */
+    static ByteArray partitionBuildIndexMetastoreKey(int indexId, int partitionId) {
+        String key = PARTITION_BUILD_INDEX_KEY_PREFIX + indexId + "." + partitionId;
+
+        return ByteArray.fromString(key);
+    }
+
+    /**
+     * Decodes UTF-8 bytes into the string key: {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}.
+     *
+     * <p>With assertions enabled, fails if the decoded string does not start with {@value #PARTITION_BUILD_INDEX_KEY_PREFIX}.</p>
+     *
+     * @param bytes Bytes to convert.
+     * @return Decoded key.
+     */
+    static String toPartitionBuildIndexMetastoreKeyString(byte[] bytes) {
+        String decodedKey = new String(bytes, UTF_8);
+
+        assert decodedKey.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : decodedKey;
+
+        return decodedKey;
+    }
+
+    /**
+     * Resolves the partition count for an index by walking the catalog: index -> table -> zone.
+     *
+     * <p>Each descriptor is expected to exist in the given catalog version; this is checked with assertions only.</p>
+     *
+     * @param catalogService Catalog service.
+     * @param indexId Index ID.
+     * @param catalogVersion Catalog version.
+     * @return Partition count of the zone of the table the index belongs to.
+     */
+    static int getPartitionCountFromCatalog(CatalogService catalogService, int indexId, int catalogVersion) {
+        CatalogIndexDescriptor index = catalogService.index(indexId, catalogVersion);
+
+        assert index != null : "indexId=" + indexId + ", catalogVersion=" + catalogVersion;
+
+        int tableId = index.tableId();
+
+        CatalogTableDescriptor table = catalogService.table(tableId, catalogVersion);
+
+        assert table != null : "tableId=" + tableId + ", catalogVersion=" + catalogVersion;
+
+        int zoneId = table.zoneId();
+
+        CatalogZoneDescriptor zone = catalogService.zone(zoneId, catalogVersion);
+
+        assert zone != null : "zoneId=" + zoneId + ", catalogVersion=" + catalogVersion;
+
+        return zone.partitions();
+    }
+
+    /**
+     * Makes the index available in the catalog in a fire-and-forget manner.
+     *
+     * <p>The future of the operation is intentionally not returned, so as not to create deadlocks when performing the operation and
+     * the inability to complete it due to execution in the metastore thread or on recovery (the metastore watches will not be
+     * deployed yet).</p>
+     *
+     * <p>Errors are logged unless the cause is {@link IndexNotFoundValidationException},
+     * {@link IndexAlreadyAvailableValidationException} or {@link NodeStoppingException}.</p>
+     *
+     * @param catalogManager Catalog manager.
+     * @param indexId Index ID.
+     * @param log Logger.
+     */
+    static void makeIndexAvailableInCatalogWithoutFuture(CatalogManager catalogManager, int indexId, IgniteLogger log) {
+        catalogManager
+                .execute(MakeIndexAvailableCommand.builder().indexId(indexId).build())
+                .whenComplete((ignored, error) -> {
+                    if (error == null) {
+                        return;
+                    }
+
+                    Throwable cause = unwrapCause(error);
+
+                    boolean expectedFailure = cause instanceof IndexNotFoundValidationException
+                            || cause instanceof IndexAlreadyAvailableValidationException
+                            || cause instanceof NodeStoppingException;
+
+                    if (!expectedFailure) {
+                        log.error("Error processing the command to make the index available: {}", cause, indexId);
+                    }
+                });
+    }
+
+    /**
+     * Parses the partition ID out of the key: {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}.
+     *
+     * <p>The key is split by {@code '.'}; the partition ID is the fourth segment.</p>
+     *
+     * @param key Key.
+     * @return Partition ID.
+     */
+    static int extractPartitionIdFromPartitionBuildIndexKey(String key) {
+        assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key;
+
+        String[] keySegments = key.split("\\.");
+        String partitionIdSegment = keySegments[3];
+
+        return Integer.parseInt(partitionIdSegment);
+    }
+
+    /**
+     * Parses the index ID out of the key: {@value #PARTITION_BUILD_INDEX_KEY_PREFIX} + {@code "<indexId>.<partitionId>"}.
+     *
+     * <p>The key is split by {@code '.'}; the index ID is the third segment.</p>
+     *
+     * @param key Key.
+     * @return Index ID.
+     */
+    static int extractIndexIdFromPartitionBuildIndexKey(String key) {
+        assert key.startsWith(PARTITION_BUILD_INDEX_KEY_PREFIX) : key;
+
+        String[] keySegments = key.split("\\.");
+        String indexIdSegment = keySegments[2];
+
+        return Integer.parseInt(indexIdSegment);
+    }
+
+    /**
+     * Checks whether the local node is the primary replica at the timestamp of interest: the local node must be the leaseholder
+     * and the timestamp must be strictly before the lease expiration time.
+     *
+     * @param primaryReplicaMeta Primary replica meta.
+     * @param localNode Local node.
+     * @param timestamp Timestamp of interest.
+     * @return {@code true} if both conditions hold.
+     */
+    static boolean isPrimaryReplica(ReplicaMeta primaryReplicaMeta, ClusterNode localNode, HybridTimestamp timestamp) {
+        if (!localNode.id().equals(primaryReplicaMeta.getLeaseholderId())) {
+            return false;
+        }
+
+        return timestamp.compareTo(primaryReplicaMeta.getExpirationTime()) < 0;
+    }
+}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 6d61eeb2f1..d54da07b72 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -167,6 +167,9 @@ public class IndexManager implements IgniteComponent {
* <p>Example: when we start building an index, we will need {@link
IndexStorage} (as well as storage {@link MvPartitionStorage}) to
* build it and we can get them in {@link CatalogEvent#INDEX_CREATE} using
this method.</p>
*
+ * <p>During recovery, it is important to wait until the local node
becomes a primary replica so that all index building commands are
+ * applied from the replication log.</p>
+ *
* @param causalityToken Causality token.
* @param tableId Table ID.
* @return Future with multi-version table storage, completes with {@code
null} if the table does not exist according to the passed
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
new file mode 100644
index 0000000000..b103055c58
--- /dev/null
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.Collections.emptyIterator;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.assertMetastoreKeyAbsent;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.assertMetastoreKeyPresent;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.awaitTillGlobalMetastoreRevisionIsApplied;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createIndex;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.indexDescriptor;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.indexId;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.makeIndexAvailable;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.newPrimaryReplicaMeta;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.tableId;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogTestUtils;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** For {@link IndexAvailabilityControllerRestorer} testing. */
+@ExtendWith(WorkDirectoryExtension.class)
+public class IndexAvailabilityControllerRestorerTest extends
BaseIgniteAbstractTest {
+ private static final int PARTITION_ID = 0;
+
+ @WorkDirectory
+ private Path workDir;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ private final PlacementDriver placementDriver =
mock(PlacementDriver.class);
+
+ private final ClusterService clusterService = mock(ClusterService.class);
+
+ private final IndexManager indexManager = mock(IndexManager.class);
+
+ private final VaultManager vaultManager = new VaultManager(new
InMemoryVaultService());
+
+ private KeyValueStorage keyValueStorage;
+
+ private MetaStorageManagerImpl metaStorageManager;
+
+ private CatalogManager catalogManager;
+
+ private IndexAvailabilityControllerRestorer restorer;
+
+    /** Creates and starts the vault, the metastore and the catalog, deploys the metastore watches and creates the test table. */
+    @BeforeEach
+    void setUp() throws Exception {
+        keyValueStorage = new TestRocksDbKeyValueStorage(NODE_NAME, workDir);
+        metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, keyValueStorage);
+        catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, metaStorageManager);
+
+        // Start order matters: vault -> metastore -> catalog.
+        vaultManager.start();
+        metaStorageManager.start();
+        catalogManager.start();
+
+        deployWatches();
+
+        createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
+    }
+
+    /** Closes the restorer and stops the components; anything that was never created is passed as {@code null}. */
+    @AfterEach
+    void tearDown() throws Exception {
+        AutoCloseable closeRestorer = restorer == null ? null : restorer::close;
+        AutoCloseable stopCatalog = catalogManager == null ? null : catalogManager::stop;
+        AutoCloseable stopMetastore = metaStorageManager == null ? null : metaStorageManager::stop;
+        AutoCloseable stopVault = vaultManager::stop;
+
+        IgniteUtils.closeAll(closeRestorer, stopCatalog, stopMetastore, stopVault);
+    }
+
+    /** After recovery, available indexes must have no inProgressBuildIndexMetastoreKey left in the metastore. */
+    @Test
+    void testRemoveInProgressBuildIndexMetastoreKeyForAvailableIndexes() throws Exception {
+        String indexName0 = INDEX_NAME + 0;
+        String indexName1 = INDEX_NAME + 1;
+
+        createIndex(catalogManager, TABLE_NAME, indexName0, COLUMN_NAME);
+        createIndex(catalogManager, TABLE_NAME, indexName1, COLUMN_NAME);
+
+        int indexId0 = indexId(catalogManager, indexName0, clock);
+        int indexId1 = indexId(catalogManager, indexName1, clock);
+
+        makeIndexAvailable(catalogManager, indexId0);
+        makeIndexAvailable(catalogManager, indexId1);
+
+        // Let's put the inProgressBuildIndexMetastoreKey for only one index in the metastore.
+        putInProgressBuildIndexMetastoreKeyInMetastore(indexId0);
+
+        restartComponentsAndPerformRecovery();
+
+        // Let's do checks.
+        assertMetastoreKeyAbsent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId0));
+        assertMetastoreKeyAbsent(metaStorageManager, inProgressBuildIndexMetastoreKey(indexId1));
+
+        assertTrue(indexDescriptor(catalogManager, indexName0, clock).available());
+        assertTrue(indexDescriptor(catalogManager, indexName1, clock).available());
+    }
+
+    /** A registered index with the in-progress key but without any partition keys must become available after recovery. */
+    @Test
+    void testMakeIndexAvailableIfNoLeftKeysBuildingIndexForPartitionInMetastore() throws Exception {
+        createIndex(catalogManager, TABLE_NAME, INDEX_NAME, COLUMN_NAME);
+
+        int createdIndexId = indexId(catalogManager, INDEX_NAME, clock);
+
+        // Only the in-progress key is written, no partition keys.
+        putInProgressBuildIndexMetastoreKeyInMetastore(createdIndexId);
+
+        restartComponentsAndPerformRecovery();
+
+        // Let's do checks.
+        assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(createdIndexId));
+        assertTrue(indexDescriptor(catalogManager, INDEX_NAME, clock).available());
+    }
+
+    /** The partition key must be removed on recovery when the local node is the primary replica and the index storage is built. */
+    @Test
+    void testRemovePartitionBuildIndexMetastoreKeyForRegisteredIndex() throws Exception {
+        createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID);
+
+        int createdIndexId = indexId(catalogManager, INDEX_NAME, clock);
+        int createdTableId = tableId(catalogManager, TABLE_NAME, clock);
+
+        TablePartitionId groupId = new TablePartitionId(createdTableId, PARTITION_ID);
+        ReplicaMeta localPrimaryMeta = createPrimaryReplicaMetaThatExpireInOneDay(LOCAL_NODE, groupId);
+
+        // No row IDs to build are passed, so the mocked index storage reports the build as completed.
+        prepareToRestartNode(groupId, createdIndexId, localPrimaryMeta);
+
+        restartComponentsAndPerformRecovery();
+
+        // Let's do checks.
+        assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(createdIndexId));
+        assertMetastoreKeyAbsent(metaStorageManager, partitionBuildIndexMetastoreKey(createdIndexId, PARTITION_ID));
+
+        assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available());
+    }
+
+    /** The partition key must stay in the metastore on recovery while the index storage still has a row ID to build. */
+    @Test
+    void testNotRemovePartitionBuildIndexMetastoreKeyForRegisteredIndexIfBuildingIndexNotComplete() throws Exception {
+        createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID);
+
+        int createdIndexId = indexId(catalogManager, INDEX_NAME, clock);
+        int createdTableId = tableId(catalogManager, TABLE_NAME, clock);
+
+        TablePartitionId groupId = new TablePartitionId(createdTableId, PARTITION_ID);
+        ReplicaMeta localPrimaryMeta = createPrimaryReplicaMetaThatExpireInOneDay(LOCAL_NODE, groupId);
+
+        // A pending row ID makes the mocked index storage report an unfinished build.
+        RowId pendingRowId = new RowId(PARTITION_ID);
+
+        prepareToRestartNode(groupId, createdIndexId, localPrimaryMeta, pendingRowId);
+
+        restartComponentsAndPerformRecovery();
+
+        // Let's do checks.
+        assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(createdIndexId));
+        assertMetastoreKeyPresent(metaStorageManager, partitionBuildIndexMetastoreKey(createdIndexId, PARTITION_ID));
+
+        assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available());
+    }
+
+    /** The partition key must stay in the metastore on recovery when the placement driver returns no primary replica meta. */
+    @Test
+    void testNotRemovePartitionBuildIndexMetastoreKeyForRegisteredIndexIfPrimaryReplicaMetaNull() throws Exception {
+        createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID);
+
+        int createdIndexId = indexId(catalogManager, INDEX_NAME, clock);
+        int createdTableId = tableId(catalogManager, TABLE_NAME, clock);
+
+        TablePartitionId groupId = new TablePartitionId(createdTableId, PARTITION_ID);
+
+        // The placement driver mock will answer with a null primary replica meta.
+        prepareToRestartNode(groupId, createdIndexId, null);
+
+        restartComponentsAndPerformRecovery();
+
+        // Let's do checks.
+        assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(createdIndexId));
+        assertMetastoreKeyPresent(metaStorageManager, partitionBuildIndexMetastoreKey(createdIndexId, PARTITION_ID));
+
+        assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available());
+    }
+
+    /** The partition key must stay in the metastore on recovery when the primary replica is held by a node other than the local one. */
+    @Test
+    void testNotRemovePartitionBuildIndexMetastoreKeyForRegisteredIndexIfPrimaryReplicaMetaChanges() throws Exception {
+        createIndexWithIndexBuildingKeys(INDEX_NAME, PARTITION_ID);
+
+        int createdIndexId = indexId(catalogManager, INDEX_NAME, clock);
+        int createdTableId = tableId(catalogManager, TABLE_NAME, clock);
+
+        TablePartitionId groupId = new TablePartitionId(createdTableId, PARTITION_ID);
+
+        // The leaseholder is a node whose ID and name differ from those of the local node.
+        ClusterNode otherNode = new ClusterNodeImpl(NODE_ID + "_ID_OLD", NODE_NAME + "_OLD", mock(NetworkAddress.class));
+
+        ReplicaMeta otherNodePrimaryMeta = createPrimaryReplicaMetaThatExpireInOneDay(otherNode, groupId);
+
+        prepareToRestartNode(groupId, createdIndexId, otherNodePrimaryMeta);
+
+        restartComponentsAndPerformRecovery();
+
+        // Let's do checks.
+        assertMetastoreKeyPresent(metaStorageManager, inProgressBuildIndexMetastoreKey(createdIndexId));
+        assertMetastoreKeyPresent(metaStorageManager, partitionBuildIndexMetastoreKey(createdIndexId, PARTITION_ID));
+
+        assertFalse(indexDescriptor(catalogManager, INDEX_NAME, clock).available());
+    }
+
+    /** Creates an index on the test table and writes its in-progress key and the key of the given partition to the metastore. */
+    private void createIndexWithIndexBuildingKeys(String indexName, int partitionId) {
+        createIndex(catalogManager, TABLE_NAME, indexName, COLUMN_NAME);
+
+        int createdIndexId = indexId(catalogManager, indexName, clock);
+
+        putInProgressBuildIndexMetastoreKeyInMetastore(createdIndexId);
+        putPartitionBuildIndexMetastoreKeyInMetastore(createdIndexId, partitionId);
+    }
+
+    /** Creates a primary replica meta for the node whose lease starts now and expires one day (physical time) later. */
+    private ReplicaMeta createPrimaryReplicaMetaThatExpireInOneDay(ClusterNode clusterNode, TablePartitionId replicaGroupId) {
+        HybridTimestamp leaseStart = clock.now();
+
+        long oneDayMillis = TimeUnit.DAYS.toMillis(1);
+
+        return newPrimaryReplicaMeta(clusterNode, replicaGroupId, leaseStart, leaseStart.addPhysicalTime(oneDayMillis));
+    }
+
+    /**
+     * Configures the mocks used by the restorer: the index storage, the local node and the primary replica meta.
+     *
+     * @param replicaGroupId Replication group ID of the partition.
+     * @param indexId Index ID.
+     * @param primaryReplicaMeta Primary replica meta to be returned by the placement driver, may be {@code null}.
+     * @param rowIdsToBuild Row IDs that the mocked index storage reports as not yet built.
+     */
+    private void prepareToRestartNode(
+            TablePartitionId replicaGroupId,
+            int indexId,
+            @Nullable ReplicaMeta primaryReplicaMeta,
+            RowId... rowIdsToBuild
+    ) {
+        setIndexStorageToIndexManager(replicaGroupId, indexId, rowIdsToBuild);
+
+        setLocalNodeToClusterService(LOCAL_NODE);
+
+        setPrimaryReplicaMetaToPlacementDriver(replicaGroupId, primaryReplicaMeta);
+    }
+
+    /** Writes the in-progress key of the index with an empty value to the metastore and waits for the put to complete. */
+    private void putInProgressBuildIndexMetastoreKeyInMetastore(int indexId) {
+        CompletableFuture<?> putFuture = metaStorageManager.put(inProgressBuildIndexMetastoreKey(indexId), BYTE_EMPTY_ARRAY);
+
+        assertThat(putFuture, willCompleteSuccessfully());
+    }
+
+    /** Writes the partition key of the index with an empty value to the metastore and waits for the put to complete. */
+    private void putPartitionBuildIndexMetastoreKeyInMetastore(int indexId, int partitionId) {
+        CompletableFuture<?> putFuture = metaStorageManager.put(partitionBuildIndexMetastoreKey(indexId, partitionId), BYTE_EMPTY_ARRAY);
+
+        assertThat(putFuture, willCompleteSuccessfully());
+    }
+
+    /** Restarts the metastore and the catalog, runs the restorer recovery and only then deploys the metastore watches. */
+    private void restartComponentsAndPerformRecovery() throws Exception {
+        stopAndRestartComponentsNoDeployWatches();
+
+        CompletableFuture<Void> recoveryFuture = recoveryRestorer();
+
+        assertThat(recoveryFuture, willCompleteSuccessfully());
+
+        deployWatches();
+    }
+
+    /**
+     * Stops the catalog and the metastore, then re-creates and starts them over the same working directory and vault. The metastore
+     * watches are not deployed here. The new catalog manager is wrapped into a Mockito spy.
+     */
+    private void stopAndRestartComponentsNoDeployWatches() throws Exception {
+        awaitTillGlobalMetastoreRevisionIsApplied(metaStorageManager);
+
+        AutoCloseable stopCatalog = catalogManager == null ? null : catalogManager::stop;
+        AutoCloseable stopMetastore = metaStorageManager == null ? null : metaStorageManager::stop;
+
+        IgniteUtils.closeAll(stopCatalog, stopMetastore);
+
+        keyValueStorage = new TestRocksDbKeyValueStorage(NODE_NAME, workDir);
+        metaStorageManager = StandaloneMetaStorageManager.create(vaultManager, keyValueStorage);
+        catalogManager = spy(CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock, metaStorageManager));
+
+        // Start order matters: metastore -> catalog.
+        metaStorageManager.start();
+        catalogManager.start();
+    }
+
+    /** Deploys the metastore watches and waits until the global metastore revision is applied locally. */
+    private void deployWatches() throws Exception {
+        CompletableFuture<?> deployFuture = metaStorageManager.deployWatches();
+
+        assertThat(deployFuture, willCompleteSuccessfully());
+
+        awaitTillGlobalMetastoreRevisionIsApplied(metaStorageManager);
+    }
+
+    /**
+     * Closes the previous restorer (if any), creates a new one and starts its recovery on the revision at which the metastore
+     * recovery finished.
+     *
+     * @return Future of the restorer recovery.
+     */
+    private CompletableFuture<Void> recoveryRestorer() throws Exception {
+        if (restorer != null) {
+            restorer.close();
+        }
+
+        restorer = new IndexAvailabilityControllerRestorer(
+                catalogManager,
+                metaStorageManager,
+                indexManager,
+                placementDriver,
+                clusterService,
+                clock
+        );
+
+        CompletableFuture<Long> metastoreRecoveryFuture = metaStorageManager.recoveryFinishedFuture();
+
+        assertThat(metastoreRecoveryFuture, willBe(greaterThan(0L)));
+
+        // The future is already completed at this point, so join() does not block.
+        long recoveryRevision = metastoreRecoveryFuture.join();
+
+        return restorer.recover(recoveryRevision);
+    }
+
+    /**
+     * Makes the index manager mock return a table storage whose index storage for the given partition and index reports the passed
+     * row IDs one by one as "next row ID to build", and {@code null} once they are exhausted (or if none were passed).
+     */
+    private void setIndexStorageToIndexManager(TablePartitionId replicaGroupId, int indexId, RowId... rowIdsToBuild) {
+        Iterator<RowId> remainingRowIds;
+
+        if (nullOrEmpty(rowIdsToBuild)) {
+            remainingRowIds = emptyIterator();
+        } else {
+            remainingRowIds = List.of(rowIdsToBuild).iterator();
+        }
+
+        IndexStorage indexStorage = mock(IndexStorage.class);
+        MvTableStorage mvTableStorage = mock(MvTableStorage.class);
+
+        when(indexStorage.getNextRowIdToBuild()).thenAnswer(invocation -> {
+            if (remainingRowIds.hasNext()) {
+                return remainingRowIds.next();
+            }
+
+            return null;
+        });
+
+        when(mvTableStorage.getIndex(replicaGroupId.partitionId(), indexId)).thenReturn(indexStorage);
+
+        when(indexManager.getMvTableStorage(anyLong(), eq(replicaGroupId.tableId()))).thenReturn(completedFuture(mvTableStorage));
+    }
+
+    /** Makes the cluster service mock return a topology service whose every method call answers with the given node. */
+    private void setLocalNodeToClusterService(ClusterNode clusterNode) {
+        // The lambda is the default answer of the mock, so it is used for all of its methods.
+        TopologyService topologyServiceMock = mock(TopologyService.class, invocation -> clusterNode);
+
+        when(clusterService.topologyService()).thenReturn(topologyServiceMock);
+    }
+
+    /** Makes the placement driver mock return the given (possibly {@code null}) primary replica meta for the group at any timestamp. */
+    private void setPrimaryReplicaMetaToPlacementDriver(TablePartitionId replicaGroupId, @Nullable ReplicaMeta primaryReplicaMeta) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = completedFuture(primaryReplicaMeta);
+
+        when(placementDriver.getPrimaryReplica(eq(replicaGroupId), any())).thenReturn(primaryReplicaFuture);
+    }
+}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index 8dcd6b7bc9..281b714cf4 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -20,11 +20,14 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;
@@ -40,7 +43,6 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.AlterZoneParams;
-import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -67,14 +69,6 @@ import org.junit.jupiter.api.Test;
/** For {@link IndexAvailabilityController} testing. */
public class IndexAvailabilityControllerTest extends BaseIgniteAbstractTest {
- private static final String NODE_NAME = "test-node";
-
- private static final String TABLE_NAME = "test-table";
-
- private static final String COLUMN_NAME = "test-column";
-
- private static final String INDEX_NAME = "test-index";
-
private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 100500;
private final HybridClock clock = new HybridClockImpl();
@@ -113,14 +107,7 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
assertThat(partitions, greaterThan(4));
- TableTestUtils.createTable(
- catalogManager,
- DEFAULT_SCHEMA_NAME,
- DEFAULT_ZONE_NAME,
- TABLE_NAME,
-
List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()),
- List.of(COLUMN_NAME)
- );
+ createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
}
@AfterEach
@@ -287,15 +274,7 @@ public class IndexAvailabilityControllerTest extends
BaseIgniteAbstractTest {
}
private void awaitTillGlobalMetastoreRevisionIsApplied() throws Exception {
- assertTrue(
- waitForCondition(() -> {
- CompletableFuture<Long> currentRevisionFuture =
metaStorageManager.getService().currentRevision();
-
- assertThat(currentRevisionFuture,
willCompleteSuccessfully());
-
- return currentRevisionFuture.join() ==
metaStorageManager.appliedRevision();
- }, 1_000)
- );
+
TestIndexManagementUtils.awaitTillGlobalMetastoreRevisionIsApplied(metaStorageManager);
}
private void createIndex(String indexName) {
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index c3ce885949..759bdc3dca 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -19,13 +19,18 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
-import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.INDEX_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -44,7 +49,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
-import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -64,10 +68,7 @@ import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -75,20 +76,8 @@ import org.junit.jupiter.api.Test;
/** For {@link IndexBuildController} testing. */
public class IndexBuildControllerTest extends BaseIgniteAbstractTest {
- private static final String NODE_NAME = "test_node";
-
- private static final String NODE_ID = "test_node_id";
-
- private static final String TABLE_NAME = "test_table";
-
- private static final String COLUMN_NAME = "test_column";
-
- private static final String INDEX_NAME = "test_index";
-
private static final int PARTITION_ID = 10;
- private final ClusterNode localNode = new ClusterNodeImpl(NODE_ID,
NODE_NAME, mock(NetworkAddress.class));
-
private IndexBuilder indexBuilder;
private CatalogManager catalogManager;
@@ -114,19 +103,12 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
return completedFuture(mvTableStorage);
});
- ClusterService clusterService = mock(ClusterService.class, invocation
-> mock(TopologyService.class, invocation1 -> localNode));
+ ClusterService clusterService = mock(ClusterService.class, invocation
-> mock(TopologyService.class, invocation1 -> LOCAL_NODE));
catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME,
clock);
catalogManager.start();
- TableTestUtils.createTable(
- catalogManager,
- DEFAULT_SCHEMA_NAME,
- DEFAULT_ZONE_NAME,
- TABLE_NAME,
-
List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()),
- List.of(COLUMN_NAME)
- );
+ createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
indexBuildController = new IndexBuildController(indexBuilder,
indexManager, catalogManager, clusterService, placementDriver, clock);
}
@@ -150,7 +132,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
- eq(localNode),
+ eq(LOCAL_NODE),
anyLong()
);
}
@@ -167,7 +149,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
- eq(localNode),
+ eq(LOCAL_NODE),
anyLong()
);
@@ -177,7 +159,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(pkIndexName(TABLE_NAME))),
any(),
any(),
- eq(localNode),
+ eq(LOCAL_NODE),
anyLong()
);
}
@@ -217,7 +199,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(INDEX_NAME)),
any(),
any(),
- eq(localNode),
+ eq(LOCAL_NODE),
anyLong()
);
@@ -227,7 +209,7 @@ public class IndexBuildControllerTest extends
BaseIgniteAbstractTest {
eq(indexId(pkIndexName(TABLE_NAME))),
any(),
any(),
- eq(localNode),
+ eq(LOCAL_NODE),
anyLong()
);
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagementUtilsTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagementUtilsTest.java
new file mode 100644
index 0000000000..58a73a325c
--- /dev/null
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagementUtilsTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.extractIndexIdFromPartitionBuildIndexKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.extractPartitionIdFromPartitionBuildIndexKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.inProgressBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isPrimaryReplica;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKey;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.partitionBuildIndexMetastoreKeyPrefix;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.toPartitionBuildIndexMetastoreKeyString;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.newPrimaryReplicaMeta;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/** For {@link IndexManagementUtils} testing. */
+public class IndexManagementUtilsTest extends BaseIgniteAbstractTest {
+ private final HybridClock clock = new HybridClockImpl();
+
+ @Test
+ void testPartitionBuildIndexMetastoreKey() {
+ assertEquals(ByteArray.fromString("indexBuild.partition.1.2"),
partitionBuildIndexMetastoreKey(1, 2));
+ assertEquals(ByteArray.fromString("indexBuild.partition.7.9"),
partitionBuildIndexMetastoreKey(7, 9));
+ }
+
+ @Test
+ void testInProgressBuildIndexMetastoreKey() {
+ assertEquals(ByteArray.fromString("indexBuild.inProgress.1"),
inProgressBuildIndexMetastoreKey(1));
+ assertEquals(ByteArray.fromString("indexBuild.inProgress.7"),
inProgressBuildIndexMetastoreKey(7));
+ }
+
+ @Test
+ void testPartitionBuildIndexMetastoreKeyPrefix() {
+ assertEquals(ByteArray.fromString("indexBuild.partition.1"),
partitionBuildIndexMetastoreKeyPrefix(1));
+ assertEquals(ByteArray.fromString("indexBuild.partition.7"),
partitionBuildIndexMetastoreKeyPrefix(7));
+ }
+
+ @Test
+ void testToPartitionBuildIndexMetastoreKeyString() {
+ assertEquals("indexBuild.partition.1.2",
toPartitionBuildIndexMetastoreKeyString("indexBuild.partition.1.2".getBytes(UTF_8)));
+ assertEquals("indexBuild.partition.7.9",
toPartitionBuildIndexMetastoreKeyString("indexBuild.partition.7.9".getBytes(UTF_8)));
+ }
+
+ @Test
+ void testExtractPartitionIdFromPartitionBuildIndexKey() {
+ assertEquals(2,
extractPartitionIdFromPartitionBuildIndexKey("indexBuild.partition.1.2"));
+ assertEquals(9,
extractPartitionIdFromPartitionBuildIndexKey("indexBuild.partition.7.9"));
+ }
+
+ @Test
+ void testExtractIndexIdFromPartitionBuildIndexKey() {
+ assertEquals(1,
extractIndexIdFromPartitionBuildIndexKey("indexBuild.partition.1.2"));
+ assertEquals(7,
extractIndexIdFromPartitionBuildIndexKey("indexBuild.partition.7.9"));
+ }
+
+ @Test
+ void testIsPrimaryReplicaTrue() {
+ TablePartitionId replicaGroupId = new TablePartitionId(1, 0);
+
+ HybridTimestamp startTime = clock.now();
+ long dayInMillis = TimeUnit.DAYS.toMillis(1);
+
+ ReplicaMeta replicaMeta = newPrimaryReplicaMeta(LOCAL_NODE,
replicaGroupId, startTime, startTime.addPhysicalTime(dayInMillis));
+
+ assertTrue(isPrimaryReplica(replicaMeta, LOCAL_NODE, clock.now()));
+ }
+
+ @Test
+ void testIsPrimaryReplicaFalse() {
+ TablePartitionId replicaGroupId = new TablePartitionId(1, 0);
+
+ ClusterNode otherNode = new ClusterNodeImpl(NODE_ID + "-other",
NODE_NAME + "-other", mock(NetworkAddress.class));
+
+ HybridTimestamp now = clock.now();
+ long dayInMillis = TimeUnit.DAYS.toMillis(1);
+ long hourInMillis = TimeUnit.HOURS.toMillis(1);
+
+ HybridTimestamp startTime0 = now;
+ HybridTimestamp startTime1 = now.addPhysicalTime(-dayInMillis);
+
+ ReplicaMeta replicaMeta0 = newPrimaryReplicaMeta(otherNode,
replicaGroupId, startTime0, startTime0.addPhysicalTime(dayInMillis));
+ ReplicaMeta replicaMeta1 = newPrimaryReplicaMeta(LOCAL_NODE,
replicaGroupId, startTime1, startTime1.addPhysicalTime(hourInMillis));
+ ReplicaMeta replicaMeta2 = newPrimaryReplicaMeta(LOCAL_NODE,
replicaGroupId, now, now);
+
+ assertFalse(isPrimaryReplica(replicaMeta0, LOCAL_NODE, clock.now()));
+ assertFalse(isPrimaryReplica(replicaMeta1, LOCAL_NODE, clock.now()));
+ assertFalse(isPrimaryReplica(replicaMeta2, LOCAL_NODE, now));
+ }
+}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 9c5c440446..0b5e9834ea 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -21,13 +21,15 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.COLUMN_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
+import static
org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
-import static org.apache.ignite.internal.table.TableTestUtils.createTable;
import static org.apache.ignite.internal.table.TableTestUtils.dropTable;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.sql.ColumnType.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@@ -44,7 +46,6 @@ import java.util.function.LongFunction;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
-import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -75,10 +76,6 @@ import org.junit.jupiter.api.Test;
* Test class to verify {@link IndexManager}.
*/
public class IndexManagerTest extends BaseIgniteAbstractTest {
- private static final String TABLE_NAME = "tName";
-
- private static final String COLUMN_NAME = "c";
-
private final HybridClock clock = new HybridClockImpl();
private VaultManager vaultManager;
@@ -105,13 +102,11 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
when(schManager.schemaRegistry(anyLong(),
anyInt())).thenReturn(completedFuture(null));
- String nodeName = "test";
-
vaultManager = new VaultManager(new InMemoryVaultService());
- metaStorageManager = StandaloneMetaStorageManager.create(vaultManager,
new SimpleInMemoryKeyValueStorage(nodeName));
+ metaStorageManager = StandaloneMetaStorageManager.create(vaultManager,
new SimpleInMemoryKeyValueStorage(NODE_NAME));
- clockWaiter = new ClockWaiter(nodeName, clock);
+ clockWaiter = new ClockWaiter(NODE_NAME, clock);
catalogManager = new CatalogManagerImpl(new
UpdateLogImpl(metaStorageManager), clockWaiter);
@@ -129,14 +124,7 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
assertThat(metaStorageManager.notifyRevisionUpdateListenerOnStart(),
willCompleteSuccessfully());
assertThat(metaStorageManager.deployWatches(),
willCompleteSuccessfully());
- createTable(
- catalogManager,
- DEFAULT_SCHEMA_NAME,
- DEFAULT_ZONE_NAME,
- TABLE_NAME,
-
List.of(ColumnParams.builder().name(COLUMN_NAME).length(100).type(STRING).build()),
- List.of(COLUMN_NAME)
- );
+ createTable(catalogManager, TABLE_NAME, COLUMN_NAME);
}
@AfterEach
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
new file mode 100644
index 0000000000..8b1339f288
--- /dev/null
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestIndexManagementUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+
+/** Helper class for testing index management. */
+class TestIndexManagementUtils {
+ static final String NODE_NAME = "test-node";
+
+ static final String NODE_ID = "test-node-id";
+
+ static final String TABLE_NAME = "test-table";
+
+ static final String COLUMN_NAME = "test-column";
+
+ static final String INDEX_NAME = "test-index";
+
+ static final ClusterNode LOCAL_NODE = new ClusterNodeImpl(NODE_ID,
NODE_NAME, mock(NetworkAddress.class));
+
+ static void createTable(CatalogManager catalogManager, String tableName,
String columnName) {
+ TableTestUtils.createTable(
+ catalogManager,
+ DEFAULT_SCHEMA_NAME,
+ DEFAULT_ZONE_NAME,
+ tableName,
+
List.of(ColumnParams.builder().name(columnName).type(INT32).build()),
+ List.of(columnName)
+ );
+ }
+
+ static void createIndex(CatalogManager catalogManager, String tableName,
String indexName, String columnName) {
+ TableTestUtils.createHashIndex(catalogManager, DEFAULT_SCHEMA_NAME,
tableName, indexName, List.of(columnName), false);
+ }
+
+ static int indexId(CatalogService catalogService, String indexName,
HybridClock clock) {
+ return TableTestUtils.getIndexIdStrict(catalogService, indexName,
clock.nowLong());
+ }
+
+ static CatalogIndexDescriptor indexDescriptor(CatalogService
catalogService, String indexName, HybridClock clock) {
+ return TableTestUtils.getIndexStrict(catalogService, indexName,
clock.nowLong());
+ }
+
+ static int tableId(CatalogService catalogService, String tableName,
HybridClock clock) {
+ return TableTestUtils.getTableIdStrict(catalogService, tableName,
clock.nowLong());
+ }
+
+ static void makeIndexAvailable(CatalogManager catalogManager, int indexId)
{
+
assertThat(catalogManager.execute(MakeIndexAvailableCommand.builder().indexId(indexId).build()),
willCompleteSuccessfully());
+ }
+
+ static void
awaitTillGlobalMetastoreRevisionIsApplied(MetaStorageManagerImpl
metaStorageManager) throws Exception {
+ assertTrue(
+ waitForCondition(() -> {
+ CompletableFuture<Long> currentRevisionFuture =
metaStorageManager.getService().currentRevision();
+
+ assertThat(currentRevisionFuture,
willCompleteSuccessfully());
+
+ return currentRevisionFuture.join() ==
metaStorageManager.appliedRevision();
+ }, 1_000)
+ );
+ }
+
+ static void assertMetastoreKeyAbsent(MetaStorageManager
metaStorageManager, ByteArray key) {
+ assertThat(metaStorageManager.get(key).thenApply(Entry::value),
willBe(nullValue()));
+ }
+
+ static void assertMetastoreKeyPresent(MetaStorageManager
metaStorageManager, ByteArray key) {
+ assertThat(metaStorageManager.get(key).thenApply(Entry::value),
willBe(notNullValue()));
+ }
+
+ static ReplicaMeta newPrimaryReplicaMeta(
+ ClusterNode clusterNode,
+ TablePartitionId replicaGroupId,
+ HybridTimestamp startTime,
+ HybridTimestamp expirationTime
+ ) {
+ return new Lease(clusterNode.name(), clusterNode.id(), startTime,
expirationTime, replicaGroupId);
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java
index e2c98a3df5..8044e4bee2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildCompletionListener.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.table.distributed.index;
-/** Index build completion listener, , will be called when a distributed build
of an index for a specific partition completes. */
+/** Index build completion listener, will be called when a distributed build
of an index for a specific partition completes. */
@FunctionalInterface
public interface IndexBuildCompletionListener {
/** Handles the index build completion event. */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
index 2f9e19574f..b7c9409d69 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
@@ -102,10 +102,13 @@ public class IndexBuilder implements ManuallyCloseable {
/**
* Schedules building the index if it is not already built or is not yet
in progress.
*
- * <p>Index is built in batches using {@link BuildIndexReplicaRequest},
which are then transformed into {@link BuildIndexCommand} on the
- * replica, batches are sent sequentially.</p>
- *
- * <p>It is expected that the index building is triggered by the primary
replica.</p>
+ * <p>Notes:</p>
+ * <ul>
+ * <li>Index is built in batches using {@link
BuildIndexReplicaRequest}, which are then transformed into {@link
BuildIndexCommand}
+ * on the replica, batches are sent sequentially.</li>
+ * <li>It is expected that the index building is triggered by the
primary replica.</li>
+ * <li>If the index has already been built, {@link
IndexBuildCompletionListener} will be notified.</li>
+ * </ul>
*
* @param tableId Table ID.
* @param partitionId Partition ID.
@@ -128,6 +131,10 @@ public class IndexBuilder implements ManuallyCloseable {
) {
inBusyLockSafe(busyLock, () -> {
if (indexStorage.getNextRowIdToBuild() == null) {
+ for (IndexBuildCompletionListener listener : listeners) {
+ listener.onBuildCompletion(indexId, tableId, partitionId);
+ }
+
return;
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java
index b106aa1c68..c1ffc35255 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexBuilderTest.java
@@ -113,7 +113,7 @@ public class IndexBuilderTest extends
BaseIgniteAbstractTest {
scheduleBuildIndex(INDEX_ID, TABLE_ID, PARTITION_ID, List.of());
- assertThat(listenCompletionIndexBuildingFuture, willTimeoutFast());
+ assertThat(listenCompletionIndexBuildingFuture,
willCompleteSuccessfully());
}
private void scheduleBuildIndex(int indexId, int tableId, int partitionId,
Collection<RowId> nextRowIdsToBuild) {