This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 11ee199daa IGNITE-18539 Implement build procedure for new indexes
(#1800)
11ee199daa is described below
commit 11ee199daabd4cdc2c450b4de96e0239190c81dd
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Mar 31 14:12:15 2023 +0300
IGNITE-18539 Implement build procedure for new indexes (#1800)
---
modules/index/build.gradle | 3 +
.../apache/ignite/internal/index/IndexBuilder.java | 274 +++++++++++++++++++++
.../apache/ignite/internal/index/IndexManager.java | 19 +-
.../ignite/internal/index/IndexManagerTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../internal/sql/engine/ItBuildIndexTest.java | 143 +++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../ignite/distributed/ItTablePersistenceTest.java | 9 +-
.../distributed/ItTxDistributedTestSingleNode.java | 11 +-
.../apache/ignite/internal/table/TableImpl.java | 42 +++-
.../table/distributed/StorageUpdateHandler.java | 43 +++-
.../distributed/TableIndexStoragesSupplier.java | 40 +++
.../table/distributed/TableMessageGroup.java | 4 +
.../distributed/command/BuildIndexCommand.java | 50 ++++
.../table/distributed/raft/PartitionListener.java | 32 +++
.../internal/table/distributed/IndexBaseTest.java | 17 +-
.../PartitionGcOnWriteConcurrentTest.java | 9 +-
.../table/distributed/PartitionGcOnWriteTest.java | 10 +-
.../distributed/StorageUpdateHandlerTest.java | 132 ++++++++++
.../raft/PartitionCommandListenerTest.java | 93 +++++--
.../PartitionReplicaListenerIndexLockingTest.java | 3 +-
.../replication/PartitionReplicaListenerTest.java | 3 +-
.../table/impl/DummyInternalTableImpl.java | 24 +-
24 files changed, 909 insertions(+), 61 deletions(-)
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index c104b76510..33c33510ba 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -26,6 +26,9 @@ dependencies {
implementation project(':ignite-schema')
implementation project(':ignite-table')
implementation project(':ignite-transactions')
+ implementation project(':ignite-storage-api')
+ implementation project(':ignite-network-api')
+ implementation project(':ignite-raft-api')
implementation libs.jetbrains.annotations
testImplementation(testFixtures(project(':ignite-configuration')))
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
new file mode 100644
index 0000000000..15364e5298
--- /dev/null
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class for managing the index building process.
+ */
+class IndexBuilder {
+ private static final IgniteLogger LOG =
Loggers.forClass(IndexBuilder.class);
+
+ /** Batch size of row IDs to build the index. */
+ private static final int BUILD_INDEX_ROW_ID_BATCH_SIZE = 100;
+
+ /** Message factory to create messages - RAFT commands. */
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock;
+
+ /** Cluster service. */
+ private final ClusterService clusterService;
+
+ /** Index building executor. */
+ private final ExecutorService buildIndexExecutor;
+
+    /**
+     * Constructor.
+     *
+     * <p>Creates the index building thread pool sized to the number of available processors.
+     *
+     * @param nodeName Node name, used for naming the index building threads.
+     * @param busyLock Busy lock shared with the owning component, used to stop synchronously.
+     * @param clusterService Cluster service.
+     */
+    IndexBuilder(String nodeName, IgniteSpinBusyLock busyLock, ClusterService clusterService) {
+        this.busyLock = busyLock;
+        this.clusterService = clusterService;
+
+        int cpus = Runtime.getRuntime().availableProcessors();
+
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                cpus,
+                cpus,
+                30,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "build-index", LOG)
+        );
+
+        // Core and maximum pool sizes are equal, so the keep-alive time above only takes effect if core threads
+        // are allowed to time out; otherwise idle build threads would live until the node stops.
+        executor.allowCoreThreadTimeOut(true);
+
+        buildIndexExecutor = executor;
+    }
+
+ /**
+ * Stops the index builder.
+ *
+ * <p>Passes the index building executor to {@code IgniteUtils.shutdownAndAwaitTermination} with a timeout of 10 seconds.
+ */
+ void stop() {
+ shutdownAndAwaitTermination(buildIndexExecutor, 10, TimeUnit.SECONDS);
+ }
+
+    /**
+     * Schedules index building: one {@link BuildIndexTask} is submitted to the build executor for every partition of the table.
+     *
+     * @param tableIndexView Index configuration view.
+     * @param table Table the index belongs to.
+     */
+    void startIndexBuild(TableIndexView tableIndexView, TableImpl table) {
+        int partitionId = 0;
+
+        while (partitionId < table.internalTable().partitions()) {
+            // TODO: IGNITE-19177 Add assignments check
+            // A null starting row ID makes the task read the starting position from the index storage.
+            BuildIndexTask task = new BuildIndexTask(table, tableIndexView, partitionId, null);
+
+            buildIndexExecutor.submit(task);
+
+            partitionId++;
+        }
+    }
+
+ /**
+ * Task of building a table index for a partition.
+ *
+ * <p>Only the leader of the raft group will manage the building of the
index. Leader sends batches of row IDs via
+ * {@link BuildIndexCommand}, the next batch will only be sent after the previous batch has been processed.
+ *
+ * <p>Index building itself occurs locally on each node of the raft group
when processing {@link BuildIndexCommand}. This ensures that
+ * the index build process in the raft group is consistent and that the
index build process is restored after restarting the raft group
+ * (not from the beginning).
+ */
+ private class BuildIndexTask implements Runnable {
+ private final TableImpl table;
+
+ private final TableIndexView tableIndexView;
+
+ private final int partitionId;
+
+ /**
+ * ID of the next row to build the index from the previous batch,
{@code null} if it is the first row after the index was created
+ * (both on a live node and after a restore).
+ */
+ private final @Nullable RowId nextRowIdToBuildFromPreviousBatch;
+
+ private BuildIndexTask(
+ TableImpl table,
+ TableIndexView tableIndexView,
+ int partitionId,
+ @Nullable RowId nextRowIdToBuildFromPreviousBatch
+ ) {
+ this.table = table;
+ this.tableIndexView = tableIndexView;
+ this.partitionId = partitionId;
+ this.nextRowIdToBuildFromPreviousBatch =
nextRowIdToBuildFromPreviousBatch;
+ }
+
+        /**
+         * Collects one batch of row IDs and, if the local node is the raft group leader, sends it to the raft group as a
+         * {@link BuildIndexCommand}. When the batch is not the last one, a new task for the next batch is submitted after the
+         * command has completed.
+         *
+         * <p>Does nothing if the busy lock cannot be entered, if the local node is not the leader, or if the index has already
+         * been built for this partition. All errors are logged and not rethrown.
+         */
+        @Override
+        public void run() {
+            if (!busyLock.enterBusy()) {
+                return;
+            }
+
+            try {
+                // At the time of creating the index, we should have already waited for the table to be created and its raft
+                // clients (services) to start for all partitions, so there should be no errors.
+                RaftGroupService raftGroupService = table.internalTable().partitionRaftGroupService(partitionId);
+
+                raftGroupService
+                        // We do not check the presence of nodes in the topology on purpose, so as not to get into races on
+                        // rebalancing, it will be more convenient and reliable for us to wait for a stable topology with a chosen
+                        // leader.
+                        .refreshAndGetLeaderWithTerm()
+                        .thenComposeAsync(leaderWithTerm -> {
+                            // The callback runs asynchronously, so the busy lock has to be re-entered here.
+                            if (!busyLock.enterBusy()) {
+                                return completedFuture(null);
+                            }
+
+                            try {
+                                // At this point, we have a stable topology, each node of which has already applied all local updates.
+                                if (!localNodeConsistentId().equals(leaderWithTerm.leader().consistentId())) {
+                                    // TODO: IGNITE-19053 Must handle the change of leader
+                                    // TODO: IGNITE-19053 Add a test to change the leader even at the start of the task
+                                    return completedFuture(null);
+                                }
+
+                                List<RowId> batchRowIds = collectRowIdBatch();
+
+                                if (batchRowIds == null) {
+                                    // Index has already been built for this partition, there is nothing to send.
+                                    return completedFuture(null);
+                                }
+
+                                RowId nextRowId = getNextRowIdForNextBatch(batchRowIds);
+
+                                boolean finish = batchRowIds.size() < BUILD_INDEX_ROW_ID_BATCH_SIZE || nextRowId == null;
+
+                                // TODO: IGNITE-19053 Must handle the change of leader
+                                return raftGroupService.run(createBuildIndexCommand(batchRowIds, finish))
+                                        .thenRun(() -> {
+                                            if (!finish) {
+                                                assert nextRowId != null : createCommonTableIndexInfo();
+
+                                                buildIndexExecutor.submit(
+                                                        new BuildIndexTask(table, tableIndexView, partitionId, nextRowId)
+                                                );
+                                            }
+                                        });
+                            } finally {
+                                busyLock.leaveBusy();
+                            }
+                        }, buildIndexExecutor)
+                        .whenComplete((unused, throwable) -> {
+                            if (throwable != null) {
+                                LOG.error("Index build error: [{}]", throwable, createCommonTableIndexInfo());
+                            }
+                        });
+            } catch (Throwable t) {
+                LOG.error("Index build error: [{}]", t, createCommonTableIndexInfo());
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+ /**
+ * Checks whether the local node is the leader reported by {@link RaftGroupService#leader()}, comparing consistent IDs.
+ *
+ * <p>Asserts that the raft group service reports a leader.
+ *
+ * <p>NOTE(review): no caller of this method is visible in this class; {@link #run()} compares against the result of
+ * {@code refreshAndGetLeaderWithTerm()} inline instead. Confirm whether this helper is still needed.
+ *
+ * @param raftGroupService Raft group service of the partition.
+ * @return {@code true} if the leader's consistent ID equals the local node's consistent ID.
+ */
+ private boolean isLocalNodeLeader(RaftGroupService raftGroupService) {
+ Peer leader = raftGroupService.leader();
+
+ assert leader != null : "tableId=" + table.tableId() + ",
partitionId=" + partitionId;
+
+ return localNodeConsistentId().equals(leader.consistentId());
+ }
+
+        /**
+         * Collects up to {@code batchSize} row IDs from the partition storage, starting the search at the given row ID.
+         *
+         * <p>Each step asks the storage for the closest row ID to the current lower bound and then moves the lower bound to
+         * the increment of the found row ID. Collection stops early when the storage finds nothing or the increment yields
+         * {@code null}.
+         *
+         * @param lastBuiltRowId Row ID to start the search from.
+         * @param batchSize Maximum number of row IDs to collect.
+         * @return Collected row IDs, possibly empty.
+         */
+        private List<RowId> createBatchRowIds(RowId lastBuiltRowId, int batchSize) {
+            MvPartitionStorage mvPartition = table.internalTable().storage().getMvPartition(partitionId);
+
+            assert mvPartition != null : createCommonTableIndexInfo();
+
+            List<RowId> collected = new ArrayList<>(batchSize);
+
+            RowId lowerBound = lastBuiltRowId;
+
+            // Every completed iteration adds exactly one element, so the list size doubles as the iteration counter.
+            while (collected.size() < batchSize && lowerBound != null) {
+                RowId found = mvPartition.closestRowId(lowerBound);
+
+                if (found == null) {
+                    break;
+                }
+
+                collected.add(found);
+
+                lowerBound = found.increment();
+            }
+
+            return collected;
+        }
+
+ /**
+ * Creates a {@link BuildIndexCommand} for this task's table partition and index.
+ *
+ * @param rowIds Row IDs of the batch; only their UUIDs are put into the command.
+ * @param finish Value of the command's {@code finish} flag.
+ * @return Built command.
+ */
+ private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds,
boolean finish) {
+ return TABLE_MESSAGES_FACTORY.buildIndexCommand()
+
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
+ .tableId(table.tableId())
+ .partitionId(partitionId)
+ .build()
+ )
+ .indexId(tableIndexView.id())
+ .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
+ .finish(finish)
+ .build();
+ }
+
+        /**
+         * Builds a description of the table, partition and index this task works on, for log and assertion messages.
+         *
+         * @return String of the form {@code table=..., tableId=..., partitionId=..., index=..., indexId=...}.
+         */
+        private String createCommonTableIndexInfo() {
+            StringBuilder info = new StringBuilder();
+
+            info.append("table=").append(table.name());
+            info.append(", tableId=").append(table.tableId());
+            info.append(", partitionId=").append(partitionId);
+            info.append(", index=").append(tableIndexView.name());
+            info.append(", indexId=").append(tableIndexView.id());
+
+            return info.toString();
+        }
+
+ /**
+ * Returns the name of the local cluster member; this class compares it with raft peers' consistent IDs.
+ */
+ private String localNodeConsistentId() {
+ return clusterService.topologyService().localMember().name();
+ }
+
+        /**
+         * Computes the row ID from which the next batch should start.
+         *
+         * @param batch Row IDs of the batch that has just been collected.
+         * @return Increment of the last row ID of the batch (callers treat it as possibly {@code null}), or {@code null} if
+         *      the batch is empty.
+         */
+        private @Nullable RowId getNextRowIdForNextBatch(List<RowId> batch) {
+            if (batch.isEmpty()) {
+                return null;
+            }
+
+            RowId lastRowIdInBatch = batch.get(batch.size() - 1);
+
+            return lastRowIdInBatch.increment();
+        }
+
+        /**
+         * Collects the batch of row IDs for this task.
+         *
+         * <p>If the task continues a previous batch, collection starts from the row ID handed over by that batch. Otherwise
+         * the starting row ID is read from the index storage and the start of the build is logged.
+         *
+         * @return Batch of row IDs, or {@code null} if the index storage reports no next row ID to build, i.e. the index has
+         *      already been built.
+         */
+        private @Nullable List<RowId> collectRowIdBatch() {
+            if (nextRowIdToBuildFromPreviousBatch != null) {
+                return createBatchRowIds(nextRowIdToBuildFromPreviousBatch, BUILD_INDEX_ROW_ID_BATCH_SIZE);
+            }
+
+            RowId storedNextRowId = table.internalTable().storage()
+                    .getOrCreateIndex(partitionId, tableIndexView.id())
+                    .getNextRowIdToBuild();
+
+            if (storedNextRowId == null) {
+                // Index has already been built.
+                return null;
+            }
+
+            LOG.info("Start building the index: [{}]", createCommonTableIndexInfo());
+
+            return createBatchRowIds(storedNextRowId, BUILD_INDEX_ROW_ID_BATCH_SIZE);
+        }
+ }
+}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index ed514ba196..4b0caa0ccf 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -67,6 +67,7 @@ import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableNotFoundException;
+import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
/**
@@ -92,17 +93,29 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /** Index builder. */
+ private final IndexBuilder indexBuilder;
+
/**
* Constructor.
*
+ * @param nodeName Node name.
* @param tablesCfg Tables and indexes configuration.
* @param schemaManager Schema manager.
* @param tableManager Table manager.
+ * @param clusterService Cluster service.
*/
- public IndexManager(TablesConfiguration tablesCfg, SchemaManager
schemaManager, TableManager tableManager) {
+ public IndexManager(
+ String nodeName,
+ TablesConfiguration tablesCfg,
+ SchemaManager schemaManager,
+ TableManager tableManager,
+ ClusterService clusterService
+ ) {
this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
this.schemaManager = Objects.requireNonNull(schemaManager,
"schemaManager");
this.tableManager = tableManager;
+ this.indexBuilder = new IndexBuilder(nodeName, busyLock,
clusterService);
}
/** {@inheritDoc} */
@@ -159,6 +172,8 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
busyLock.block();
+ indexBuilder.stop();
+
LOG.info("Index manager stopped");
}
@@ -418,6 +433,8 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
table.pkId(indexId);
}
}
+
+ indexBuilder.startIndexBuild(tableIndexView, table);
});
return allOf(createIndexFuture, fireEventFuture);
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 99e8ff769b..0be0aef1ed 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexNotFoundException;
+import org.apache.ignite.network.ClusterService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -92,7 +93,7 @@ public class IndexManagerTest {
when(schManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(null));
- indexManager = new IndexManager(tablesConfig, schManager,
tableManagerMock);
+ indexManager = new IndexManager("test", tablesConfig, schManager,
tableManagerMock, mock(ClusterService.class));
indexManager.start();
assertThat(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index b17823bb1c..94ab5b0fd5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -391,7 +391,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
topologyAwareRaftGroupServiceFactory
);
- var indexManager = new IndexManager(tablesConfiguration,
schemaManager, tableManager);
+ var indexManager = new IndexManager(name, tablesConfiguration,
schemaManager, tableManager, clusterSvc);
CatalogManager catalogManager = new CatalogServiceImpl(metaStorageMgr);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
new file mode 100644
index 0000000000..3ef6863171
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static java.util.stream.Collectors.joining;
+import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Integration test of index building.
+ */
+public class ItBuildIndexTest extends ClusterPerClassIntegrationTest {
+ private static final String ZONE_NAME = "zone_table";
+
+ private static final String TABLE_NAME = "test_table";
+
+ private static final String INDEX_NAME = "test_index";
+
+ @AfterEach
+ void tearDown() {
+ sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ }
+
+ @ParameterizedTest
+ @MethodSource("replicas")
+ void testBuildIndexOnStableTopology(int replicas) throws Exception {
+ sql(IgniteStringFormatter.format("CREATE ZONE IF NOT EXISTS {} WITH
REPLICAS={}, PARTITIONS={}",
+ ZONE_NAME, replicas, 2
+ ));
+
+ sql(IgniteStringFormatter.format(
+ "CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) WITH
PRIMARY_ZONE='{}'",
+ TABLE_NAME, ZONE_NAME.toUpperCase()
+ ));
+
+ sql(IgniteStringFormatter.format(
+ "INSERT INTO {} VALUES {}",
+ TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2),
List.of(3, 3), List.of(4, 4), List.of(5, 5))
+ ));
+
+ sql(IgniteStringFormatter.format("CREATE INDEX {} ON {} (i1)",
INDEX_NAME, TABLE_NAME));
+
+ // TODO: IGNITE-18733
+ waitForIndex(INDEX_NAME);
+
+ waitForIndexBuild(TABLE_NAME, INDEX_NAME);
+
+ assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 >
0", TABLE_NAME))
+ .matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(),
INDEX_NAME.toUpperCase()))
+ .returns(1, 1)
+ .returns(2, 2)
+ .returns(3, 3)
+ .returns(4, 4)
+ .returns(5, 5)
+ .check();
+ }
+
+ private static int[] replicas() {
+ // TODO: IGNITE-19086 Fix NullPointerException on insertAll
+ // return new int[]{1, 2, 3};
+ return new int[]{1};
+ }
+
+    /**
+     * Renders rows as a SQL {@code VALUES} list, e.g. {@code (1, 1), (2, 2)}.
+     *
+     * @param values Rows, each a non-null list of column values.
+     * @return Comma-separated list of parenthesized rows.
+     */
+    // The varargs array is only read, never stored or written to, so heap pollution is not possible.
+    @SafeVarargs
+    private static String toValuesString(List<Object>... values) {
+        return Stream.of(values)
+                .peek(Assertions::assertNotNull)
+                .map(objects -> objects.stream().map(Object::toString).collect(joining(", ", "(", ")")))
+                .collect(joining(", "));
+    }
+
+    /**
+     * Waits until the index is reported as built on every cluster node, for each partition whose raft group includes that node.
+     *
+     * <p>A partition is considered built once its index storage returns {@code null} from {@code getNextRowIdToBuild()}.
+     * Each partition is polled every 10 ms for at most 10 seconds.
+     *
+     * @param tableName Table name.
+     * @param indexName Index name; it is upper-cased for the configuration lookup.
+     * @throws Exception If waiting fails.
+     */
+    private void waitForIndexBuild(String tableName, String indexName) throws Exception {
+        for (Ignite clusterNode : CLUSTER_NODES) {
+            CompletableFuture<Table> tableFuture = clusterNode.tables().tableAsync(tableName);
+
+            assertThat(tableFuture, willCompleteSuccessfully());
+
+            TableImpl tableImpl = (TableImpl) tableFuture.join();
+
+            InternalTable internalTable = tableImpl.internalTable();
+
+            // Look the index up by the requested name, not by the class-level constant.
+            UUID indexId = ((IgniteImpl) clusterNode).clusterConfiguration()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .indexes()
+                    .get(indexName.toUpperCase())
+                    .id()
+                    .value();
+
+            assertNotNull(indexId, "table=" + tableName + ", index=" + indexName);
+
+            for (int partitionId = 0; partitionId < internalTable.partitions(); partitionId++) {
+                RaftGroupService raftGroupService = internalTable.partitionRaftGroupService(partitionId);
+
+                Stream<Peer> allPeers = Stream.concat(Stream.of(raftGroupService.leader()), raftGroupService.peers().stream());
+
+                // Skip partitions whose raft group does not include this node.
+                if (allPeers.map(Peer::consistentId).noneMatch(clusterNode.name()::equals)) {
+                    continue;
+                }
+
+                IndexStorage index = internalTable.storage().getOrCreateIndex(partitionId, indexId);
+
+                assertTrue(waitForCondition(() -> index.getNextRowIdToBuild() == null, 10, 10_000));
+            }
+        }
+    }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index dc715c2d12..76087f301e 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -486,7 +486,7 @@ public class IgniteImpl implements Ignite {
topologyAwareRaftGroupServiceFactory
);
- indexManager = new IndexManager(tablesConfiguration, schemaManager,
distributedTblMgr);
+ indexManager = new IndexManager(name, tablesConfiguration,
schemaManager, distributedTblMgr, clusterSvc);
catalogManager = new CatalogServiceImpl(metaStorageMgr);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0ef110270d..06253e8b62 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -249,7 +249,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
tblManager = mockManagers();
- idxManager = new IndexManager(tblsCfg, schemaManager, tblManager);
+ idxManager = new IndexManager(NODE_NAME, tblsCfg, schemaManager,
tblManager, cs);
idxManager.start();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 7f0977a79a..667016c8a5 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -82,6 +82,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -375,8 +376,12 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartitionStorage);
- StorageUpdateHandler storageUpdateHandler =
- new StorageUpdateHandler(0, partitionDataStorage,
Map::of, tableCfg.dataStorage());
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(
+ 0,
+ partitionDataStorage,
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()),
+ tableCfg.dataStorage()
+ );
PartitionListener listener = new PartitionListener(
partitionDataStorage,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 93d6e253af..e9e3152a8d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -41,7 +41,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
@@ -90,6 +89,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -440,8 +440,13 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
PendingComparableValuesTracker<Long> storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMpPartStorage);
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = ()
-> Map.of(pkStorage.get().id(), pkStorage.get());
- StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(partId, partitionDataStorage, indexes, dsCfg);
+
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(
+ partId,
+ partitionDataStorage,
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(),
pkStorage.get())),
+ dsCfg
+ );
TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterServices.get(assignment),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 560c0b0543..1e406f54c6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.lang.ErrorGroups;
@@ -222,20 +223,28 @@ public class TableImpl implements Table {
}
/** Returns a supplier of index storage wrapper factories for given
partition. */
- public Supplier<Map<UUID, TableSchemaAwareIndexStorage>>
indexStorageAdapters(int partId) {
- return () -> {
- awaitIndexes();
+ public TableIndexStoragesSupplier indexStorageAdapters(int partId) {
+ return new TableIndexStoragesSupplier() {
+ @Override
+ public Map<UUID, TableSchemaAwareIndexStorage> get() {
+ awaitIndexes();
- List<IndexStorageAdapterFactory> factories = new
ArrayList<>(indexStorageAdapterFactories.values());
+ List<IndexStorageAdapterFactory> factories = new
ArrayList<>(indexStorageAdapterFactories.values());
- Map<UUID, TableSchemaAwareIndexStorage> adapters = new HashMap<>();
+ Map<UUID, TableSchemaAwareIndexStorage> adapters = new
HashMap<>();
- for (IndexStorageAdapterFactory factory : factories) {
- TableSchemaAwareIndexStorage storage = factory.create(partId);
- adapters.put(storage.id(), storage);
+ for (IndexStorageAdapterFactory factory : factories) {
+ TableSchemaAwareIndexStorage storage =
factory.create(partId);
+ adapters.put(storage.id(), storage);
+ }
+
+ return adapters;
}
- return adapters;
+ @Override
+ public void addIndexToWaitIfAbsent(UUID indexId) {
+ addIndexesToWait(List.of(indexId));
+ }
};
}
@@ -385,14 +394,25 @@ public class TableImpl implements Table {
}
/**
- * Adds indexes to wait before inserting data into the table.
+ * Adds indexes to wait, if not already created, before inserting data
into the table.
*
* @param indexIds Indexes Index IDs.
*/
// TODO: IGNITE-19082 Needs to be redone/improved
public void addIndexesToWait(Collection<UUID> indexIds) {
for (UUID indexId : indexIds) {
- indexesToWait.computeIfAbsent(indexId, uuid -> new
CompletableFuture<>());
+ indexesToWait.compute(indexId, (indexId0, awaitIndexFuture) -> {
+ if (awaitIndexFuture != null) {
+ return awaitIndexFuture;
+ }
+
+ if (indexStorageAdapterFactories.containsKey(indexId) &&
indexLockerFactories.containsKey(indexId)) {
+ // Index is already registered and created.
+ return null;
+ }
+
+ return new CompletableFuture<>();
+ });
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index b842038cae..de76d70584 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -54,7 +53,7 @@ public class StorageUpdateHandler {
/** Partition storage with access to MV data of a partition. */
private final PartitionDataStorage storage;
- private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
+ private final TableIndexStoragesSupplier indexes;
/** Last recorded GC low watermark. */
private final AtomicReference<HybridTimestamp> lastRecordedLwm = new
AtomicReference<>();
@@ -73,7 +72,7 @@ public class StorageUpdateHandler {
public StorageUpdateHandler(
int partitionId,
PartitionDataStorage storage,
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
+ TableIndexStoragesSupplier indexes,
DataStorageConfiguration dsCfg
) {
this.partitionId = partitionId;
@@ -416,4 +415,42 @@ public class StorageUpdateHandler {
public void waitIndexes() {
indexes.get();
}
+
+ /**
+ * Builds an index for a batch of rows, indexing all versions of each row.
+ *
+ * <p>First registers the index to be waited for (if it has not been created yet) and then obtains
+ * its storage, so the index is expected to exist when the rows are processed. For every row all
+ * of its versions are scanned and each non-empty version is put into the index; empty versions
+ * (tombstones) are skipped.
+ *
+ * <p>After the batch is processed, the row ID to continue the build from is stored in the index
+ * storage: {@code null} if {@code finish} is {@code true}, otherwise the increment of the last
+ * row ID of the batch.
+ *
+ * @param indexId Index ID.
+ * @param rowUuids Row uuids of the batch, may be empty only if {@code finish} is {@code true}.
+ * @param finish Index build completion flag, {@code true} if this batch is the last one.
+ */
+ public void buildIndex(UUID indexId, List<UUID> rowUuids, boolean finish) {
+ // TODO: IGNITE-19082 Need another way to wait for index creation
+ indexes.addIndexToWaitIfAbsent(indexId);
+
+ TableSchemaAwareIndexStorage index = indexes.get().get(indexId);
+
+ assert index != null : "indexId=" + indexId + ", partitionId=" +
partitionId;
+
+ RowId lastRowId = null;
+
+ for (UUID rowUuid : rowUuids) {
+ lastRowId = new RowId(partitionId, rowUuid);
+
+ try (Cursor<ReadResult> cursor = storage.scanVersions(lastRowId)) {
+ while (cursor.hasNext()) {
+ ReadResult next = cursor.next();
+
+ if (!next.isEmpty()) {
+ index.put(next.binaryRow(), lastRowId);
+ }
+ }
+ }
+ }
+
+ assert lastRowId != null || finish : "indexId=" + indexId + ",
partitionId=" + partitionId;
+
+ index.storage().setNextRowIdToBuild(finish ? null :
lastRowId.increment());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIndexStoragesSupplier.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIndexStoragesSupplier.java
new file mode 100644
index 0000000000..bd0bb96b49
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIndexStoragesSupplier.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Supplier of table index storages.
+ */
+public interface TableIndexStoragesSupplier {
+ /**
+ * Returns index storages by their ID.
+ *
+ * <p>Waits for the primary key index and all other registered indexes to be created.
+ */
+ Map<UUID, TableSchemaAwareIndexStorage> get();
+
+ /**
+ * Registers the index to be waited for (see {@link #get()}) if it hasn't been created yet.
+ *
+ * @param indexId Index ID.
+ */
+ void addIndexToWaitIfAbsent(UUID indexId);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 8a2b1990d1..88373fd5e5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed;
import static
org.apache.ignite.internal.table.distributed.TableMessageGroup.GROUP_TYPE;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
@@ -152,6 +153,9 @@ public interface TableMessageGroup {
/** Message type for {@link UpdateCommand}. */
short UPDATE = 43;
+ /** Message type for {@link BuildIndexCommand}. */
+ short BUILD_INDEX = 44;
+
/** Message type for {@link TablePartitionIdMessage}. */
short TABLE_PARTITION_ID = 61;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
new file mode 100644
index 0000000000..dd0527fa4a
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/BuildIndexCommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * State machine command to build a table index for a batch of rows of a partition.
+ */
+@Transferable(TableMessageGroup.Commands.BUILD_INDEX)
+public interface BuildIndexCommand extends WriteCommand {
+ /**
+ * Returns ID of table partition.
+ */
+ TablePartitionIdMessage tablePartitionId();
+
+ /**
+ * Returns index ID.
+ */
+ UUID indexId();
+
+ /**
+ * Returns UUIDs of the rows for which to build the index in this batch.
+ */
+ List<UUID> rowIds();
+
+ /**
+ * Returns {@code true} if this batch is the last one, i.e. the index build finishes with it.
+ */
+ boolean finish();
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 04c6fb16e8..0954dea8ff 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import
org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
@@ -165,6 +166,8 @@ public class PartitionListener implements RaftGroupListener
{
handleTxCleanupCommand((TxCleanupCommand) command,
commandIndex, commandTerm);
} else if (command instanceof SafeTimeSyncCommand) {
handleSafeTimeSyncCommand((SafeTimeSyncCommand) command,
commandIndex, commandTerm);
+ } else if (command instanceof BuildIndexCommand) {
+ handleBuildIndexCommand((BuildIndexCommand) command,
commandIndex, commandTerm);
} else {
assert false : "Command was not found [cmd=" + command +
']';
}
@@ -422,4 +425,33 @@ public class PartitionListener implements
RaftGroupListener {
public MvPartitionStorage getMvStorage() {
return storage.getStorage();
}
+
+ /**
+ * Handler for the {@link BuildIndexCommand}.
+ *
+ * <p>Does nothing if the command index is not greater than the last applied index of the storage.
+ * Otherwise builds the index for the batch of rows and updates the last applied index and term of
+ * the storage inside a single {@code runConsistently} closure, and then logs the completion of the
+ * build if the batch is the finishing one.
+ *
+ * @param cmd Command.
+ * @param commandIndex RAFT index of the command.
+ * @param commandTerm RAFT term of the command.
+ */
+ void handleBuildIndexCommand(BuildIndexCommand cmd, long commandIndex,
long commandTerm) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
+ storage.runConsistently(() -> {
+ storageUpdateHandler.buildIndex(cmd.indexId(), cmd.rowIds(),
cmd.finish());
+
+ storage.lastApplied(commandIndex, commandTerm);
+
+ return null;
+ });
+
+ if (cmd.finish()) {
+ LOG.info(
+ "Finish building the index: [tableId={}, partitionId={},
indexId={}]",
+ cmd.tablePartitionId().tableId(),
cmd.tablePartitionId().partitionId(), cmd.indexId()
+ );
+ }
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 30ffda06d3..fbb24b636c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -44,6 +44,7 @@ import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedInde
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
@@ -118,12 +119,16 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
storage = new TestMvPartitionStorage(PARTITION_ID);
- storageUpdateHandler = new StorageUpdateHandler(PARTITION_ID, new
TestPartitionDataStorage(storage),
- () -> Map.of(
- pkIndexId, pkStorage,
- sortedIndexId, sortedIndexStorage,
- hashIndexId, hashIndexStorage
- ),
+ Map<UUID, TableSchemaAwareIndexStorage> indexes = Map.of(
+ pkIndexId, pkStorage,
+ sortedIndexId, sortedIndexStorage,
+ hashIndexId, hashIndexStorage
+ );
+
+ storageUpdateHandler = new StorageUpdateHandler(
+ PARTITION_ID,
+ new TestPartitionDataStorage(storage),
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes),
dsCfg
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
index 2c3997498a..c782bb327e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.ignite.distributed.TestPartitionDataStorage;
@@ -41,6 +42,7 @@ import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfig
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -68,7 +70,12 @@ public class PartitionGcOnWriteConcurrentTest {
when(storage.pollForVacuum(any())).thenReturn(null);
- storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+ storageUpdateHandler = new StorageUpdateHandler(
+ PARTITION_ID,
+ new TestPartitionDataStorage(storage),
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()),
+ dsCfg
+ );
}
@ParameterizedTest
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
index 62a3006d77..39721d97b1 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
@@ -21,8 +21,8 @@ import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
@@ -58,7 +59,12 @@ public class PartitionGcOnWriteTest extends
BaseMvStoragesTest {
void setUp(@InjectConfiguration("mock.gcOnUpdateBatchSize=" +
GC_BATCH_SIZE) DataStorageConfiguration dsCfg) {
storage = new TestMvPartitionStorage(1);
- storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+ storageUpdateHandler = new StorageUpdateHandler(
+ 1,
+ new TestPartitionDataStorage(storage),
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of()),
+ dsCfg
+ );
}
@ParameterizedTest
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..69590f7603
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for {@link StorageUpdateHandler}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class StorageUpdateHandlerTest {
+ private static final int PARTITION_ID = 0;
+
+ @InjectConfiguration
+ private DataStorageConfiguration dataStorageConfig;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ @Test
+ void testBuildIndex() {
+ PartitionDataStorage partitionStorage =
mock(PartitionDataStorage.class);
+
+ TableSchemaAwareIndexStorage indexStorage = createIndexStorage();
+
+ UUID indexId = UUID.randomUUID();
+
+ TableIndexStoragesSupplier indexes =
mock(TableIndexStoragesSupplier.class);
+
+ when(indexes.get()).thenReturn(Map.of(indexId, indexStorage));
+
+ StorageUpdateHandler storageUpdateHandler =
createStorageUpdateHandler(partitionStorage, indexes);
+
+ RowId rowId0 = new RowId(PARTITION_ID);
+ RowId rowId1 = new RowId(PARTITION_ID);
+
+ List<BinaryRow> rowVersions0 = asList(mock(BinaryRow.class), null);
+ List<BinaryRow> rowVersions1 = asList(mock(BinaryRow.class), null);
+
+ setRowVersions(partitionStorage, Map.of(rowId0.uuid(), rowVersions0,
rowId1.uuid(), rowVersions1));
+
+ storageUpdateHandler.buildIndex(indexId, List.of(rowId0.uuid(),
rowId1.uuid()), false);
+
+ verify(indexStorage).put(rowVersions0.get(0), rowId0);
+ verify(indexStorage, never()).put(rowVersions0.get(1), rowId0);
+
+ verify(indexStorage).put(rowVersions1.get(0), rowId1);
+ verify(indexStorage, never()).put(rowVersions1.get(1), rowId1);
+
+ verify(indexStorage.storage()).setNextRowIdToBuild(rowId1.increment());
+ verify(indexes).addIndexToWaitIfAbsent(indexId);
+
+ // Let's check one more batch - it will be the finishing one.
+ RowId rowId2 = new RowId(PARTITION_ID, UUID.randomUUID());
+
+ List<BinaryRow> rowVersions2 = singletonList(mock(BinaryRow.class));
+
+ setRowVersions(partitionStorage, Map.of(rowId2.uuid(), rowVersions2));
+
+ storageUpdateHandler.buildIndex(indexId, List.of(rowId2.uuid()), true);
+
+ verify(indexStorage).put(rowVersions2.get(0), rowId2);
+
+ verify(indexStorage.storage()).setNextRowIdToBuild(null);
+ verify(indexes, times(2)).addIndexToWaitIfAbsent(indexId);
+ }
+
+ private static TableSchemaAwareIndexStorage createIndexStorage() {
+ TableSchemaAwareIndexStorage indexStorage =
mock(TableSchemaAwareIndexStorage.class);
+
+ IndexStorage storage = mock(IndexStorage.class);
+
+ when(indexStorage.storage()).thenReturn(storage);
+
+ return indexStorage;
+ }
+
+ private StorageUpdateHandler
createStorageUpdateHandler(PartitionDataStorage partitionStorage,
TableIndexStoragesSupplier indexes) {
+ return new StorageUpdateHandler(PARTITION_ID, partitionStorage,
indexes, dataStorageConfig);
+ }
+
+ private void setRowVersions(PartitionDataStorage partitionStorage,
Map<UUID, List<BinaryRow>> rowVersions) {
+ for (Entry<UUID, List<BinaryRow>> entry : rowVersions.entrySet()) {
+ RowId rowId = new RowId(PARTITION_ID, entry.getKey());
+
+ List<ReadResult> readResults = entry.getValue().stream()
+ .map(binaryRow -> ReadResult.createFromCommitted(rowId,
binaryRow, clock.now()))
+ .collect(toList());
+
+
when(partitionStorage.scanVersions(rowId)).thenReturn(Cursor.fromIterable(readResults));
+ }
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index fb847edfbd..216dee1119 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
@@ -50,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.Supplier;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.TestHybridClock;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
-import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.command.HybridTimestampMessage;
import
org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
@@ -85,10 +84,12 @@ import
org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.Timestamp;
@@ -128,9 +129,6 @@ public class PartitionCommandListenerTest {
new Column[]{new Column("value", NativeTypes.INT32, false)}
);
- /** Hybrid clock. */
- private static final HybridClock CLOCK = new HybridClockImpl();
-
/** Table command listener. */
private PartitionListener commandListener;
@@ -157,13 +155,13 @@ public class PartitionCommandListenerTest {
private Path workDir;
/** Factory for command messages. */
- private TableMessagesFactory msgFactory = new TableMessagesFactory();
+ private final TableMessagesFactory msgFactory = new TableMessagesFactory();
/** Factory for replica messages. */
- private ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
+ private final ReplicaMessagesFactory replicaMessagesFactory = new
ReplicaMessagesFactory();
/** Hybrid clock. */
- private HybridClock hybridClock;
+ private final HybridClock hybridClock = new HybridClockImpl();
/** Safe time tracker. */
private PendingComparableValuesTracker<HybridTimestamp> safeTimeTracker;
@@ -173,6 +171,8 @@ public class PartitionCommandListenerTest {
private final RaftGroupConfigurationConverter
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+ private StorageUpdateHandler storageUpdateHandler;
+
/**
* Initializes a table listener before tests.
*/
@@ -184,15 +184,14 @@ public class PartitionCommandListenerTest {
when(clusterService.topologyService().localMember().address()).thenReturn(addr);
- ReplicaService replicaService = mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
-
- hybridClock = new HybridClockImpl();
-
safeTimeTracker = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.id(), pkStorage);
-
- StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(0, partitionDataStorage, indexes, dsCfg);
+ storageUpdateHandler = spy(new StorageUpdateHandler(
+ PARTITION_ID,
+ partitionDataStorage,
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.id(),
pkStorage)),
+ dsCfg
+ ));
commandListener = new PartitionListener(
partitionDataStorage,
@@ -281,9 +280,12 @@ public class PartitionCommandListenerTest {
public void
testOnSnapshotSavePropagateLastAppliedIndexAndTerm(@InjectConfiguration
DataStorageConfiguration dsCfg) {
TestPartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartitionStorage);
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.id(), pkStorage);
-
- StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes, dsCfg);
+ StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(
+ PARTITION_ID,
+ partitionDataStorage,
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.id(),
pkStorage)),
+ dsCfg
+ );
PartitionListener testCommandListener = new PartitionListener(
partitionDataStorage,
@@ -462,6 +464,49 @@ public class PartitionCommandListenerTest {
applySafeTimeCommand(SafeTimeSyncCommand.class, testClock.now());
}
+ @Test
+ void testBuildIndexCommand() {
+ UUID indexId = UUID.randomUUID();
+
+ doNothing().when(storageUpdateHandler).buildIndex(eq(indexId),
any(List.class), anyBoolean());
+
+ List<UUID> rowUuids0 = List.of(UUID.randomUUID());
+ List<UUID> rowUuids1 = List.of(UUID.randomUUID());
+ List<UUID> rowUuids2 = List.of(UUID.randomUUID());
+
+ InOrder inOrder = inOrder(partitionDataStorage, storageUpdateHandler);
+
+
commandListener.handleBuildIndexCommand(createBuildIndexCommand(indexId,
rowUuids0, false), 10, 1);
+
+ inOrder.verify(storageUpdateHandler).buildIndex(indexId, rowUuids0,
false);
+ inOrder.verify(partitionDataStorage).lastApplied(10, 1);
+
+
commandListener.handleBuildIndexCommand(createBuildIndexCommand(indexId,
rowUuids1, true), 20, 2);
+
+ inOrder.verify(storageUpdateHandler).buildIndex(indexId, rowUuids1,
true);
+ inOrder.verify(partitionDataStorage).lastApplied(20, 2);
+
+ // Let's check that the command with a lower commandIndex than in the
storage will not be executed.
+
commandListener.handleBuildIndexCommand(createBuildIndexCommand(indexId,
rowUuids2, false), 5, 1);
+
+ inOrder.verify(storageUpdateHandler, never()).buildIndex(indexId,
rowUuids2, false);
+ inOrder.verify(partitionDataStorage, never()).lastApplied(5, 1);
+ }
+
+ private BuildIndexCommand createBuildIndexCommand(UUID indexId, List<UUID>
rowUuids, boolean finish) {
+ return msgFactory.buildIndexCommand()
+ .tablePartitionId(
+ msgFactory.tablePartitionIdMessage()
+ .tableId(UUID.randomUUID())
+ .partitionId(PARTITION_ID)
+ .build()
+ )
+ .indexId(indexId)
+ .rowIds(rowUuids)
+ .finish(finish)
+ .build();
+ }
+
private void applySafeTimeCommand(Class<? extends
SafeTimePropagatingCommand> cls, HybridTimestamp timestamp) {
HybridTimestampMessage safeTime = hybridTimestamp(timestamp);
@@ -572,7 +617,7 @@ public class PartitionCommandListenerTest {
rows.put(Timestamp.nextVersion().toUuid(), row.byteBuffer());
}
- HybridTimestamp commitTimestamp = CLOCK.now();
+ HybridTimestamp commitTimestamp = hybridClock.now();
invokeBatchedCommand(msgFactory.updateAllCommand()
.tablePartitionId(
@@ -608,7 +653,7 @@ public class PartitionCommandListenerTest {
rows.put(readRow(row).uuid(), row.byteBuffer());
}
- HybridTimestamp commitTimestamp = CLOCK.now();
+ HybridTimestamp commitTimestamp = hybridClock.now();
invokeBatchedCommand(msgFactory.updateAllCommand()
.tablePartitionId(
@@ -642,7 +687,7 @@ public class PartitionCommandListenerTest {
keyRows.put(readRow(row).uuid(), null);
}
- HybridTimestamp commitTimestamp = CLOCK.now();
+ HybridTimestamp commitTimestamp = hybridClock.now();
invokeBatchedCommand(msgFactory.updateAllCommand()
.tablePartitionId(
@@ -699,7 +744,7 @@ public class PartitionCommandListenerTest {
}).when(clo).result(any());
}));
- HybridTimestamp commitTimestamp = CLOCK.now();
+ HybridTimestamp commitTimestamp = hybridClock.now();
txIds.forEach(txId ->
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
@@ -743,7 +788,7 @@ public class PartitionCommandListenerTest {
}).when(clo).result(any());
}));
- HybridTimestamp commitTimestamp = CLOCK.now();
+ HybridTimestamp commitTimestamp = hybridClock.now();
txIds.forEach(txId ->
invokeBatchedCommand(msgFactory.txCleanupCommand()
.txId(txId)
@@ -818,7 +863,7 @@ public class PartitionCommandListenerTest {
}).when(clo).result(any());
}));
- HybridTimestamp now = CLOCK.now();
+ HybridTimestamp now = hybridClock.now();
txIds.forEach(txId -> invokeBatchedCommand(
msgFactory.txCleanupCommand()
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 8cde444fe6..99f7a34063 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -69,6 +69,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.Lock;
@@ -189,7 +190,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
new StorageUpdateHandler(
PART_ID,
new
TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE),
- () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(),
pkStorage.get())),
dsCfg
),
peer -> true,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 837307f0b9..ec6b5bc7f2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -103,6 +103,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaL
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -336,7 +337,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new StorageUpdateHandler(
partId,
partitionDataStorage,
- () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+
DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(),
pkStorage.get())),
dsCfg
),
peer -> localNode.name().equals(peer.consistentId()),
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index ac641836e7..0d607f4f65 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.impl;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -29,7 +30,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
-import java.util.function.Supplier;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.distributed.TestPartitionDataStorage;
@@ -62,6 +62,7 @@ import
org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -263,7 +264,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartStorage);
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () ->
Map.of(pkStorage.get().id(), pkStorage.get());
+ TableIndexStoragesSupplier indexes =
createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
DataStorageConfiguration dsCfg = mock(DataStorageConfiguration.class);
ConfigurationValue<Integer> gcBatchSizeValue =
mock(ConfigurationValue.class);
@@ -398,4 +399,23 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
safeTimeUpdaterThread = null;
}
}
+
+ /**
+ * Returns a dummy table index storages supplier whose {@code get()} always returns the given map.
+ *
+ * @param indexes Index storage by ID; handed out as-is (no copy is made) on every {@code get()} call.
+ */
+ public static TableIndexStoragesSupplier
createTableIndexStoragesSupplier(Map<UUID, TableSchemaAwareIndexStorage>
indexes) {
+ return new TableIndexStoragesSupplier() {
+ @Override
+ public Map<UUID, TableSchemaAwareIndexStorage> get() {
+ return indexes; // Same map instance that was passed to the factory method.
+ }
+
+ @Override
+ public void addIndexToWaitIfAbsent(UUID indexId) {
+ fail("not supported"); // Waiting for an index is not implemented by this dummy: fail the calling test.
+ }
+ };
+ }
}