This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1431f13855 IGNITE-19177 Add assignments checks when building indexes
(#2030)
1431f13855 is described below
commit 1431f1385563a9e0c4441c80b491995eb4a9a714
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon May 22 12:34:03 2023 +0300
IGNITE-19177 Add assignments checks when building indexes (#2030)
---
.../apache/ignite/internal/index/IndexBuilder.java | 379 ---------------------
.../apache/ignite/internal/index/IndexManager.java | 32 +-
.../ignite/internal/index/IndexManagerTest.java | 4 +-
.../apache/ignite/internal/replicator/Replica.java | 9 +
.../ignite/internal/replicator/ReplicaManager.java | 15 +-
.../replicator/listener/ReplicaListener.java | 18 +
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../sql/engine/ClusterPerClassIntegrationTest.java | 42 +--
.../internal/sql/engine/ItBuildIndexTest.java | 22 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../internal/storage/index/IndexDescriptor.java | 22 ++
.../distributed/ItTxDistributedTestSingleNode.java | 27 +-
.../table/distributed/StorageUpdateHandler.java | 8 +
.../internal/table/distributed/TableManager.java | 31 +-
.../table/distributed/index/IndexBuildTask.java | 225 ++++++++++++
.../table/distributed/index/IndexBuildTaskId.java | 77 +++++
.../table/distributed/index/IndexBuilder.java | 186 ++++++++++
.../raft/snapshot/PartitionAccessImpl.java | 6 +
.../replicator/PartitionReplicaListener.java | 235 ++++++++++---
.../PartitionReplicaListenerIndexLockingTest.java | 17 +-
.../replication/PartitionReplicaListenerTest.java | 16 +-
.../table/impl/DummyInternalTableImpl.java | 7 +-
23 files changed, 872 insertions(+), 512 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
deleted file mode 100644
index 3a20d6d627..0000000000
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
-import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.index.IndexDescriptor;
-import org.apache.ignite.internal.storage.index.IndexStorage;
-import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
-import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.network.ClusterService;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Class for managing the index building process.
- */
-class IndexBuilder {
- private static final IgniteLogger LOG =
Loggers.forClass(IndexBuilder.class);
-
- /** Batch size of row IDs to build the index. */
- private static final int BUILD_INDEX_ROW_ID_BATCH_SIZE = 100;
-
- /** Message factory to create messages - RAFT commands. */
- private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
-
- /** Busy lock to stop synchronously. */
- private final IgniteSpinBusyLock busyLock;
-
- /** Cluster service. */
- private final ClusterService clusterService;
-
- /** Index building executor. */
- private final ExecutorService buildIndexExecutor;
-
- /** Tasks of building indexes by their ID. */
- private final Map<BuildIndexTaskId, BuildIndexTask> buildIndexTaskById =
new ConcurrentHashMap<>();
-
- IndexBuilder(String nodeName, IgniteSpinBusyLock busyLock, ClusterService
clusterService) {
- this.busyLock = busyLock;
- this.clusterService = clusterService;
-
- int cpus = Runtime.getRuntime().availableProcessors();
-
- buildIndexExecutor = new ThreadPoolExecutor(
- cpus,
- cpus,
- 30,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- NamedThreadFactory.create(nodeName, "build-index", LOG)
- );
- }
-
- /**
- * Stops the index builder.
- */
- void stop() {
- shutdownAndAwaitTermination(buildIndexExecutor, 10, TimeUnit.SECONDS);
- }
-
- /**
- * Initializes the build of the index.
- */
- void startIndexBuild(TableIndexView tableIndexView, TableImpl table,
IndexDescriptor indexDescriptor) {
- for (int partitionId = 0; partitionId <
table.internalTable().partitions(); partitionId++) {
- // TODO: IGNITE-19112 We only need to create the index store once
- table.internalTable().storage().getOrCreateIndex(partitionId,
indexDescriptor);
-
- // TODO: IGNITE-19177 Add assignments check
- buildIndexTaskById.compute(
- new BuildIndexTaskId(table.tableId(), tableIndexView.id(),
partitionId),
- (buildIndexTaskId, previousBuildIndexTask) -> {
- assert previousBuildIndexTask == null :
buildIndexTaskId;
-
- BuildIndexTask buildIndexTask = new
BuildIndexTask(table, tableIndexView, buildIndexTaskId.getPartitionId(), null);
-
- buildIndexExecutor.submit(buildIndexTask);
-
- return buildIndexTask;
- });
- }
- }
-
- /**
- * Stops the build of the index.
- */
- void stopIndexBuild(TableIndexView tableIndexView, TableImpl table) {
- for (int partitionId = 0; partitionId <
table.internalTable().partitions(); partitionId++) {
- BuildIndexTask buildIndexTask = buildIndexTaskById.remove(
- new BuildIndexTaskId(table.tableId(), tableIndexView.id(),
partitionId)
- );
-
- if (buildIndexTask != null) {
- buildIndexTask.stop();
- }
- }
- }
-
- /**
- * Task of building a table index for a partition.
- *
- * <p>Only the leader of the raft group will manage the building of the
index. Leader sends batches of row IDs via
- * {@link BuildIndexCommand}, the next batch will only be send after the
previous batch has been processed.
- *
- * <p>Index building itself occurs locally on each node of the raft group
when processing {@link BuildIndexCommand}. This ensures that
- * the index build process in the raft group is consistent and that the
index build process is restored after restarting the raft group
- * (not from the beginning).
- */
- private class BuildIndexTask implements Runnable {
- private final TableImpl table;
-
- private final TableIndexView tableIndexView;
-
- private final int partitionId;
-
- /**
- * ID of the next row to build the index from the previous batch,
{@code null} if it is the first row after the index was created
- * (both on a live node and after a restore).
- */
- private final @Nullable RowId nextRowIdToBuildFromPreviousBatch;
-
- /** Busy lock to stop task synchronously. */
- private final IgniteSpinBusyLock taskBusyLock = new
IgniteSpinBusyLock();
-
- /** Prevents double stopping of the task. */
- private final AtomicBoolean taskStopGuard = new AtomicBoolean();
-
- private BuildIndexTask(
- TableImpl table,
- TableIndexView tableIndexView,
- int partitionId,
- @Nullable RowId nextRowIdToBuildFromPreviousBatch
- ) {
- this.table = table;
- this.tableIndexView = tableIndexView;
- this.partitionId = partitionId;
- this.nextRowIdToBuildFromPreviousBatch =
nextRowIdToBuildFromPreviousBatch;
- }
-
- @Override
- public void run() {
- completedFuture(null)
- .thenCompose(unused -> inBusyLock(() -> {
- // At the time of index creation, table and raft
services of all partitions should be already started, so there
- // should be no errors.
- RaftGroupService raftGroupService =
table.internalTable().partitionRaftGroupService(partitionId);
-
- return raftGroupService
- // Now we do not check the assignment of a
node to a partition on purpose, so as not to fight races on the
- // rebalance, we just wait for the leader of
the raft group of the partition, which has had a recovery,
- // rebalancing (if needed).
- .refreshAndGetLeaderWithTerm()
- .thenComposeAsync(leaderWithTerm ->
inBusyLock(() -> {
- // At this point, we have a stable
topology, each node of which has already applied all local updates.
- if
(!localNodeConsistentId().equals(leaderWithTerm.leader().consistentId())) {
- // TODO: IGNITE-19053 Must handle the
change of leader
- // TODO: IGNITE-19053 Add a test to
change the leader even at the start of the task
- removeTask();
-
- return completedFuture(null);
- }
-
- RowId nextRowIdToBuild =
nextRowIdToBuild();
-
- if (nextRowIdToBuild == null) {
- // Index has already been built.
- removeTask();
-
- return completedFuture(null);
- }
-
- List<RowId> batchRowIds =
collectRowIdBatch(nextRowIdToBuild);
-
- RowId nextRowId =
getNextRowIdForNextBatch(batchRowIds);
-
- boolean finish = batchRowIds.size() <
BUILD_INDEX_ROW_ID_BATCH_SIZE || nextRowId == null;
-
- // TODO: IGNITE-19053 Must handle the
change of leader
- return
raftGroupService.run(createBuildIndexCommand(batchRowIds, finish))
- .thenRun(() -> inBusyLock(() -> {
-
scheduleNextBuildIndexTaskIfNeeded(nextRowId, finish);
-
- return completedFuture(null);
- }));
- }), buildIndexExecutor);
- })).whenComplete((unused, throwable) -> {
- if (throwable != null) {
- removeTask();
-
- LOG.error("Index build error: [{}]", throwable,
createCommonTableIndexInfo());
- }
- });
- }
-
- private void scheduleNextBuildIndexTaskIfNeeded(@Nullable RowId
nextRowId, boolean finish) {
- buildIndexTaskById.compute(
- new BuildIndexTaskId(table.tableId(), tableIndexView.id(),
partitionId),
- (buildIndexTaskId, previousBuildIndexTask) -> {
- if (previousBuildIndexTask == null || finish) {
- // Index build task is in the process of stopping
or has completed.
- return null;
- }
-
- assert nextRowId != null :
createCommonTableIndexInfo();
-
- BuildIndexTask buildIndexTask = new BuildIndexTask(
- table,
- tableIndexView,
- partitionId,
- nextRowId
- );
-
- buildIndexExecutor.submit(buildIndexTask);
-
- return buildIndexTask;
- });
- }
-
- private boolean isLocalNodeLeader(RaftGroupService raftGroupService) {
- Peer leader = raftGroupService.leader();
-
- assert leader != null : "tableId=" + table.tableId() + ",
partitionId=" + partitionId;
-
- return localNodeConsistentId().equals(leader.consistentId());
- }
-
- private List<RowId> createBatchRowIds(RowId lastBuiltRowId, int
batchSize) {
- MvPartitionStorage mvPartition =
table.internalTable().storage().getMvPartition(partitionId);
-
- assert mvPartition != null : createCommonTableIndexInfo();
-
- List<RowId> batch = new ArrayList<>(batchSize);
-
- for (int i = 0; i < batchSize && lastBuiltRowId != null; i++) {
- lastBuiltRowId = mvPartition.closestRowId(lastBuiltRowId);
-
- if (lastBuiltRowId == null) {
- break;
- }
-
- batch.add(lastBuiltRowId);
-
- lastBuiltRowId = lastBuiltRowId.increment();
- }
-
- return batch;
- }
-
- private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds,
boolean finish) {
- return TABLE_MESSAGES_FACTORY.buildIndexCommand()
-
.tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
- .tableId(table.tableId())
- .partitionId(partitionId)
- .build()
- )
- .indexId(tableIndexView.id())
- .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
- .finish(finish)
- .build();
- }
-
- private String createCommonTableIndexInfo() {
- return "table=" + table.name() + ", tableId=" + table.tableId()
- + ", partitionId=" + partitionId
- + ", index=" + tableIndexView.name() + ", indexId=" +
tableIndexView.id();
- }
-
- private String localNodeConsistentId() {
- return clusterService.topologyService().localMember().name();
- }
-
- private @Nullable RowId getNextRowIdForNextBatch(List<RowId> batch) {
- return batch.isEmpty() ? null : batch.get(batch.size() -
1).increment();
- }
-
- private @Nullable RowId nextRowIdToBuild() {
- if (nextRowIdToBuildFromPreviousBatch != null) {
- return nextRowIdToBuildFromPreviousBatch;
- }
-
- IndexStorage index =
table.internalTable().storage().getIndex(partitionId, tableIndexView.id());
-
- assert index != null : createCommonTableIndexInfo();
-
- return index.getNextRowIdToBuild();
- }
-
- private List<RowId> collectRowIdBatch(RowId nextRowIdToBuild) {
- if (nextRowIdToBuildFromPreviousBatch == null) {
- LOG.info("Start building the index: [{}]",
createCommonTableIndexInfo());
- }
-
- return createBatchRowIds(nextRowIdToBuild,
BUILD_INDEX_ROW_ID_BATCH_SIZE);
- }
-
- private boolean enterBusy() {
- if (!taskBusyLock.enterBusy()) {
- return false;
- }
-
- if (!busyLock.enterBusy()) {
- taskBusyLock.leaveBusy();
-
- return false;
- }
-
- return true;
- }
-
- private void leaveBusy() {
- taskBusyLock.leaveBusy();
- busyLock.leaveBusy();
- }
-
- private <V> CompletableFuture<V>
inBusyLock(Supplier<CompletableFuture<V>> supplier) {
- if (!enterBusy()) {
- removeTask();
-
- return completedFuture(null);
- }
-
- try {
- return supplier.get();
- } finally {
- leaveBusy();
- }
- }
-
- private void stop() {
- if (!taskStopGuard.compareAndSet(false, true)) {
- return;
- }
-
- taskBusyLock.block();
- }
-
- private void removeTask() {
- buildIndexTaskById.remove(new BuildIndexTaskId(table.tableId(),
tableIndexView.id(), partitionId));
- }
- }
-}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index f6182ee465..f17c37faa2 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
import java.util.ArrayList;
@@ -70,7 +71,6 @@ import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableNotFoundException;
-import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
/**
@@ -96,29 +96,21 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
- /** Index builder. */
- private final IndexBuilder indexBuilder;
-
/**
* Constructor.
*
- * @param nodeName Node name.
* @param tablesCfg Tables and indexes configuration.
* @param schemaManager Schema manager.
* @param tableManager Table manager.
- * @param clusterService Cluster service.
*/
public IndexManager(
- String nodeName,
TablesConfiguration tablesCfg,
SchemaManager schemaManager,
- TableManager tableManager,
- ClusterService clusterService
+ TableManager tableManager
) {
this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
this.schemaManager = Objects.requireNonNull(schemaManager,
"schemaManager");
this.tableManager = tableManager;
- this.indexBuilder = new IndexBuilder(nodeName, busyLock,
clusterService);
}
/** {@inheritDoc} */
@@ -175,8 +167,6 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
busyLock.block();
- indexBuilder.stop();
-
LOG.info("Index manager stopped");
}
@@ -400,8 +390,6 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
CompletableFuture<?> dropIndexFuture =
tableManager.tableAsync(causalityToken, tableId)
.thenAccept(table -> {
if (table != null) { // in case of DROP TABLE the
table will be removed first
- indexBuilder.stopIndexBuild(tableIndexView, table);
-
table.unregisterIndex(idxId);
}
});
@@ -454,7 +442,7 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
IndexDescriptor descriptor = newDescriptor(tableIndexView);
- org.apache.ignite.internal.storage.index.IndexDescriptor
storageIndexDescriptor = createStorageIndexDescriptor(tablesView, indexId);
+ org.apache.ignite.internal.storage.index.IndexDescriptor
storageIndexDescriptor = createIndexDescriptor(tablesView, indexId);
CompletableFuture<?> fireEventFuture =
fireEvent(IndexEvent.CREATE, new
IndexEventParameters(causalityToken, tableId, indexId, descriptor));
@@ -485,8 +473,6 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
table.pkId(indexId);
}
}
-
- indexBuilder.startIndexBuild(tableIndexView, table,
storageIndexDescriptor);
});
return allOf(createIndexFuture, fireEventFuture);
@@ -641,16 +627,4 @@ public class IndexManager extends Producer<IndexEvent,
IndexEventParameters> imp
return failedFuture(new IllegalStateException("Should not be
called"));
}
}
-
- private org.apache.ignite.internal.storage.index.IndexDescriptor
createStorageIndexDescriptor(TablesView tablesView, UUID indexId) {
- TableIndexView indexView = tablesView.indexes().get(indexId);
-
- if (indexView instanceof HashIndexView) {
- return new HashIndexDescriptor(indexId, tablesView);
- } else if (indexView instanceof SortedIndexView) {
- return new
org.apache.ignite.internal.storage.index.SortedIndexDescriptor(indexId,
tablesView);
- }
-
- throw new AssertionError("Unknown index type [type=" + (indexView !=
null ? indexView.getClass() : null) + ']');
- }
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index b01675ec75..06564a898c 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -54,7 +54,6 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IndexNotFoundException;
-import org.apache.ignite.network.ClusterService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -93,8 +92,7 @@ public class IndexManagerTest {
when(schManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(null));
- indexManager = new IndexManager("test", tablesConfig, schManager,
tableManagerMock,
- mock(ClusterService.class));
+ indexManager = new IndexManager(tablesConfig, schManager,
tableManagerMock);
indexManager.start();
assertThat(
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index a7f0edc913..0bd0cb34e7 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -145,6 +145,8 @@ public class Replica {
if (!leaderFuture.isDone()) {
leaderFuture.complete(leaderRef);
}
+
+ listener.onBecomePrimary(clusterNode);
}
private CompletableFuture<ClusterNode> leaderFuture() {
@@ -267,4 +269,11 @@ public class Replica {
return leased != null ? leased.consistentId() : localNode.name();
}
+
+ /**
+ * Shuts down the replica.
+ */
+ public void shutdown() {
+ listener.onShutdown();
+ }
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index fafe11e154..9582756359 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -382,8 +382,21 @@ public class ReplicaManager implements IgniteComponent {
* @param replicaGrpId Replication group id.
* @return True if the replica is found and closed, false otherwise.
*/
+ // TODO: IGNITE-19494 We need to correctly stop the replica
private boolean stopReplicaInternal(ReplicationGroupId replicaGrpId) {
- return replicas.remove(replicaGrpId) != null;
+ CompletableFuture<Replica> removed = replicas.remove(replicaGrpId);
+
+ if (removed != null) {
+ removed.whenComplete((replica, throwable) -> {
+ if (throwable == null) { // On failure 'replica' is null, so only a successfully started replica can be shut down.
+ replica.shutdown();
+ }
+ });
+
+ return true;
+ }
+
+ return false;
}
/** {@inheritDoc} */
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
index c06515a943..f98b8cf900 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.replicator.listener;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.ClusterNode;
/**
* Replica listener.
*/
+@FunctionalInterface
public interface ReplicaListener {
/**
* Invokes a replica listener to process request.
@@ -31,4 +33,20 @@ public interface ReplicaListener {
* @return Listener response.
*/
CompletableFuture<?> invoke(ReplicaRequest request);
+
+ /**
+ * Callback on becoming the primary replica.
+ *
+ * @param clusterNode Primary replica node.
+ */
+ default void onBecomePrimary(ClusterNode clusterNode) {
+ // No-op.
+ }
+
+ /**
+ * Callback on replica shutdown.
+ */
+ default void onShutdown() {
+ // No-op.
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a21247c5ac..3d2561e0c3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -406,7 +406,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
null
);
- var indexManager = new IndexManager(name, tablesConfiguration,
schemaManager, tableManager, clusterSvc);
+ var indexManager = new IndexManager(tablesConfiguration,
schemaManager, tableManager);
SqlQueryProcessor qryEngine = new SqlQueryProcessor(
registry,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index f0c2dbba6d..7f641fdd75 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
+import static
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -30,13 +31,14 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@@ -52,18 +54,12 @@ import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesView;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
import
org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
-import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
import org.apache.ignite.internal.sql.engine.util.TestQueryProcessor;
-import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
-import org.apache.ignite.internal.storage.index.IndexDescriptor;
import org.apache.ignite.internal.storage.index.IndexStorage;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -487,9 +483,12 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
*
* @param tableName Table name.
* @param indexName Index name.
+ * @return Nodes on which the partition index was built.
* @throws Exception If failed.
*/
- public static void waitForIndexBuild(String tableName, String indexName)
throws Exception {
+ protected static Map<Integer, List<Ignite>> waitForIndexBuild(String
tableName, String indexName) throws Exception {
+ Map<Integer, List<Ignite>> partitionIdToNodes = new HashMap<>();
+
// TODO: IGNITE-18733 We are waiting for the synchronization of schemes
for (Ignite clusterNode : CLUSTER_NODES) {
CompletableFuture<Table> tableFuture =
clusterNode.tables().tableAsync(tableName);
@@ -515,14 +514,19 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
continue;
}
- IndexStorage index = internalTable.storage().getOrCreateIndex(
- partitionId,
-
createIndexDescription(getTablesConfiguration(clusterNode).value(), indexName)
- );
+ TablesView tablesView =
getTablesConfiguration(clusterNode).value();
+
+ UUID indexId =
tablesView.indexes().get(indexName.toUpperCase()).id();
+
+ IndexStorage index =
internalTable.storage().getOrCreateIndex(partitionId,
createIndexDescriptor(tablesView, indexId));
assertTrue(waitForCondition(() -> index.getNextRowIdToBuild()
== null, 10, TimeUnit.SECONDS.toMillis(10)));
+
+ partitionIdToNodes.computeIfAbsent(partitionId, p -> new
ArrayList<>()).add(clusterNode);
}
}
+
+ return partitionIdToNodes;
}
/**
@@ -533,18 +537,4 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
public static TablesConfiguration getTablesConfiguration(Ignite node) {
return ((IgniteImpl)
node).clusterConfiguration().getConfiguration(TablesConfiguration.KEY);
}
-
- private static IndexDescriptor createIndexDescription(TablesView
tablesView, String indexName) {
- TableIndexView indexView =
tablesView.indexes().get(indexName.toUpperCase());
-
- assertNotNull(indexView, indexName);
-
- if (indexView instanceof HashIndexView) {
- return new HashIndexDescriptor(indexView.id(), tablesView);
- } else if (indexView instanceof SortedIndexView) {
- return new SortedIndexDescriptor(indexView.id(), tablesView);
- }
-
- return fail(indexView.getClass().getName());
- }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
index 84cdc9ef7d..e0f2e4ea32 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java
@@ -19,9 +19,13 @@ package org.apache.ignite.internal.sql.engine;
import static java.util.stream.Collectors.joining;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -41,13 +45,16 @@ public class ItBuildIndexTest extends
ClusterPerClassIntegrationTest {
@AfterEach
void tearDown() {
sql("DROP TABLE IF EXISTS " + TABLE_NAME);
+ sql("DROP ZONE IF EXISTS " + ZONE_NAME);
}
@ParameterizedTest(name = "replicas : {0}")
@MethodSource("replicas")
void testBuildIndexOnStableTopology(int replicas) throws Exception {
+ int partitions = 2;
+
sql(IgniteStringFormatter.format("CREATE ZONE IF NOT EXISTS {} WITH
REPLICAS={}, PARTITIONS={}",
- ZONE_NAME, replicas, 2
+ ZONE_NAME, replicas, partitions
));
sql(IgniteStringFormatter.format(
@@ -63,7 +70,18 @@ public class ItBuildIndexTest extends
ClusterPerClassIntegrationTest {
sql(IgniteStringFormatter.format("CREATE INDEX {} ON {} (i1)",
INDEX_NAME, TABLE_NAME));
// TODO: IGNITE-19150 We are waiting for schema synchronization to
avoid races to create and destroy indexes
- waitForIndexBuild(TABLE_NAME, INDEX_NAME);
+ Map<Integer, List<Ignite>> nodesWithBuiltIndexesByPartitionId =
waitForIndexBuild(TABLE_NAME, INDEX_NAME);
+
+ // Check that the number of nodes with built indexes is equal to the
number of replicas.
+ assertEquals(partitions, nodesWithBuiltIndexesByPartitionId.size());
+
+ for (Entry<Integer, List<Ignite>> entry :
nodesWithBuiltIndexesByPartitionId.entrySet()) {
+ assertEquals(
+ replicas,
+ entry.getValue().size(),
+ IgniteStringFormatter.format("p={}, nodes={}",
entry.getKey(), entry.getValue())
+ );
+ }
assertQuery(IgniteStringFormatter.format("SELECT * FROM {} WHERE i1 >
0", TABLE_NAME))
.matches(containsIndexScan("PUBLIC", TABLE_NAME.toUpperCase(),
INDEX_NAME.toUpperCase()))
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ad1acdee1a..cabe96008c 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -512,7 +512,7 @@ public class IgniteImpl implements Ignite {
distributionZoneManager
);
- indexManager = new IndexManager(name, tablesConfiguration,
schemaManager, distributedTblMgr, clusterSvc);
+ indexManager = new IndexManager(tablesConfiguration, schemaManager,
distributedTblMgr);
qryEngine = new SqlQueryProcessor(
registry,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 69335f7410..ff5ae6937b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -290,7 +290,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
tblManager = mockManagers();
- idxManager = new IndexManager(NODE_NAME, tblsCfg, schemaManager,
tblManager, cs);
+ idxManager = new IndexManager(tblsCfg, schemaManager, tblManager);
idxManager.start();
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
index 5dd2fb9465..d3a0743f34 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexDescriptor.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.storage.index;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
/**
* Index descriptor.
@@ -54,4 +58,22 @@ public interface IndexDescriptor {
* Returns index column descriptions.
*/
List<? extends ColumnDescriptor> columns();
+
+    /**
+     * Builds the descriptor of an index from the tables configuration.
+     *
+     * <p>The index view is looked up by its ID: a {@link HashIndexView} produces a {@link HashIndexDescriptor}, a
+     * {@link SortedIndexView} produces a {@link SortedIndexDescriptor}.
+     *
+     * @param tablesView Tables configuration.
+     * @param indexId Index ID.
+     * @return Descriptor that corresponds to the configured index type.
+     * @throws AssertionError If the looked-up view is neither a hash nor a sorted index view (a {@code null} view ends up here too).
+     */
+    static IndexDescriptor createIndexDescriptor(TablesView tablesView, UUID indexId) {
+        TableIndexView view = tablesView.indexes().get(indexId);
+
+        if (view instanceof HashIndexView) {
+            return new HashIndexDescriptor(indexId, tablesView);
+        }
+
+        if (view instanceof SortedIndexView) {
+            return new SortedIndexDescriptor(indexId, tablesView);
+        }
+
+        throw new AssertionError("Unknown type: " + view);
+    }
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index b1b0ac602f..3f2e74c0ff 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -53,6 +53,7 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopolog
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -76,10 +77,12 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TxAbstractTest;
@@ -89,6 +92,7 @@ import
org.apache.ignite.internal.table.distributed.LowWatermark;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -140,6 +144,12 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
@InjectConfiguration
private static DataStorageConfiguration dsCfg;
+ @InjectConfiguration("mock.tables.foo {}")
+ private static TablesConfiguration tablesConfig;
+
+ @InjectConfiguration
+ private static DistributionZoneConfiguration distributionZoneConfig;
+
private static final IgniteLogger LOG =
Loggers.forClass(ItTxDistributedTestSingleNode.class);
public static final int NODE_PORT_BASE = 20_000;
@@ -423,7 +433,8 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
TablePartitionId grpId = grpIds.get(p);
for (String assignment : partAssignments) {
- var testMpPartStorage = new TestMvPartitionStorage(0);
+ var mvTableStorage = new
TestMvTableStorage(tablesConfig.tables().get("foo"), tablesConfig,
distributionZoneConfig);
+ var mvPartStorage = new TestMvPartitionStorage(0);
var txStateStorage = txStateStorages.get(assignment);
var placementDriver = new
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
@@ -451,7 +462,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
new
PendingComparableValuesTracker<>(clocks.get(assignment).now());
PendingComparableValuesTracker<Long, Void> storageIndexTracker
= new PendingComparableValuesTracker<>(0L);
- PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMpPartStorage);
+ PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartStorage);
StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(
partId,
@@ -487,9 +498,9 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schemaDescriptor);
replicaManagers.get(assignment).startReplica(
new TablePartitionId(tblId, partId),
-
CompletableFuture.completedFuture(null),
+ completedFuture(null),
new PartitionReplicaListener(
- testMpPartStorage,
+ mvPartStorage,
raftSvc,
txManagers.get(assignment),
txManagers.get(assignment).lockManager(),
@@ -498,15 +509,17 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
tblId,
() -> Map.of(pkLocker.id(),
pkLocker),
pkStorage,
- () -> Map.of(),
+ Map::of,
clocks.get(assignment),
safeTime,
txStateStorage,
placementDriver,
storageUpdateHandler,
new
DummySchemas(schemaManager),
- peer ->
assignment.equals(peer.consistentId()),
- completedFuture(schemaManager)
+ completedFuture(schemaManager),
+
consistentIdToNode.apply(assignment),
+ mvTableStorage,
+ mock(IndexBuilder.class)
),
raftSvc,
storageIndexTracker
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index fef8bbc481..f76d961aa4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -417,4 +417,12 @@ public class StorageUpdateHandler {
public PendingComparableValuesTracker<HybridTimestamp, Void>
getSafeTimeTracker() {
return safeTimeTracker;
}
+
+ /**
+ * Waits for indexes to be created.
+ *
+ * <p>Calls {@code indexes.get()} and discards the returned value.
+ * NOTE(review): presumably the supplier blocks the calling thread until the indexes are available - confirm against
+ * the declaration of {@code indexes}.
+ */
+ // TODO: IGNITE-19513 Fix it, we should have already waited for the
indexes to be created
+ public void waitIndexes() {
+ indexes.get();
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f9d97ed766..d35b8da1eb 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -132,6 +132,7 @@ import
org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.gc.MvGc;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -322,6 +323,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
private final LowWatermark lowWatermark;
+ private final IndexBuilder indexBuilder;
+
/**
* Creates a new table manager.
*
@@ -432,6 +435,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
mvGc = new MvGc(nodeName, tablesCfg);
lowWatermark = new LowWatermark(nodeName, tablesCfg.lowWatermark(),
clock, txManager, vaultManager, mvGc);
+
+ indexBuilder = new IndexBuilder(nodeName, cpus);
}
@Override
@@ -635,7 +640,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
futures[i] = new CompletableFuture<>();
}
- String localMemberName =
clusterService.topologyService().localMember().name();
+ String localMemberName = localNode().name();
// Create new raft nodes according to new assignments.
return tablesByIdVv.update(causalityToken, (tablesById, e) ->
inBusyLock(busyLock, () -> {
@@ -829,8 +834,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver,
storageUpdateHandler,
new
NonHistoricSchemas(schemaManager),
- this::isLocalPeer,
-
schemaManager.schemaRegistry(causalityToken, tblId)
+
schemaManager.schemaRegistry(causalityToken, tblId),
+ localNode(),
+
table.internalTable().storage(),
+ indexBuilder
),
updatedRaftGroupService,
storageIndexTracker
@@ -854,7 +861,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
private boolean isLocalPeer(Peer peer) {
- return
peer.consistentId().equals(clusterService.topologyService().localMember().name());
+ return peer.consistentId().equals(localNode().name());
}
private PartitionDataStorage partitionDataStorage(MvPartitionStorage
partitionStorage, InternalTable internalTbl, int partId) {
@@ -950,7 +957,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
cleanUpTablesResources(tablesToStop);
try {
- IgniteUtils.closeAllManually(lowWatermark, mvGc);
+ IgniteUtils.closeAllManually(lowWatermark, mvGc, indexBuilder);
} catch (Throwable t) {
LOG.error("Failed to close internal components", t);
}
@@ -2000,7 +2007,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
stableConfiguration.peers().stream().map(Peer::consistentId).collect(toList())
);
- ClusterNode localMember =
clusterService.topologyService().localMember();
+ ClusterNode localMember = localNode();
// Start a new Raft node and Replica if this node has appeared in the
new assignments.
boolean shouldStartLocalServices = pendingAssignments.stream()
@@ -2102,8 +2109,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver,
storageUpdateHandler,
new
NonHistoricSchemas(schemaManager),
- this::isLocalPeer,
-
completedFuture(schemaManager.schemaRegistry(tblId))
+
completedFuture(schemaManager.schemaRegistry(tblId)),
+ localNode(),
+ internalTable.storage(),
+ indexBuilder
),
(TopologyAwareRaftGroupService)
internalTable.partitionRaftGroupService(partId),
storageIndexTracker
@@ -2316,7 +2325,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
? Set.of()
:
ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
- String localMemberName =
clusterService.topologyService().localMember().name();
+ String localMemberName = localNode().name();
boolean shouldStopLocalServices =
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
.noneMatch(assignment ->
assignment.consistentId().equals(localMemberName));
@@ -2396,4 +2405,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
tracker.close();
}
}
+
+    /**
+     * Returns the local member of the cluster topology.
+     */
+    private ClusterNode localNode() {
+        var topologyService = clusterService.topologyService();
+
+        return topologyService.localMember();
+    }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
new file mode 100644
index 0000000000..4385aace8d
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTask.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.index;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteStringFormatter;
+
+/**
+ * A single index build for one table partition.
+ *
+ * <p>The task repeatedly collects a batch of row IDs that still have to be indexed and sends them to the raft group as a
+ * {@link BuildIndexCommand}; the next batch is prepared only after the previous command has completed.
+ */
+class IndexBuildTask {
+    private static final IgniteLogger LOG = Loggers.forClass(IndexBuildTask.class);
+
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
+
+    /** Table, partition and index this task works on. */
+    private final IndexBuildTaskId taskId;
+
+    /** Storage of the index being built; also the source of the next row ID to build. */
+    private final IndexStorage indexStorage;
+
+    /** Partition storage used to resolve the closest existing row ID. */
+    private final MvPartitionStorage partitionStorage;
+
+    /** Client used to send build commands to the raft group. */
+    private final RaftGroupService raftClient;
+
+    /** Executor on which batches are prepared. */
+    private final ExecutorService executor;
+
+    /** Busy lock handed over by the creator of the task. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Maximum number of row IDs in one build command. */
+    private final int batchSize;
+
+    /** Busy lock owned by this task, blocked by {@link #stop()}. */
+    private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
+
+    /** Makes {@link #stop()} take effect only once. */
+    private final AtomicBoolean taskStopGuard = new AtomicBoolean();
+
+    /** Completes when the task finishes, is stopped or fails. */
+    private final CompletableFuture<Void> taskFuture = new CompletableFuture<>();
+
+    IndexBuildTask(
+            IndexBuildTaskId taskId,
+            IndexStorage indexStorage,
+            MvPartitionStorage partitionStorage,
+            RaftGroupService raftClient,
+            ExecutorService executor,
+            IgniteSpinBusyLock busyLock,
+            int batchSize
+    ) {
+        this.taskId = taskId;
+        this.indexStorage = indexStorage;
+        this.partitionStorage = partitionStorage;
+        this.raftClient = raftClient;
+        this.executor = executor;
+        this.busyLock = busyLock;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Starts building the index.
+     *
+     * <p>If either busy lock can no longer be entered, the task future is completed right away and nothing is submitted.
+     */
+    void start() {
+        if (!enterBusy()) {
+            taskFuture.complete(null);
+
+            return;
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Start building the index: [{}]", createCommonIndexInfo());
+        }
+
+        try {
+            supplyAsync(this::handleNextBatch, executor)
+                    .thenCompose(batchFuture -> batchFuture)
+                    .whenComplete((unused, throwable) -> {
+                        if (throwable == null) {
+                            taskFuture.complete(null);
+
+                            return;
+                        }
+
+                        LOG.error("Index build error: [{}]", throwable, createCommonIndexInfo());
+
+                        taskFuture.completeExceptionally(throwable);
+                    });
+        } catch (Throwable t) {
+            // Submission itself failed: report it through the future as well as to the caller.
+            taskFuture.completeExceptionally(t);
+
+            throw t;
+        } finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Stops index building; only the first call has an effect.
+     */
+    void stop() {
+        if (taskStopGuard.compareAndSet(false, true)) {
+            taskBusyLock.block();
+        }
+    }
+
+    /**
+     * Returns the index build future.
+     */
+    CompletableFuture<Void> getTaskFuture() {
+        return taskFuture;
+    }
+
+    /**
+     * Sends one batch of row IDs to the raft group and schedules the following batch while the index storage still
+     * reports a row ID to build.
+     */
+    private CompletableFuture<Void> handleNextBatch() {
+        if (!enterBusy()) {
+            return completedFuture(null);
+        }
+
+        try {
+            BuildIndexCommand command = createBuildIndexCommand(createBatchRowIds());
+
+            return raftClient.run(command)
+                    .thenComposeAsync(unused -> {
+                        if (indexStorage.getNextRowIdToBuild() != null) {
+                            return handleNextBatch();
+                        }
+
+                        // Index has been built.
+                        return completedFuture(null);
+                    }, executor);
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Collects at most {@code batchSize} row IDs, starting from the next row ID to build reported by the index storage.
+     */
+    private List<RowId> createBatchRowIds() {
+        RowId cursor = indexStorage.getNextRowIdToBuild();
+
+        List<RowId> rowIds = new ArrayList<>(batchSize);
+
+        while (rowIds.size() < batchSize && cursor != null) {
+            RowId closest = partitionStorage.closestRowId(cursor);
+
+            if (closest == null) {
+                break;
+            }
+
+            rowIds.add(closest);
+
+            cursor = closest.increment();
+        }
+
+        return rowIds;
+    }
+
+    /**
+     * Creates the build command for the given row IDs; a batch shorter than {@code batchSize} is flagged as the finishing one.
+     */
+    private BuildIndexCommand createBuildIndexCommand(List<RowId> rowIds) {
+        return TABLE_MESSAGES_FACTORY.buildIndexCommand()
+                .tablePartitionId(TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
+                        .tableId(taskId.getTableId())
+                        .partitionId(taskId.getPartitionId())
+                        .build()
+                )
+                .indexId(taskId.getIndexId())
+                .rowIds(rowIds.stream().map(RowId::uuid).collect(toList()))
+                .finish(rowIds.size() < batchSize)
+                .build();
+    }
+
+    /**
+     * Enters both busy locks, the shared one first; on failure no lock is left entered.
+     */
+    private boolean enterBusy() {
+        if (busyLock.enterBusy()) {
+            if (taskBusyLock.enterBusy()) {
+                return true;
+            }
+
+            busyLock.leaveBusy();
+        }
+
+        return false;
+    }
+
+    /**
+     * Leaves both busy locks in the reverse order of {@link #enterBusy()}.
+     */
+    private void leaveBusy() {
+        taskBusyLock.leaveBusy();
+        busyLock.leaveBusy();
+    }
+
+    /**
+     * Formats the identifiers of the task for log messages.
+     */
+    private String createCommonIndexInfo() {
+        return IgniteStringFormatter.format(
+                "tableId={}, partitionId={}, indexId={}",
+                taskId.getTableId(), taskId.getPartitionId(), taskId.getIndexId()
+        );
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTaskId.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTaskId.java
new file mode 100644
index 0000000000..ca55ed314c
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuildTaskId.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.index;
+
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Identifier of an {@link IndexBuildTask}: the table, the partition and the index the task works on.
+ *
+ * <p>Two identifiers are equal when all three components are equal, which makes the class usable as a map key.
+ */
+class IndexBuildTaskId {
+    private final UUID tableId;
+
+    private final int partitionId;
+
+    private final UUID indexId;
+
+    IndexBuildTaskId(UUID tableId, int partitionId, UUID indexId) {
+        this.tableId = tableId;
+        this.partitionId = partitionId;
+        this.indexId = indexId;
+    }
+
+    /** Returns the table ID. */
+    public UUID getTableId() {
+        return tableId;
+    }
+
+    /** Returns the partition ID. */
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    /** Returns the index ID. */
+    public UUID getIndexId() {
+        return indexId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        IndexBuildTaskId other = (IndexBuildTaskId) o;
+
+        if (partitionId != other.partitionId) {
+            return false;
+        }
+
+        return tableId.equals(other.tableId) && indexId.equals(other.indexId);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * (31 * tableId.hashCode() + partitionId) + indexId.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(IndexBuildTaskId.class, this);
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
new file mode 100644
index 0000000000..be19759424
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.index;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Class for managing the building of table indexes.
+ */
+public class IndexBuilder implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(IndexBuilder.class);
+
+    /** Number of row IDs sent in one {@link BuildIndexCommand}. */
+    private static final int BATCH_SIZE = 100;
+
+    /** Executor shared by all index build tasks. */
+    private final ExecutorService executor;
+
+    /** Index build tasks that are currently registered, by their ID. */
+    private final Map<IndexBuildTaskId, IndexBuildTask> indexBuildTaskById = new ConcurrentHashMap<>();
+
+    /** Busy lock that is blocked on {@link #close()}; it is also handed over to every task. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Makes {@link #close()} take effect only once. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param threadCount Number of threads to build indexes.
+     */
+    public IndexBuilder(String nodeName, int threadCount) {
+        executor = new ThreadPoolExecutor(
+                threadCount,
+                threadCount,
+                30,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "build-index", LOG)
+        );
+    }
+
+    /**
+     * Starts building the index if it is not already built or is not yet in progress.
+     *
+     * <p>Index is built in batches using {@link BuildIndexCommand} (via raft), batches are sent sequentially.
+     *
+     * <p>It is expected that the index building is triggered by the leader of the raft group.
+     *
+     * <p>Does nothing if the builder has been closed.
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     * @param indexId Index ID.
+     * @param indexStorage Index storage to build.
+     * @param partitionStorage Multi-versioned partition storage.
+     * @param raftClient Raft client.
+     */
+    // TODO: IGNITE-19498 Perhaps we need to start building the index only once
+    public void startBuildIndex(
+            UUID tableId,
+            int partitionId,
+            UUID indexId,
+            IndexStorage indexStorage,
+            MvPartitionStorage partitionStorage,
+            RaftGroupService raftClient
+    ) {
+        inBusyLock(() -> {
+            if (indexStorage.getNextRowIdToBuild() == null) {
+                // Nothing is left to build.
+                return;
+            }
+
+            IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, partitionId, indexId);
+
+            IndexBuildTask newTask = new IndexBuildTask(
+                    taskId,
+                    indexStorage,
+                    partitionStorage,
+                    raftClient,
+                    executor,
+                    busyLock,
+                    BATCH_SIZE
+            );
+
+            if (indexBuildTaskById.putIfAbsent(taskId, newTask) != null) {
+                // Index building is already in progress.
+                return;
+            }
+
+            try {
+                newTask.start();
+            } catch (Throwable t) {
+                // The task has not been started, do not let it block further attempts to build the index.
+                indexBuildTaskById.remove(taskId, newTask);
+
+                throw t;
+            }
+
+            // Conditional removal: a task that was stopped and then replaced by a newer task with the same ID
+            // must not evict the newer task when its own future completes.
+            newTask.getTaskFuture().whenComplete((unused, throwable) -> indexBuildTaskById.remove(taskId, newTask));
+        });
+    }
+
+    /**
+     * Stops index building if it is in progress.
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     * @param indexId Index ID.
+     */
+    public void stopBuildIndex(UUID tableId, int partitionId, UUID indexId) {
+        inBusyLock(() -> {
+            IndexBuildTask removed = indexBuildTaskById.remove(new IndexBuildTaskId(tableId, partitionId, indexId));
+
+            if (removed != null) {
+                removed.stop();
+            }
+        });
+    }
+
+    /**
+     * Stops building all indexes (for a table partition) if they are in progress.
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     */
+    public void stopBuildIndexes(UUID tableId, int partitionId) {
+        for (Iterator<Entry<IndexBuildTaskId, IndexBuildTask>> it = indexBuildTaskById.entrySet().iterator(); it.hasNext(); ) {
+            // The busy lock is taken per entry so that closing the builder interrupts the traversal.
+            if (!busyLock.enterBusy()) {
+                return;
+            }
+
+            try {
+                Entry<IndexBuildTaskId, IndexBuildTask> entry = it.next();
+
+                IndexBuildTaskId taskId = entry.getKey();
+
+                if (tableId.equals(taskId.getTableId()) && partitionId == taskId.getPartitionId()) {
+                    it.remove();
+
+                    entry.getValue().stop();
+                }
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }
+    }
+
+    /**
+     * Blocks the busy lock and shuts down the executor, waiting up to 10 seconds for its termination; only the first call
+     * has an effect.
+     */
+    @Override
+    public void close() {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Runs the action under the busy lock, or skips it if the lock can no longer be entered.
+     */
+    private void inBusyLock(Runnable runnable) {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            runnable.run();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index b170f5efe0..7ff5846779 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -192,6 +192,12 @@ public class PartitionAccessImpl implements
PartitionAccess {
@Override
public CompletableFuture<Void> startRebalance() {
+ // Avoids a race between creating indexes and starting a rebalance.
+ // If an index appears after the rebalance has started, then at the
end of the rebalance it will have a status of RUNNABLE instead
+ // of REBALANCE which will lead to errors.
+ // TODO: IGNITE-19513 Fix it, we should have already waited for the
indexes to be created
+ storageUpdateHandler.waitIndexes();
+
TxStateStorage txStateStorage = getTxStateStorage(partitionId());
return mvGc.removeStorage(toTablePartitionId(partitionKey))
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 725f19008a..eac16412b8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.storage.index.IndexDescriptor.createIndexDescriptor;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.IgniteUtils.filter;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
@@ -46,12 +47,15 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -73,11 +77,15 @@ import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.configuration.TablesView;
+import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
+import org.apache.ignite.internal.storage.index.IndexDescriptor;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
@@ -93,6 +101,7 @@ import
org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import
org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -120,12 +129,14 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -143,18 +154,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Replication group id. */
private final TablePartitionId replicationGroupId;
- /** Partition id. */
- private final int partId;
-
/** Primary key index. */
private final Lazy<TableSchemaAwareIndexStorage> pkIndexStorage;
/** Secondary indices. */
private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>>
secondaryIndexStorages;
- /** Table id. */
- private final UUID tableId;
-
/** Versioned partition storage. */
private final MvPartitionStorage mvDataStorage;
@@ -199,17 +204,30 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final Supplier<Map<UUID, IndexLocker>> indexesLockers;
- /**
- * Function for checking that the given peer is local.
- */
- private final Function<Peer, Boolean> isLocalPeerChecker;
-
private final ConcurrentMap<UUID, TxCleanupReadyFutureList>
txCleanupReadyFutures = new ConcurrentHashMap<>();
private final CompletableFuture<SchemaRegistry> schemaFut;
private final SchemaCompatValidator schemaCompatValidator;
+ /** Instance of the local node. */
+ private final ClusterNode localNode;
+
+ /** Table storage. */
+ private final MvTableStorage mvTableStorage;
+
+ /** Index builder. */
+ private final IndexBuilder indexBuilder;
+
+ /** Listener for configuration indexes, {@code null} if the replica is not
the leader. */
+ private final
AtomicReference<ConfigurationNamedListListener<TableIndexView>>
indexesConfigurationListener = new AtomicReference<>();
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
/**
* The constructor.
*
@@ -227,8 +245,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param txStateStorage Transaction state storage.
* @param placementDriver Placement driver.
* @param storageUpdateHandler Handler that processes updates writing them
to storage.
- * @param isLocalPeerChecker Function for checking that the given peer is
local.
* @param schemaFut Table schema.
+ * @param localNode Instance of the local node.
+ * @param mvTableStorage Table storage.
+ * @param indexBuilder Index builder.
*/
public PartitionReplicaListener(
MvPartitionStorage mvDataStorage,
@@ -247,16 +267,16 @@ public class PartitionReplicaListener implements
ReplicaListener {
PlacementDriver placementDriver,
StorageUpdateHandler storageUpdateHandler,
Schemas schemas,
- Function<Peer, Boolean> isLocalPeerChecker,
- CompletableFuture<SchemaRegistry> schemaFut
+ CompletableFuture<SchemaRegistry> schemaFut,
+ ClusterNode localNode,
+ MvTableStorage mvTableStorage,
+ IndexBuilder indexBuilder
) {
this.mvDataStorage = mvDataStorage;
this.raftClient = raftClient;
this.txManager = txManager;
this.lockManager = lockManager;
this.scanRequestExecutor = scanRequestExecutor;
- this.partId = partId;
- this.tableId = tableId;
this.indexesLockers = indexesLockers;
this.pkIndexStorage = pkIndexStorage;
this.secondaryIndexStorages = secondaryIndexStorages;
@@ -264,9 +284,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.safeTime = safeTime;
this.txStateStorage = txStateStorage;
this.placementDriver = placementDriver;
- this.isLocalPeerChecker = isLocalPeerChecker;
this.storageUpdateHandler = storageUpdateHandler;
this.schemaFut = schemaFut;
+ this.localNode = localNode;
+ this.mvTableStorage = mvTableStorage;
+ this.indexBuilder = indexBuilder;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -333,7 +355,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
.thenCompose(replicaAndTerm -> {
Peer leader = replicaAndTerm.leader();
- if (isLocalPeerChecker.apply(leader)) {
+ if (isLocalPeer(leader)) {
CompletableFuture<TxMeta> txStateFut =
getTxStateConcurrently(request);
return txStateFut.thenApply(txMeta -> new
LeaderOrTxState(null, txMeta));
@@ -652,7 +674,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
- return lockManager.acquire(txId, new LockKey(tableId),
LockMode.S).thenCompose(tblLock -> {
+ return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.S).thenCompose(tblLock -> {
var batchRows = new ArrayList<BinaryRow>(batchCount);
@SuppressWarnings("resource") PartitionTimestampCursor cursor =
(PartitionTimestampCursor) cursors.computeIfAbsent(cursorId,
@@ -727,7 +749,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
BinaryTuple exactKey = request.exactKey();
return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
- return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+ return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
return lockManager.acquire(txId, new LockKey(indexId,
exactKey.byteBuffer()), LockMode.S)
.thenCompose(indRowLock -> { // Hash index bucket S
lock
Cursor<RowId> cursor = (Cursor<RowId>)
cursors.computeIfAbsent(cursorId, id -> indexStorage.get(exactKey));
@@ -767,7 +789,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
int flags = request.flags();
return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
- return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+ return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
var comparator = new
BinaryTupleComparator(indexStorage.indexDescriptor());
Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
@@ -916,7 +938,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(null); // End of range reached.
Exit loop.
}
- return lockManager.acquire(txId, new LockKey(tableId,
currentRow.rowId()), LockMode.S)
+ return lockManager.acquire(txId, new LockKey(tableId(),
currentRow.rowId()), LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
ReadResult readResult =
mvDataStorage.read(currentRow.rowId(), HybridTimestamp.MAX_VALUE);
return
resolveAndCheckReadCompatibility(readResult, txId)
@@ -968,7 +990,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId rowId = indexCursor.next();
- return lockManager.acquire(txId, new LockKey(tableId, rowId),
LockMode.S)
+ return lockManager.acquire(txId, new LockKey(tableId(), rowId),
LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
ReadResult readResult = mvDataStorage.read(rowId,
HybridTimestamp.MAX_VALUE);
return resolveAndCheckReadCompatibility(readResult, txId)
@@ -1390,7 +1412,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
} catch (Exception e) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- format("Unable to close cursor [tableId={}]", tableId), e);
+ format("Unable to close cursor [tableId={}]", tableId()),
e);
}
}
@@ -1572,7 +1594,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
BinaryTuple keyTuple = pkTupleFuts[futNum].join();
ByteBuffer keyToCheck = keyTuple.byteBuffer();
if (uniqueKeys.add(keyToCheck)) {
- rowsToInsert.put(new RowId(partId), row);
+ rowsToInsert.put(new RowId(partId(),
UUID.randomUUID()), row);
} else {
result.add(row);
}
@@ -1620,7 +1642,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
rowIdFuts[i++] = resolveRowByPk(searchRow, txId, (rowId,
row) -> {
boolean insert = rowId == null;
- RowId rowId0 = insert ? new RowId(partId) : rowId;
+ RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
return insert
? takeLocksForInsert(searchRow, rowId0, txId)
@@ -1789,7 +1811,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(false);
}
- RowId rowId0 = new RowId(partId);
+ RowId rowId0 = new RowId(partId(), UUID.randomUUID());
return takeLocksForInsert(searchRow, rowId0, txId)
.thenCompose(rowIdLock -> applyUpdateCommand(
@@ -1807,7 +1829,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
boolean insert = rowId == null;
- RowId rowId0 = insert ? new RowId(partId) : rowId;
+ RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
lockFut = insert
? takeLocksForInsert(searchRow, rowId0, txId)
@@ -1829,7 +1851,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return resolveRowByPk(searchRow, txId, (rowId, row) -> {
boolean insert = rowId == null;
- RowId rowId0 = insert ? new RowId(partId) : rowId;
+ RowId rowId0 = insert ? new RowId(partId(),
UUID.randomUUID()) : rowId;
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
lockFut = insert
? takeLocksForInsert(searchRow, rowId0, txId)
@@ -1897,8 +1919,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with tuple {@link RowId} and collection of
{@link Lock}.
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.X))
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X))
.thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId,
txId))
.thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId,
shortTermLocks));
}
@@ -1911,7 +1933,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with tuple {@link RowId} and collection of
{@link Lock}.
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) //
IX lock on table
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
// IX lock on table
.thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId,
txId))
.thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId,
shortTermLocks));
}
@@ -1969,11 +1991,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with {@link RowId} or {@code null} if there is
no value for remove.
*/
private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow
expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) //
IX lock on table
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.S)) // S lock on RowId
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
// IX lock on table
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
.thenCompose(ignored -> {
if (equalValues(actualRow, expectedRow)) {
- return lockManager.acquire(txId, new LockKey(tableId,
rowId), LockMode.X) // X lock on RowId
+ return lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
.thenCompose(ignored0 ->
takeRemoveLockOnIndexes(actualRow, rowId, txId))
.thenApply(exclusiveRowLock -> rowId);
}
@@ -1989,8 +2011,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with {@link RowId} or {@code null} if there is
no value for the key.
*/
private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow,
RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) //
IX lock on table
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.X)) // X lock on RowId
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
// IX lock on table
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X)) // X lock on RowId
.thenCompose(ignored -> takeRemoveLockOnIndexes(binaryRow,
rowId, txId))
.thenApply(ignored -> rowId);
}
@@ -2002,8 +2024,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with {@link RowId} or {@code null} if there is
no value for the key.
*/
private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IS) //
IS lock on table
- .thenCompose(tblLock -> lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.S)) // S lock on RowId
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS)
// IS lock on table
+ .thenCompose(tblLock -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
.thenApply(ignored -> rowId);
}
@@ -2059,11 +2081,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow,
BinaryRow newRow, RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId, rowId), LockMode.S))
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.S))
.thenCompose(ignored -> {
if (oldRow != null && equalValues(oldRow, expectedRow)) {
- return lockManager.acquire(txId, new LockKey(tableId,
rowId), LockMode.X) // X lock on RowId
+ return lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
.thenCompose(ignored1 ->
takePutLockOnIndexes(newRow, rowId, txId))
.thenApply(shortTermLocks -> new
IgniteBiTuple<>(rowId, shortTermLocks));
}
@@ -2110,7 +2132,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
);
} else if (request instanceof ReadOnlyReplicaRequest || request
instanceof ReplicaSafeTimeSyncRequest) {
- return
raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm ->
isLocalPeerChecker.apply(replicaAndTerm.leader()));
+ return
raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm ->
isLocalPeer(replicaAndTerm.leader()));
} else {
return completedFuture(null);
}
@@ -2123,7 +2145,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return completedFuture(row);
}
- return schemaCompatValidator.validateBackwards(row.schemaVersion(),
tableId, txId)
+ return schemaCompatValidator.validateBackwards(row.schemaVersion(),
tableId(), txId)
.thenCompose(validationResult -> {
if (validationResult.isSuccessful()) {
return completedFuture(row);
@@ -2363,4 +2385,127 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
TxState state;
}
+
+ /**
+ * Reacts to a primary replica election: when the elected node is the local node, subscribes to index
+ * configuration changes and starts building every index already configured for this table.
+ *
+ * <p>Does nothing if the busy lock cannot be entered or if {@code clusterNode} is not equal to the local node.
+ *
+ * @param clusterNode Node reported as the primary replica.
+ */
+ @Override
+ // TODO: IGNITE-19053 Must handle the change of leader
+ // TODO: IGNITE-19053 Add a test to change the leader even at the start of
the task
+ public void onBecomePrimary(ClusterNode clusterNode) {
+ inBusyLock(() -> {
+ if (!clusterNode.equals(localNode)) {
+ // We are not the primary replica.
+ return;
+ }
+
+ // Subscribe first, then walk the current configuration snapshot.
+ registerIndexesListener();
+
+ // Let's try to build an index for the previously created indexes
for the table.
+ TablesView tablesView =
mvTableStorage.tablesConfiguration().value();
+
+ // Only indexes whose table ID matches this replication group are considered.
+ for (UUID indexId : collectIndexIds(tablesView)) {
+ startBuildIndex(createIndexDescriptor(tablesView, indexId));
+ }
+ });
+ }
+
+ /**
+ * Stops the listener: blocks the busy lock, unsubscribes the index configuration listener if one was
+ * registered, and asks the index builder to stop all index builds for this table partition.
+ *
+ * <p>Only the first invocation does any work; later invocations return immediately.
+ */
+ @Override
+ public void onShutdown() {
+ // The guard flips from false to true exactly once, so the body below runs at most one time.
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ // NOTE(review): presumably waits for in-flight inBusyLock sections and makes later enterBusy() calls fail -
+ // confirm against IgniteSpinBusyLock.
+ busyLock.block();
+
+ // Take the listener out of the reference; it is null when registerIndexesListener was never called.
+ ConfigurationNamedListListener<TableIndexView> listener =
indexesConfigurationListener.getAndSet(null);
+
+ if (listener != null) {
+
mvTableStorage.tablesConfiguration().indexes().stopListenElements(listener);
+ }
+
+ indexBuilder.stopBuildIndexes(tableId(), partId());
+ }
+
+ /**
+ * Creates a listener for the named list of index configurations, stores it in
+ * {@code indexesConfigurationListener} and subscribes it to the indexes configuration.
+ *
+ * <p>The listener starts an index build when an index is created for this table and stops the build when
+ * such an index is deleted. Rename and update events are answered with a future failed with
+ * {@link UnsupportedOperationException}.
+ *
+ * <p>Expects that no listener has been stored before; this is checked with an {@code assert} only.
+ */
+ private void registerIndexesListener() {
+ // TODO: IGNITE-19498 Might need to listen to something else
+ ConfigurationNamedListListener<TableIndexView> listener = new
ConfigurationNamedListListener<>() {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<TableIndexView> ctx) {
+ inBusyLock(() -> {
+ TableIndexView tableIndexView = ctx.newValue();
+
+ // Events for indexes of other tables are ignored.
+ if (tableId().equals(tableIndexView.tableId())) {
+
startBuildIndex(createIndexDescriptor(ctx.newValue(TablesView.class),
tableIndexView.id()));
+ }
+ });
+
+ // The returned future is already completed; it does not track the build itself.
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onRename(ConfigurationNotificationEvent<TableIndexView> ctx) {
+ return failedFuture(new UnsupportedOperationException());
+ }
+
+ @Override
+ public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<TableIndexView> ctx) {
+ inBusyLock(() -> {
+ // The deleted index is only available as the old value of the event.
+ TableIndexView tableIndexView = ctx.oldValue();
+
+ if (tableId().equals(tableIndexView.tableId())) {
+ indexBuilder.stopBuildIndex(tableId(), partId(),
tableIndexView.id());
+ }
+ });
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<?>
onUpdate(ConfigurationNotificationEvent<TableIndexView> ctx) {
+ return failedFuture(new UnsupportedOperationException());
+ }
+ };
+
+ // The reference must still be empty; a failed CAS means a listener was already stored.
+ boolean casResult = indexesConfigurationListener.compareAndSet(null,
listener);
+
+ assert casResult : replicationGroupId;
+
+
mvTableStorage.tablesConfiguration().indexes().listenElements(listener);
+ }
+
+ /**
+ * Obtains the index storage for this partition from the table storage and hands it to the index builder
+ * together with the partition storage and the raft client.
+ *
+ * @param indexDescriptor Descriptor of the index to build.
+ */
+ private void startBuildIndex(IndexDescriptor indexDescriptor) {
+ // TODO: IGNITE-19112 We only need to create the index storage once
+ IndexStorage indexStorage = mvTableStorage.getOrCreateIndex(partId(),
indexDescriptor);
+
+ indexBuilder.startBuildIndex(tableId(), partId(),
indexDescriptor.id(), indexStorage, mvDataStorage, raftClient);
+ }
+
+ /**
+ * Collects the IDs of all indexes in the given configuration view that belong to this listener's table.
+ *
+ * @param tablesView Tables configuration view to read the indexes from.
+ * @return IDs of the indexes whose table ID equals the table ID of the replication group.
+ */
+ private List<UUID> collectIndexIds(TablesView tablesView) {
+ return tablesView.indexes().stream()
+ .filter(tableIndexView ->
replicationGroupId.tableId().equals(tableIndexView.tableId()))
+ .map(TableIndexView::id)
+ .collect(toList());
+ }
+
+ /** Returns the partition ID taken from the replication group ID. */
+ private int partId() {
+ return replicationGroupId.partitionId();
+ }
+
+ /** Returns the table ID taken from the replication group ID. */
+ private UUID tableId() {
+ return replicationGroupId.tableId();
+ }
+
+ /**
+ * Checks whether the given peer is the local node by comparing the peer's consistent ID with the local
+ * node's name.
+ *
+ * @param peer Peer to check.
+ * @return {@code true} if the peer's consistent ID equals the name of the local node.
+ */
+ private boolean isLocalPeer(Peer peer) {
+ return peer.consistentId().equals(localNode.name());
+ }
+
+ /**
+ * Runs the given action while holding the busy lock; silently skips the action if the lock cannot be entered.
+ *
+ * @param runnable Action to run.
+ */
+ private void inBusyLock(Runnable runnable) {
+ // NOTE(review): enterBusy() presumably returns false once onShutdown has blocked the lock - confirm
+ // against IgniteSpinBusyLock.
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ runnable.run();
+ } finally {
+ // Always release, even if the action throws.
+ busyLock.leaveBusy();
+ }
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index fd9c72973c..0164926784 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -49,12 +50,14 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.MarshallerException;
import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -67,6 +70,7 @@ import
org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -85,6 +89,7 @@ import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeAll;
@@ -116,7 +121,11 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
private static Function<BinaryRow, BinaryTuple> row2SortKeyConverter;
@BeforeAll
- public static void beforeAll(@InjectConfiguration DataStorageConfiguration
dsCfg) {
+ public static void beforeAll(
+ @InjectConfiguration DataStorageConfiguration dsCfg,
+ @InjectConfiguration("mock.tables.foo {}") TablesConfiguration
tablesConfig,
+ @InjectConfiguration DistributionZoneConfiguration
distributionZoneConfig
+ ) {
RaftGroupService mockRaftClient = mock(RaftGroupService.class);
when(mockRaftClient.refreshAndGetLeaderWithTerm())
@@ -201,8 +210,10 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
mock(LowWatermark.class)
),
new DummySchemas(schemaManager),
- peer -> true,
- CompletableFuture.completedFuture(schemaManager)
+ CompletableFuture.completedFuture(schemaManager),
+ mock(ClusterNode.class),
+ new TestMvTableStorage(tablesConfig.tables().get("foo"),
tablesConfig, distributionZoneConfig),
+ mock(IndexBuilder.class)
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index d89cefd370..aa527094d5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -68,6 +68,7 @@ import
org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -83,6 +84,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.MarshallerException;
@@ -91,6 +93,7 @@ import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshal
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
import
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -110,6 +113,7 @@ import
org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
@@ -289,7 +293,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private static final AtomicInteger nextMonotonicInt = new AtomicInteger(1);
@BeforeEach
- public void beforeTest(@InjectConfiguration DataStorageConfiguration
dsCfg) {
+ public void beforeTest(
+ @InjectConfiguration DataStorageConfiguration dsCfg,
+ @InjectConfiguration("mock.tables.foo {}") TablesConfiguration
tablesConfig,
+ @InjectConfiguration DistributionZoneConfiguration
distributionZoneConfig
+ ) {
lenient().when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock
-> {
if (!localLeader) {
return completedFuture(new LeaderWithTerm(new
Peer(anotherNode.name()), 1L));
@@ -397,8 +405,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
mock(LowWatermark.class)
),
schemas,
- peer -> localNode.name().equals(peer.consistentId()),
- completedFuture(schemaManager)
+ completedFuture(schemaManager),
+ localNode,
+ new TestMvTableStorage(tablesConfig.tables().get("foo"),
tablesConfig, distributionZoneConfig),
+ mock(IndexBuilder.class)
);
sortedIndexBinarySchema =
BinaryTupleSchema.createSchema(schemaDescriptor, new int[]{2 /* intVal column
*/});
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index a147752931..a7f1cc8a82 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -67,6 +67,7 @@ import
org.apache.ignite.internal.table.distributed.LowWatermark;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
@@ -305,8 +306,10 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
placementDriver,
storageUpdateHandler,
new DummySchemas(schemaManager),
- peer -> true,
- completedFuture(schemaManager)
+ completedFuture(schemaManager),
+ mock(ClusterNode.class),
+ mock(MvTableStorage.class),
+ mock(IndexBuilder.class)
);
partitionListener = new PartitionListener(