This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new a8f054ee51 IGNITE-20330 Create an abstraction for building indexes (#2631) a8f054ee51 is described below commit a8f054ee51e8da33fb849dc8381a7b26e1717b33 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Mon Oct 9 13:31:29 2023 +0300 IGNITE-20330 Create an abstraction for building indexes (#2631) --- .../ignite/internal/catalog/CatalogService.java | 5 + .../internal/catalog/commands/CatalogUtils.java | 9 + .../catalog/commands/CreateTableCommand.java | 3 +- .../internal/catalog/CatalogManagerSelfTest.java | 41 ++- .../commands/DropIndexCommandValidationTest.java | 3 +- modules/index/build.gradle | 3 + .../internal/index/IndexBuildController.java | 314 +++++++++++++++++++++ .../apache/ignite/internal/index/IndexManager.java | 24 +- .../internal/index/IndexBuildControllerTest.java | 249 ++++++++++++++++ .../internal/placementdriver/PlacementDriver.java | 4 +- .../PrimaryReplicaAwaitException.java | 17 +- .../PrimaryReplicaAwaitTimeoutException.java | 20 +- .../placementdriver/event/PrimaryReplicaEvent.java | 2 +- .../placementdriver/TestPlacementDriver.java | 2 +- .../internal/placementdriver/ActiveActorTest.java | 1 - .../MultiActorPlacementDriverTest.java | 2 - .../PlacementDriverManagerTest.java | 2 - .../placementdriver/PlacementDriverManager.java | 27 +- .../placementdriver/leases/LeaseTracker.java | 25 +- .../placementdriver/PlacementDriverTest.java | 23 +- .../apache/ignite/internal/replicator/Replica.java | 2 - .../replicator/listener/ReplicaListener.java | 18 +- .../internal/runner/app/ItDataSchemaSyncTest.java | 83 +++--- .../internal/sql/engine/ItBuildIndexTest.java | 2 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 25 +- .../apache/ignite/internal/app/IgnitionImpl.java | 8 +- ...xDistributedTestSingleNodeNoCleanupMessage.java | 6 - .../internal/table/distributed/TableManager.java | 9 +- .../table/distributed/index/IndexBuilder.java | 4 +- 
.../request/BuildIndexReplicaRequest.java | 7 +- .../replicator/PartitionReplicaListener.java | 153 +--------- .../PartitionReplicaListenerIndexLockingTest.java | 5 - .../replication/PartitionReplicaListenerTest.java | 5 - .../apache/ignite/distributed/ItTxTestCluster.java | 10 +- .../ignite/internal/table/TableTestUtils.java | 45 +++ .../table/impl/DummyInternalTableImpl.java | 3 - 36 files changed, 770 insertions(+), 391 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java index 9c5113ec48..f514b310f6 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java @@ -35,6 +35,11 @@ import org.jetbrains.annotations.Nullable; * <p>Catalog service listens distributed schema update event, stores/restores schema evolution history (schema versions) for time-travelled * queries purposes and for lazy data evolution purposes. * + * <p>Notes:</p> + * <ul> + * <li>Events are fired in the metastore thread.</li> + * </ul> + * * <p>TBD: events */ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEventParameters> { diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java index a858bc37f6..709aaaaa6d 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java @@ -367,4 +367,13 @@ public class CatalogUtils { return zone; } + + /** + * Returns the primary key index name for table. + * + * @param tableName Table name. 
+ */ + public static String pkIndexName(String tableName) { + return tableName + "_PK"; + } } diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java index 3ef142c6b4..8edac3cd81 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java @@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNullElse; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.ensureNoTableIndexOrSysViewExistsWithGivenName; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.zoneOrThrow; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; @@ -115,7 +116,7 @@ public class CreateTableCommand extends AbstractTableCommand { INITIAL_CAUSALITY_TOKEN ); - String indexName = tableName + "_PK"; + String indexName = pkIndexName(tableName); ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName); diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java index 67de39a298..557ad4f953 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java @@ -40,6 +40,7 @@ import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_S import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_STORAGE_ENGINE; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; import static org.apache.ignite.internal.catalog.commands.DefaultValue.constant; import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST; import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.DESC_NULLS_FIRST; @@ -216,12 +217,12 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertNull(schema.table(TABLE_NAME)); assertNull(manager.table(TABLE_NAME, 123L)); - assertNull(manager.index(createPkIndexName(TABLE_NAME), 123L)); + assertNull(manager.index(pkIndexName(TABLE_NAME), 123L)); // Validate actual catalog schema = manager.schema(SCHEMA_NAME, 1); CatalogTableDescriptor table = schema.table(TABLE_NAME); - CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor) schema.index(createPkIndexName(TABLE_NAME)); + CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor) schema.index(pkIndexName(TABLE_NAME)); assertNotNull(schema); assertEquals(SCHEMA_NAME, schema.name()); @@ -230,7 +231,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertSame(table, manager.table(TABLE_NAME, clock.nowLong())); assertSame(table, manager.table(table.id(), clock.nowLong())); - assertSame(pkIndex, manager.index(createPkIndexName(TABLE_NAME), clock.nowLong())); + assertSame(pkIndex, manager.index(pkIndexName(TABLE_NAME), clock.nowLong())); assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong())); // Validate newly created table @@ -238,7 +239,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { 
assertEquals(manager.zone(ZONE_NAME, clock.nowLong()).id(), table.zoneId()); // Validate newly created pk index - assertEquals(createPkIndexName(TABLE_NAME), pkIndex.name()); + assertEquals(pkIndexName(TABLE_NAME), pkIndex.name()); assertEquals(table.id(), pkIndex.tableId()); assertEquals(table.primaryKeyColumns(), pkIndex.columns()); assertTrue(pkIndex.unique()); @@ -254,9 +255,9 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { // Validate actual catalog has both tables. schema = manager.schema(2); table = schema.table(TABLE_NAME); - pkIndex = (CatalogHashIndexDescriptor) schema.index(createPkIndexName(TABLE_NAME)); + pkIndex = (CatalogHashIndexDescriptor) schema.index(pkIndexName(TABLE_NAME)); CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2); - CatalogHashIndexDescriptor pkIndex2 = (CatalogHashIndexDescriptor) schema.index(createPkIndexName(TABLE_NAME_2)); + CatalogHashIndexDescriptor pkIndex2 = (CatalogHashIndexDescriptor) schema.index(pkIndexName(TABLE_NAME_2)); assertNotNull(schema); assertEquals(SCHEMA_NAME, schema.name()); @@ -265,13 +266,13 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertSame(table, manager.table(TABLE_NAME, clock.nowLong())); assertSame(table, manager.table(table.id(), clock.nowLong())); - assertSame(pkIndex, manager.index(createPkIndexName(TABLE_NAME), clock.nowLong())); + assertSame(pkIndex, manager.index(pkIndexName(TABLE_NAME), clock.nowLong())); assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong())); assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong())); assertSame(table2, manager.table(table2.id(), clock.nowLong())); - assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2), clock.nowLong())); + assertSame(pkIndex2, manager.index(pkIndexName(TABLE_NAME_2), clock.nowLong())); assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong())); assertNotSame(table, table2); @@ -300,8 +301,8 @@ public class CatalogManagerSelfTest extends 
BaseCatalogManagerTest { CatalogSchemaDescriptor schema = manager.schema(2); CatalogTableDescriptor table1 = schema.table(TABLE_NAME); CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2); - CatalogIndexDescriptor pkIndex1 = schema.index(createPkIndexName(TABLE_NAME)); - CatalogIndexDescriptor pkIndex2 = schema.index(createPkIndexName(TABLE_NAME_2)); + CatalogIndexDescriptor pkIndex1 = schema.index(pkIndexName(TABLE_NAME)); + CatalogIndexDescriptor pkIndex2 = schema.index(pkIndexName(TABLE_NAME_2)); assertNotEquals(table1.id(), table2.id()); assertNotEquals(pkIndex1.id(), pkIndex2.id()); @@ -313,13 +314,13 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertSame(table1, manager.table(TABLE_NAME, beforeDropTimestamp)); assertSame(table1, manager.table(table1.id(), beforeDropTimestamp)); - assertSame(pkIndex1, manager.index(createPkIndexName(TABLE_NAME), beforeDropTimestamp)); + assertSame(pkIndex1, manager.index(pkIndexName(TABLE_NAME), beforeDropTimestamp)); assertSame(pkIndex1, manager.index(pkIndex1.id(), beforeDropTimestamp)); assertSame(table2, manager.table(TABLE_NAME_2, beforeDropTimestamp)); assertSame(table2, manager.table(table2.id(), beforeDropTimestamp)); - assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2), beforeDropTimestamp)); + assertSame(pkIndex2, manager.index(pkIndexName(TABLE_NAME_2), beforeDropTimestamp)); assertSame(pkIndex2, manager.index(pkIndex2.id(), beforeDropTimestamp)); // Validate actual catalog @@ -333,14 +334,14 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertNull(manager.table(TABLE_NAME, clock.nowLong())); assertNull(manager.table(table1.id(), clock.nowLong())); - assertNull(schema.index(createPkIndexName(TABLE_NAME))); - assertNull(manager.index(createPkIndexName(TABLE_NAME), clock.nowLong())); + assertNull(schema.index(pkIndexName(TABLE_NAME))); + assertNull(manager.index(pkIndexName(TABLE_NAME), clock.nowLong())); 
assertNull(manager.index(pkIndex1.id(), clock.nowLong())); assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong())); assertSame(table2, manager.table(table2.id(), clock.nowLong())); - assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2), clock.nowLong())); + assertSame(pkIndex2, manager.index(pkIndexName(TABLE_NAME_2), clock.nowLong())); assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong())); // Validate schema wasn't changed. @@ -1502,7 +1503,7 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertThat(manager.execute(createHashIndexCommand(INDEX_NAME, List.of("VAL"))), willBe(nullValue())); int tableId = manager.table(TABLE_NAME, clock.nowLong()).id(); - int pkIndexId = manager.index(createPkIndexName(TABLE_NAME), clock.nowLong()).id(); + int pkIndexId = manager.index(pkIndexName(TABLE_NAME), clock.nowLong()).id(); int indexId = manager.index(INDEX_NAME, clock.nowLong()).id(); assertNotEquals(tableId, indexId); @@ -1563,8 +1564,8 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { assertThat(manager.execute(simpleIndex()), willBe(nullValue())); assertThat(manager.indexes(0), empty()); - assertThat(manager.indexes(1), hasItems(index(1, createPkIndexName(TABLE_NAME)))); - assertThat(manager.indexes(2), hasItems(index(2, createPkIndexName(TABLE_NAME)), index(2, INDEX_NAME))); + assertThat(manager.indexes(1), hasItems(index(1, pkIndexName(TABLE_NAME)))); + assertThat(manager.indexes(2), hasItems(index(2, pkIndexName(TABLE_NAME)), index(2, INDEX_NAME))); } @Test @@ -1916,10 +1917,6 @@ public class CatalogManagerSelfTest extends BaseCatalogManagerTest { } } - private static String createPkIndexName(String tableName) { - return tableName + "_PK"; - } - private @Nullable CatalogTableDescriptor table(int catalogVersion, String tableName) { return manager.schema(catalogVersion).table(tableName); } diff --git 
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java index 0d83af17ac..b3bb2cddaa 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/DropIndexCommandValidationTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.catalog.commands; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import java.util.List; @@ -109,7 +110,7 @@ public class DropIndexCommandValidationTest extends AbstractCommandValidationTes CatalogCommand command = DropIndexCommand.builder() .schemaName(SCHEMA_NAME) - .indexName(TABLE_NAME + "_PK") + .indexName(pkIndexName(TABLE_NAME)) .build(); assertThrowsWithCause( diff --git a/modules/index/build.gradle b/modules/index/build.gradle index 13a17abd1d..264ef39637 100644 --- a/modules/index/build.gradle +++ b/modules/index/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':ignite-network-api') implementation project(':ignite-raft-api') implementation project(':ignite-metastorage-api') + implementation project(':ignite-placement-driver-api') implementation libs.jetbrains.annotations testImplementation(testFixtures(project(':ignite-configuration'))) @@ -38,6 +39,8 @@ dependencies { testImplementation(testFixtures(project(':ignite-vault'))) testImplementation(testFixtures(project(':ignite-metastorage'))) testImplementation(testFixtures(project(':ignite-table'))) + testImplementation(testFixtures(project(':ignite-catalog'))) + testImplementation project(':ignite-placement-driver') testImplementation libs.mockito.core testImplementation libs.mockito.junit testImplementation 
libs.hamcrest.core diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java new file mode 100644 index 0000000000..55f751de2f --- /dev/null +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.index; + +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; + +import java.util.ArrayList; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.events.CatalogEvent; +import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; +import org.apache.ignite.internal.catalog.events.DropIndexEventParameters; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.table.distributed.index.IndexBuilder; +import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import 
org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; + +/** + * Component is responsible for starting and stopping the building of indexes on primary replicas. + * + * <p>Component handles the following events (indexes are started and stopped by {@link CatalogIndexDescriptor#tableId()} == + * {@link TablePartitionId#tableId()}): </p> + * <ul> + * <li>{@link CatalogEvent#INDEX_CREATE} - starts building indexes for the corresponding local primary replicas.</li> + * <li>{@link CatalogEvent#INDEX_DROP} - stops building indexes for the corresponding local primary replicas.</li> + * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - for a new local primary replica, starts the building of all corresponding + * indexes, for an expired primary replica, stops the building of all corresponding indexes.</li> + * </ul> + */ +// TODO: IGNITE-20544 Start building indexes on node recovery +public class IndexBuildController implements IgniteComponent { + private static final long AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC = 10; + + private final IndexBuilder indexBuilder; + + private final IndexManager indexManager; + + private final CatalogService catalogService; + + private final ClusterService clusterService; + + private final PlacementDriver placementDriver; + + private final HybridClock clock; + + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + + private final AtomicBoolean closeGuard = new AtomicBoolean(); + + private final Set<TablePartitionId> primaryReplicaIds = ConcurrentHashMap.newKeySet(); + + /** Constructor. 
*/ + public IndexBuildController( + IndexBuilder indexBuilder, + IndexManager indexManager, + CatalogService catalogService, + ClusterService clusterService, + PlacementDriver placementDriver, + HybridClock clock + ) { + this.indexBuilder = indexBuilder; + this.indexManager = indexManager; + this.catalogService = catalogService; + this.clusterService = clusterService; + this.placementDriver = placementDriver; + this.clock = clock; + + addListeners(); + } + + @Override + public void start() { + // No-op. + } + + @Override + public void stop() { + if (!closeGuard.compareAndSet(false, true)) { + return; + } + + busyLock.block(); + + indexBuilder.close(); + } + + private void addListeners() { + catalogService.listen(CatalogEvent.INDEX_CREATE, (parameters, exception) -> { + if (exception != null) { + return failedFuture(exception); + } + + return onIndexCreate(((CreateIndexEventParameters) parameters)).thenApply(unused -> false); + }); + + catalogService.listen(CatalogEvent.INDEX_DROP, (parameters, exception) -> { + if (exception != null) { + return failedFuture(exception); + } + + return onIndexDrop(((DropIndexEventParameters) parameters)).thenApply(unused -> false); + }); + + placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, (parameters, exception) -> { + if (exception != null) { + return failedFuture(exception); + } + + return onPrimaryReplicaElected(parameters).thenApply(unused -> false); + }); + } + + private CompletableFuture<?> onIndexCreate(CreateIndexEventParameters parameters) { + return inBusyLockAsync(busyLock, () -> { + var startBuildIndexFutures = new ArrayList<CompletableFuture<?>>(); + + for (TablePartitionId primaryReplicaId : primaryReplicaIds) { + if (primaryReplicaId.tableId() == parameters.indexDescriptor().tableId()) { + CompletableFuture<?> startBuildIndexFuture = getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId) + .thenCompose(mvTableStorage -> awaitPrimaryReplicaForNow(primaryReplicaId) + 
.thenAccept(replicaMeta -> tryScheduleBuildIndex( + primaryReplicaId, + parameters.indexDescriptor(), + mvTableStorage, + replicaMeta + )) + ); + + startBuildIndexFutures.add(startBuildIndexFuture); + } + } + + return allOf(startBuildIndexFutures.toArray(CompletableFuture[]::new)); + }); + } + + private CompletableFuture<?> onIndexDrop(DropIndexEventParameters parameters) { + return inBusyLockAsync(busyLock, () -> { + indexBuilder.stopBuildingIndexes(parameters.indexId()); + + return completedFuture(null); + }); + } + + private CompletableFuture<?> onPrimaryReplicaElected(PrimaryReplicaEventParameters parameters) { + return inBusyLockAsync(busyLock, () -> { + TablePartitionId primaryReplicaId = (TablePartitionId) parameters.groupId(); + + if (isLocalNode(parameters.leaseholder())) { + primaryReplicaIds.add(primaryReplicaId); + + // It is safe to get the latest version of the catalog because the PRIMARY_REPLICA_ELECTED event is handled on the + // metastore thread. + int catalogVersion = catalogService.latestCatalogVersion(); + + return getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId) + .thenCompose(mvTableStorage -> awaitPrimaryReplicaForNow(primaryReplicaId) + .thenAccept(replicaMeta -> tryScheduleBuildIndexesForNewPrimaryReplica( + catalogVersion, + primaryReplicaId, + mvTableStorage, + replicaMeta + )) + ); + } else { + stopBuildingIndexesIfPrimaryExpired(primaryReplicaId); + + return completedFuture(null); + } + }); + } + + private void tryScheduleBuildIndexesForNewPrimaryReplica( + int catalogVersion, + TablePartitionId primaryReplicaId, + MvTableStorage mvTableStorage, + ReplicaMeta replicaMeta + ) { + inBusyLock(busyLock, () -> { + if (isLeaseExpire(replicaMeta)) { + stopBuildingIndexesIfPrimaryExpired(primaryReplicaId); + + return; + } + + // TODO: IGNITE-20530 We only need to get write-only indexes + for (CatalogIndexDescriptor indexDescriptor : catalogService.indexes(catalogVersion)) { + if (primaryReplicaId.tableId() == 
indexDescriptor.tableId()) { + scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage); + } + } + }); + } + + private void tryScheduleBuildIndex( + TablePartitionId primaryReplicaId, + CatalogIndexDescriptor indexDescriptor, + MvTableStorage mvTableStorage, + ReplicaMeta replicaMeta + ) { + inBusyLock(busyLock, () -> { + if (isLeaseExpire(replicaMeta)) { + stopBuildingIndexesIfPrimaryExpired(primaryReplicaId); + + return; + } + + scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage); + }); + } + + /** + * Stops building indexes for a replica that has expired, if it has not been done before. + * + * <p>We need to stop building indexes at the event of a change of primary replica or after executing asynchronous code, we understand + * that the {@link ReplicaMeta#getExpirationTime() expiration time} has come.</p> + * + * @param replicaId Replica ID. + */ + private void stopBuildingIndexesIfPrimaryExpired(TablePartitionId replicaId) { + if (primaryReplicaIds.remove(replicaId)) { + // Primary replica is no longer current, we need to stop building indexes for it. 
+ indexBuilder.stopBuildingIndexes(replicaId.tableId(), replicaId.partitionId()); + } + } + + private CompletableFuture<MvTableStorage> getMvTableStorageFuture(long causalityToken, TablePartitionId replicaId) { + return indexManager.getMvTableStorage(causalityToken, replicaId.tableId()); + } + + private CompletableFuture<ReplicaMeta> awaitPrimaryReplicaForNow(TablePartitionId replicaId) { + return placementDriver + .awaitPrimaryReplica(replicaId, clock.now(), AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC, SECONDS) + .handle((replicaMeta, throwable) -> { + if (throwable != null) { + Throwable unwrapThrowable = ExceptionUtils.unwrapCause(throwable); + + if (unwrapThrowable instanceof PrimaryReplicaAwaitTimeoutException) { + return awaitPrimaryReplicaForNow(replicaId); + } else { + return CompletableFuture.<ReplicaMeta>failedFuture(unwrapThrowable); + } + } + + return completedFuture(replicaMeta); + }).thenCompose(Function.identity()); + } + + /** Shortcut to schedule index building. */ + private void scheduleBuildIndex(TablePartitionId replicaId, CatalogIndexDescriptor indexDescriptor, MvTableStorage mvTableStorage) { + int partitionId = replicaId.partitionId(); + + MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(partitionId); + + assert mvPartition != null : replicaId; + + int indexId = indexDescriptor.id(); + + IndexStorage indexStorage = mvTableStorage.getIndex(partitionId, indexId); + + assert indexStorage != null : "replicaId=" + replicaId + ", indexId=" + indexId; + + indexBuilder.scheduleBuildIndex(replicaId.tableId(), partitionId, indexId, indexStorage, mvPartition, localNode()); + } + + private boolean isLocalNode(String nodeConsistentId) { + return nodeConsistentId.equals(localNode().name()); + } + + private ClusterNode localNode() { + return clusterService.topologyService().localMember(); + } + + private boolean isLeaseExpire(ReplicaMeta replicaMeta) { + return !isLocalNode(replicaMeta.getLeaseholder()) || 
clock.now().after(replicaMeta.getExpirationTime()); + } +} diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index 885357fced..2c78d562fb 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -35,7 +35,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongFunction; -import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.apache.ignite.internal.catalog.events.CatalogEvent; @@ -80,8 +80,8 @@ public class IndexManager implements IgniteComponent { /** Table manager. */ private final TableManager tableManager; - /** Catalog manager. */ - private final CatalogManager catalogManager; + /** Catalog service. */ + private final CatalogService catalogService; /** Meta storage manager. */ private final MetaStorageManager metaStorageManager; @@ -103,18 +103,18 @@ public class IndexManager implements IgniteComponent { * * @param schemaManager Schema manager. * @param tableManager Table manager. - * @param catalogManager Catalog manager. + * @param catalogService Catalog manager. 
*/ public IndexManager( CatalogSchemaManager schemaManager, TableManager tableManager, - CatalogManager catalogManager, + CatalogService catalogService, MetaStorageManager metaStorageManager, Consumer<LongFunction<CompletableFuture<?>>> registry ) { this.schemaManager = schemaManager; this.tableManager = tableManager; - this.catalogManager = catalogManager; + this.catalogService = catalogService; this.metaStorageManager = metaStorageManager; startVv = new IncrementalVersionedValue<>(registry); @@ -127,7 +127,7 @@ public class IndexManager implements IgniteComponent { startIndexes(); - catalogManager.listen(INDEX_CREATE, (parameters, exception) -> { + catalogService.listen(INDEX_CREATE, (parameters, exception) -> { if (exception != null) { return failedFuture(exception); } @@ -135,7 +135,7 @@ public class IndexManager implements IgniteComponent { return onIndexCreate((CreateIndexEventParameters) parameters); }); - catalogManager.listen(INDEX_DROP, (parameters, exception) -> { + catalogService.listen(INDEX_DROP, (parameters, exception) -> { if (exception != null) { return failedFuture(exception); } @@ -209,7 +209,7 @@ public class IndexManager implements IgniteComponent { long causalityToken = parameters.causalityToken(); int catalogVersion = parameters.catalogVersion(); - CatalogTableDescriptor table = catalogManager.table(tableId, catalogVersion); + CatalogTableDescriptor table = catalogService.table(tableId, catalogVersion); assert table != null : "tableId=" + tableId + ", indexId=" + indexId; @@ -322,15 +322,15 @@ public class IndexManager implements IgniteComponent { assert recoveryFinishedFuture.isDone(); - int catalogVersion = catalogManager.latestCatalogVersion(); + int catalogVersion = catalogService.latestCatalogVersion(); long causalityToken = recoveryFinishedFuture.join(); List<CompletableFuture<?>> startIndexFutures = new ArrayList<>(); - for (CatalogIndexDescriptor index : catalogManager.indexes(catalogVersion)) { + for (CatalogIndexDescriptor index : 
catalogService.indexes(catalogVersion)) { int tableId = index.tableId(); - CatalogTableDescriptor table = catalogManager.table(tableId, catalogVersion); + CatalogTableDescriptor table = catalogService.table(tableId, catalogVersion); assert table != null : "tableId=" + tableId + ", indexId=" + index.id(); diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java new file mode 100644 index 0000000000..dca73259f9 --- /dev/null +++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.index; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME; +import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; +import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex; +import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict; +import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.sql.ColumnType.INT32; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.CatalogTestUtils; +import org.apache.ignite.internal.catalog.commands.ColumnParams; +import org.apache.ignite.internal.event.AbstractEventProducer; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; +import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; +import org.apache.ignite.internal.placementdriver.leases.Lease; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.table.TableTestUtils; +import org.apache.ignite.internal.table.distributed.index.IndexBuilder; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.network.TopologyService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** For {@link IndexBuildController} testing. 
*/ +public class IndexBuildControllerTest extends BaseIgniteAbstractTest { + private static final String NODE_NAME = "test_node"; + + private static final String TABLE_NAME = "test_table"; + + private static final String COLUMN_NAME = "test_column"; + + private static final String INDEX_NAME = "test_index"; + + private static final int PARTITION_ID = 10; + + private ClusterNode localNode; + + private IndexBuilder indexBuilder; + + private CatalogManager catalogManager; + + private IndexBuildController indexBuildController; + + private final TestPlacementDriver placementDriver = new TestPlacementDriver(); + + private final HybridClock clock = new HybridClockImpl(); + + @BeforeEach + void setUp() { + localNode = mock(ClusterNode.class, invocation -> NODE_NAME); + + indexBuilder = mock(IndexBuilder.class); + + IndexManager indexManager = mock(IndexManager.class, invocation -> { + MvTableStorage mvTableStorage = mock(MvTableStorage.class); + MvPartitionStorage mvPartitionStorage = mock(MvPartitionStorage.class); + IndexStorage indexStorage = mock(IndexStorage.class); + + when(mvTableStorage.getMvPartition(anyInt())).thenReturn(mvPartitionStorage); + when(mvTableStorage.getIndex(anyInt(), anyInt())).thenReturn(indexStorage); + + return completedFuture(mvTableStorage); + }); + + ClusterService clusterService = mock(ClusterService.class, invocation -> mock(TopologyService.class, invocation1 -> localNode)); + + catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME, clock); + catalogManager.start(); + + TableTestUtils.createTable( + catalogManager, + DEFAULT_SCHEMA_NAME, + DEFAULT_ZONE_NAME, + TABLE_NAME, + List.of(ColumnParams.builder().name(COLUMN_NAME).type(INT32).build()), + List.of(COLUMN_NAME) + ); + + indexBuildController = new IndexBuildController(indexBuilder, indexManager, catalogManager, clusterService, placementDriver, clock); + } + + @AfterEach + void tearDown() throws Exception { + IgniteUtils.stopAll(catalogManager, indexBuildController); + } + 
+ @Test + void testStartBuildIndexesOnIndexCreate() { + setPrimaryReplicaWitchExpireInOneSecond(PARTITION_ID, NODE_NAME, clock.now()); + + clearInvocations(indexBuilder); + + createIndex(INDEX_NAME); + + verify(indexBuilder).scheduleBuildIndex(eq(tableId()), eq(PARTITION_ID), eq(indexId(INDEX_NAME)), any(), any(), eq(localNode)); + } + + @Test + void testStartBuildIndexesOnPrimaryReplicaElected() { + createIndex(INDEX_NAME); + + setPrimaryReplicaWitchExpireInOneSecond(PARTITION_ID, NODE_NAME, clock.now()); + + verify(indexBuilder).scheduleBuildIndex(eq(tableId()), eq(PARTITION_ID), eq(indexId(INDEX_NAME)), any(), any(), eq(localNode)); + + verify(indexBuilder).scheduleBuildIndex( + eq(tableId()), + eq(PARTITION_ID), + eq(indexId(pkIndexName(TABLE_NAME))), + any(), + any(), + eq(localNode) + ); + } + + @Test + void testStopBuildIndexesOnIndexDrop() { + createIndex(INDEX_NAME); + + int indexId = indexId(INDEX_NAME); + + dropIndex(INDEX_NAME); + + verify(indexBuilder).stopBuildingIndexes(indexId); + } + + @Test + void testStopBuildIndexesOnChangePrimaryReplica() { + setPrimaryReplicaWitchExpireInOneSecond(PARTITION_ID, NODE_NAME, clock.now()); + setPrimaryReplicaWitchExpireInOneSecond(PARTITION_ID, NODE_NAME + "_other", clock.now()); + + verify(indexBuilder).stopBuildingIndexes(tableId(), PARTITION_ID); + } + + private void createIndex(String indexName) { + createHashIndex(catalogManager, DEFAULT_SCHEMA_NAME, TABLE_NAME, indexName, List.of(COLUMN_NAME), false); + } + + private void dropIndex(String indexName) { + TableTestUtils.dropIndex(catalogManager, DEFAULT_SCHEMA_NAME, indexName); + } + + private void setPrimaryReplicaWitchExpireInOneSecond(int partitionId, String leaseholder, HybridTimestamp startTime) { + CompletableFuture<ReplicaMeta> replicaMetaFuture = completedFuture(replicaMetaForOneSecond(leaseholder, startTime)); + + assertThat(placementDriver.setPrimaryReplicaMeta(0, replicaId(partitionId), replicaMetaFuture), willCompleteSuccessfully()); + } + + 
private int tableId() { + return getTableIdStrict(catalogManager, TABLE_NAME, clock.nowLong()); + } + + private int indexId(String indexName) { + return getIndexIdStrict(catalogManager, indexName, clock.nowLong()); + } + + private TablePartitionId replicaId(int partitionId) { + return new TablePartitionId(tableId(), partitionId); + } + + private ReplicaMeta replicaMetaForOneSecond(String leaseholder, HybridTimestamp startTime) { + return new Lease(leaseholder, startTime, startTime.addPhysicalTime(1_000), new TablePartitionId(tableId(), PARTITION_ID)); + } + + private static class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters> implements + PlacementDriver { + private final Map<ReplicationGroupId, CompletableFuture<ReplicaMeta>> primaryReplicaMetaFutureById = new ConcurrentHashMap<>(); + + @Override + public CompletableFuture<ReplicaMeta> awaitPrimaryReplica( + ReplicationGroupId groupId, + HybridTimestamp timestamp, + long timeout, + TimeUnit unit + ) { + return primaryReplicaMetaFutureById.get(groupId); + } + + @Override + public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId replicationGroupId, HybridTimestamp timestamp) { + return primaryReplicaMetaFutureById.get(replicationGroupId); + } + + @Override + public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) { + throw new UnsupportedOperationException(); + } + + CompletableFuture<Void> setPrimaryReplicaMeta( + long causalityToken, + TablePartitionId replicaId, + CompletableFuture<ReplicaMeta> replicaMetaFuture + ) { + primaryReplicaMetaFutureById.put(replicaId, replicaMetaFuture); + + return replicaMetaFuture.thenCompose(replicaMeta -> fireEvent( + PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, + new PrimaryReplicaEventParameters(causalityToken, replicaId, replicaMeta.getLeaseholder()) + )); + } + } +} diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java index 81292626c6..f396aa453a 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java @@ -37,8 +37,8 @@ public interface PlacementDriver extends EventProducer<PrimaryReplicaEvent, Prim * * @param groupId Replication group id. * @param timestamp Timestamp reference value. - * @param timeout – How long to wait before completing exceptionally with a TimeoutException, in units of unit. - * @param unit – A TimeUnit determining how to interpret the timeout parameter. + * @param timeout How long to wait before completing exceptionally with a TimeoutException, in units of unit. + * @param unit A TimeUnit determining how to interpret the timeout parameter. * @return Primary replica future. 
*/ CompletableFuture<ReplicaMeta> awaitPrimaryReplica( diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java similarity index 79% rename from modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java rename to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java index c0497f6b19..5de7483175 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java @@ -17,17 +17,18 @@ package org.apache.ignite.internal.placementdriver; +import static org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_ERR; + import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.lang.ErrorGroups; /** * The exception is thrown when a primary replica await process has failed. Please pay attention that there is a specific * {@link PrimaryReplicaAwaitTimeoutException} for the primary replica await timeout. */ public class PrimaryReplicaAwaitException extends IgniteInternalException { + private static final long serialVersionUID = 1029917546884926160L; /** * The constructor. @@ -37,11 +38,11 @@ public class PrimaryReplicaAwaitException extends IgniteInternalException { * @param cause Cause exception. 
*/ public PrimaryReplicaAwaitException(ReplicationGroupId replicationGroupId, HybridTimestamp referenceTimestamp, Throwable cause) { - super(ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_ERR, - IgniteStringFormatter.format( - "The primary replica await exception [replicationGroupId={}, referenceTimestamp={}]", - replicationGroupId, referenceTimestamp - ), - cause); + super( + PRIMARY_REPLICA_AWAIT_ERR, + "The primary replica await exception [replicationGroupId={}, referenceTimestamp={}]", + cause, + replicationGroupId, referenceTimestamp + ); } } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java similarity index 73% rename from modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java rename to modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java index cbe33d5702..35045f9b06 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java @@ -17,18 +17,18 @@ package org.apache.ignite.internal.placementdriver; +import static org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR; + import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.lang.IgniteStringFormatter; -import org.apache.ignite.internal.placementdriver.leases.Lease; import org.apache.ignite.internal.replicator.ReplicationGroupId; -import org.apache.ignite.lang.ErrorGroups; import org.jetbrains.annotations.Nullable; /** * The 
exception is thrown when a primary replica await process has times out. */ public class PrimaryReplicaAwaitTimeoutException extends IgniteInternalException { + private static final long serialVersionUID = -1450288033816499192L; /** * The constructor. @@ -40,14 +40,14 @@ public class PrimaryReplicaAwaitTimeoutException extends IgniteInternalException public PrimaryReplicaAwaitTimeoutException( ReplicationGroupId replicationGroupId, HybridTimestamp referenceTimestamp, - @Nullable Lease currentLease, + @Nullable ReplicaMeta currentLease, Throwable cause ) { - super(ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR, - IgniteStringFormatter.format( - "The primary replica await timed out [replicationGroupId={}, referenceTimestamp={}, currentLease={}]", - replicationGroupId, referenceTimestamp, currentLease - ), - cause); + super( + PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR, + "The primary replica await timed out [replicationGroupId={}, referenceTimestamp={}, currentLease={}]", + cause, + replicationGroupId, referenceTimestamp, currentLease + ); } } diff --git a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEvent.java b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEvent.java index f4be9ca502..345bbc906c 100644 --- a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEvent.java +++ b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/event/PrimaryReplicaEvent.java @@ -33,7 +33,7 @@ public enum PrimaryReplicaEvent implements Event { * <p>Notes:</p> * <ul> * <li>This event will fire strictly after the completion of the future from {@link PlacementDriver#awaitPrimaryReplica}.</li> - * <li>This event will fire when on node recovery and will indicate the primary replica at the time the node was stopped.</li> + * <li>This event will fire in the metastore thread.</li> 
* <li>If a lease prolongation occurs, this event will not fire.</li> * <li>When working from a primary replica, it is recommended to check whether it has become outdated using * {@link ReplicaMeta#getExpirationTime()}.</li> diff --git a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java index 1c4d728072..c5131fd3cb 100644 --- a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java +++ b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java @@ -69,6 +69,6 @@ public class TestPlacementDriver implements PlacementDriver { @Override public CompletableFuture<Void> previousPrimaryExpired(ReplicationGroupId grpId) { - return CompletableFuture.completedFuture(null); + return completedFuture(null); } } diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java index f896124d5a..66feec2163 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java @@ -156,7 +156,6 @@ public class ActiveActorTest extends IgniteAbstractTest { PlacementDriverManager placementDriverManager = new PlacementDriverManager( nodeName, - mock(Consumer.class), msm, GROUP_ID, clusterService, diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java index 90383760dd..cd37a2e0b9 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java @@ -38,7 +38,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; @@ -269,7 +268,6 @@ public class MultiActorPlacementDriverTest extends BasePlacementDriverTest { var placementDriverManager = new PlacementDriverManager( nodeName, - (LongFunction<CompletableFuture<?>> function) -> metaStorageManager.registerRevisionUpdateListener(function::apply), metaStorageManager, MetastorageGroupId.INSTANCE, clusterService, diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index 05658f7fd4..6e4ed69be9 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.LongFunction; import 
java.util.stream.Stream; import org.apache.ignite.internal.affinity.Assignment; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; @@ -199,7 +198,6 @@ public class PlacementDriverManagerTest extends BasePlacementDriverTest { placementDriverManager = new PlacementDriverManager( nodeName, - (LongFunction<CompletableFuture<?>> function) -> metaStorageManager.registerRevisionUpdateListener(function::apply), metaStorageManager, MetastorageGroupId.INSTANCE, clusterService, diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java index 835c951650..51526218e0 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java @@ -25,9 +25,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import java.util.function.LongFunction; import java.util.function.Supplier; -import org.apache.ignite.internal.causality.IncrementalVersionedValue; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.lang.ByteArray; @@ -87,9 +85,6 @@ public class PlacementDriverManager implements IgniteComponent { /** Lease updater. */ private final LeaseUpdater leaseUpdater; - /** Versioned value used only at manager startup for correct asynchronous start of internal components. */ - private final IncrementalVersionedValue<Void> startVv; - /** Meta Storage manager. 
*/ private final MetaStorageManager metastore; @@ -97,7 +92,6 @@ public class PlacementDriverManager implements IgniteComponent { * Constructor. * * @param nodeName Node name. - * @param registry Registry for versioned values. * @param metastore Meta Storage manager. * @param replicationGroupId Id of placement driver group. * @param clusterService Cluster service. @@ -109,7 +103,6 @@ public class PlacementDriverManager implements IgniteComponent { */ public PlacementDriverManager( String nodeName, - Consumer<LongFunction<CompletableFuture<?>>> registry, MetaStorageManager metastore, ReplicationGroupId replicationGroupId, ClusterService clusterService, @@ -138,8 +131,6 @@ public class PlacementDriverManager implements IgniteComponent { leaseTracker, clock ); - - this.startVv = new IncrementalVersionedValue<>(registry); } @Override @@ -242,11 +233,7 @@ public class PlacementDriverManager implements IgniteComponent { return leaseUpdater.active(); } - /** - * Returns placement driver service. - * - * @return Placement driver service. - */ + /** Returns placement driver service. */ public PlacementDriver placementDriver() { return leaseTracker; } @@ -258,16 +245,6 @@ public class PlacementDriverManager implements IgniteComponent { long recoveryRevision = recoveryFinishedFuture.join(); - CompletableFuture<Void> startLeaserTrackerFuture = leaseTracker.startTrackAsync(recoveryRevision); - - // Forces to wait until recovery is complete before the metastore watches is deployed to avoid races with other components. 
- startVv.update(recoveryRevision, (unused, throwable) -> startLeaserTrackerFuture) - .whenComplete((unused, throwable) -> { - if (throwable != null) { - LOG.error("Error starting the PlacementDriverManager internal components", throwable); - } else { - LOG.debug("Internal components of the PlacementDriverManager started successfully"); - } - }); + leaseTracker.startTrack(recoveryRevision); } } diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index df20b3d946..a9d5fe41b3 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -109,11 +109,11 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri * * @param recoveryRevision Revision from {@link MetaStorageManager#recoveryFinishedFuture()}. 
*/ - public CompletableFuture<Void> startTrackAsync(long recoveryRevision) { - return inBusyLock(busyLock, () -> { + public void startTrack(long recoveryRevision) { + inBusyLock(busyLock, () -> { msManager.registerPrefixWatch(PLACEMENTDRIVER_LEASES_KEY, updateListener); - return loadLeasesBusyAsync(recoveryRevision); + loadLeasesBusyAsync(recoveryRevision); }); } @@ -281,15 +281,11 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri return primaryReplicaWaiters.computeIfAbsent(groupId, key -> new PendingIndependentComparableValuesTracker<>(MIN_VALUE)); } - private CompletableFuture<Void> loadLeasesBusyAsync(long recoveryRevision) { + private void loadLeasesBusyAsync(long recoveryRevision) { Entry entry = msManager.getLocally(PLACEMENTDRIVER_LEASES_KEY, recoveryRevision); - CompletableFuture<Void> loadLeasesFuture; - if (entry.empty() || entry.tombstone()) { leases = new Leases(Map.of(), BYTE_EMPTY_ARRAY); - - loadLeasesFuture = completedFuture(null); } else { byte[] leasesBytes = entry.value(); @@ -297,8 +293,6 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri Map<ReplicationGroupId, Lease> leasesMap = new HashMap<>(); - List<CompletableFuture<?>> fireEventFutures = new ArrayList<>(); - leaseBatch.leases().forEach(lease -> { ReplicationGroupId grpId = lease.replicationGroupId(); @@ -306,22 +300,13 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri if (lease.isAccepted()) { getOrCreatePrimaryReplicaWaiter(grpId).update(lease.getExpirationTime(), lease); - - // needFireEventReplicaBecomePrimary is not needed because we need to recover the last leases. 
- fireEventFutures.add(fireEventReplicaBecomePrimary(recoveryRevision, lease)); } - - firePrimaryReplicaExpiredEventIfNeed(recoveryRevision, lease); }); leases = new Leases(unmodifiableMap(leasesMap), leasesBytes); - - loadLeasesFuture = allOf(fireEventFutures.toArray(CompletableFuture[]::new)); } LOG.info("Leases cache recovered [leases={}]", leases); - - return loadLeasesFuture; } /** @@ -365,6 +350,6 @@ public class LeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, Pri private static boolean needFireEventReplicaBecomePrimary(@Nullable Lease previousLease, Lease newLease) { assert newLease.isAccepted() : newLease; - return previousLease == null || !previousLease.getStartTime().equals(newLease.getStartTime()); + return previousLease == null || !previousLease.isAccepted() || !previousLease.getStartTime().equals(newLease.getStartTime()); } } diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java index 49cf533c2a..8eb2516c41 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java @@ -133,7 +133,7 @@ public class PlacementDriverTest extends BaseIgniteAbstractTest { assertThat(recoveryFinishedFuture, willCompleteSuccessfully()); - placementDriver.startTrackAsync(recoveryFinishedFuture.join()); + placementDriver.startTrack(recoveryFinishedFuture.join()); assertThat("Watches were not deployed", metastore.deployWatches(), willCompleteSuccessfully()); } @@ -408,27 +408,6 @@ public class PlacementDriverTest extends BaseIgniteAbstractTest { checkReplicaBecomePrimaryEventParameters(LEASE_FROM_15_000_TO_30_000, parameters); } - @Test - void testListenReplicaBecomePrimaryEventOnStartPlacementDriver() { - long 
newRecoveryRevision = publishLease(LEASE_FROM_1_TO_5_000); - - placementDriver.stopTrack(); - - placementDriver = createPlacementDriver(); - - CompletableFuture<PrimaryReplicaEventParameters> eventParametersFuture = listenAnyReplicaBecomePrimaryEvent(); - - placementDriver.startTrackAsync(newRecoveryRevision); - - assertThat(eventParametersFuture, willCompleteSuccessfully()); - - PrimaryReplicaEventParameters parameters = eventParametersFuture.join(); - - assertThat(parameters.causalityToken(), equalTo(newRecoveryRevision)); - - checkReplicaBecomePrimaryEventParameters(LEASE_FROM_1_TO_5_000, parameters); - } - @Test void testListenReplicaBecomePrimaryEventCaseOnlyExpirationTimeShifted() { publishLease(LEASE_FROM_1_TO_5_000); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java index a7d0280cc9..2fdde8028e 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java @@ -163,8 +163,6 @@ public class Replica { if (!leaderFuture.isDone()) { leaderFuture.complete(leaderRef); } - - listener.onBecomePrimary(clusterNode); } private CompletableFuture<ClusterNode> leaderFuture() { diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java index b15f8426f8..88a1937e97 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java @@ -20,11 +20,8 @@ package org.apache.ignite.internal.replicator.listener; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.replicator.ReplicaResult; import 
org.apache.ignite.internal.replicator.message.ReplicaRequest; -import org.apache.ignite.network.ClusterNode; -/** - * Replica listener. - */ +/** Replica listener. */ @FunctionalInterface public interface ReplicaListener { /** @@ -36,18 +33,7 @@ public interface ReplicaListener { */ CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String senderId); - /** - * Callback on becoming the primary replica. - * - * @param clusterNode Primary replica node. - */ - default void onBecomePrimary(ClusterNode clusterNode) { - // No-op. - } - - /** - * Callback on replica shutdown. - */ + /** Callback on replica shutdown. */ default void onShutdown() { // No-op. } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java index 1a9144f1f1..eb59ef50de 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java @@ -18,24 +18,22 @@ package org.apache.ignite.internal.runner.app; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static 
org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; @@ -43,7 +41,6 @@ import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.test.WatchListenerInhibitor; import org.apache.ignite.internal.testframework.IgniteAbstractTest; -import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; @@ -51,6 +48,7 @@ import org.apache.ignite.lang.IgniteException; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; import org.apache.ignite.sql.SqlRow; +import org.apache.ignite.table.Table; import org.apache.ignite.table.Tuple; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -62,14 +60,9 @@ import org.junit.jupiter.api.extension.ExtendWith; */ @ExtendWith(WorkDirectoryExtension.class) public class ItDataSchemaSyncTest extends IgniteAbstractTest { - /** - * Table name. - */ public static final String TABLE_NAME = "tbl1"; - /** - * Nodes bootstrap configuration. - */ + /** Nodes bootstrap configuration. 
*/ private static final Map<String, String> nodesBootstrapCfg = Map.of( "node0", "{\n" + " \"network\": {\n" @@ -101,9 +94,6 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { + "}" ); - /** - * Cluster nodes. - */ private final List<Ignite> clusterNodes = new ArrayList<>(); /** @@ -155,7 +145,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { createTable(ignite0, TABLE_NAME); - TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME); + TableImpl table = tableImpl(ignite0, TABLE_NAME); assertEquals(1, table.schemaView().schema().version()); @@ -165,7 +155,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { alterTable(ignite0, TABLE_NAME); - table = (TableImpl) ignite2.tables().table(TABLE_NAME); + table = tableImpl(ignite2, TABLE_NAME); TableImpl table0 = table; assertTrue(waitForCondition(() -> table0.schemaView().schema().version() == 2, 5_000)); @@ -186,9 +176,9 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { assertThat(ignite1Fut, willCompleteSuccessfully()); - ignite1 = (IgniteImpl) ignite1Fut.get(); + ignite1 = (IgniteImpl) ignite1Fut.join(); - table = (TableImpl) ignite1.tables().table(TABLE_NAME); + table = tableImpl(ignite1, TABLE_NAME); TableImpl table1 = table; assertTrue(waitForCondition(() -> table1.schemaView().schema().version() == 2, 5_000)); @@ -198,7 +188,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { * Check that sql query will wait until appropriate schema is not propagated into all nodes. 
*/ @Test - public void queryWaitAppropriateSchema() throws Exception { + public void queryWaitAppropriateSchema() { Ignite ignite0 = clusterNodes.get(0); IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1); @@ -210,33 +200,24 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { sql(ignite0, "CREATE INDEX idx1 ON " + TABLE_NAME + "(valint)"); - CompletableFuture<Void> fut = CompletableFuture.runAsync(() -> sql(ignite0, "SELECT * FROM " - + TABLE_NAME + " WHERE valint > 0")); - - try { - // wait a timeout to observe that query can`t be executed. - fut.get(1, TimeUnit.SECONDS); - - fail(); - } catch (TimeoutException e) { - // Expected, no op. - } + assertThat( + runAsync(() -> sql(ignite0, "SELECT * FROM " + TABLE_NAME + " WHERE valint > 0")), + willTimeoutIn(1, TimeUnit.SECONDS) + ); listenerInhibitor.stopInhibit(); // only check that request is executed without timeout. - ResultSet<SqlRow> rs = sql(ignite0, "SELECT * FROM " + TABLE_NAME + " WHERE valint > 0"); - - assertNotNull(rs); - - rs.close(); + try (ResultSet<SqlRow> rs = sql(ignite0, "SELECT * FROM " + TABLE_NAME + " WHERE valint > 0")) { + assertNotNull(rs); + } } /** * Test correctness of schemes recovery after node restart. 
*/ @Test - public void checkSchemasCorrectlyRestore() throws Exception { + public void checkSchemasCorrectlyRestore() { Ignite ignite1 = clusterNodes.get(1); sql(ignite1, "CREATE TABLE " + TABLE_NAME + "(key BIGINT PRIMARY KEY, valint1 INT, valint2 INT)"); @@ -260,7 +241,9 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { .map(e -> TestIgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey()))) .findFirst().get(); - ignite1 = ignite1Fut.get(); + assertThat(ignite1Fut, willCompleteSuccessfully()); + + ignite1 = ignite1Fut.join(); try (Session ses = ignite1.sql().createSession()) { ResultSet<SqlRow> res = ses.execute(null, "SELECT valint2 FROM tbl1"); @@ -277,6 +260,8 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint5 INT"); + res.close(); + res = ses.execute(null, "SELECT sum(valint4) FROM tbl1"); assertEquals(10L * (10 + 19) / 2, res.next().iterator().next()); @@ -296,7 +281,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { createTable(ignite0, TABLE_NAME); - TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME); + TableImpl table = tableImpl(ignite0, TABLE_NAME); assertEquals(1, table.schemaView().schema().version()); @@ -321,12 +306,12 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { continue; } - TableImpl tableOnNode = (TableImpl) node.tables().table(TABLE_NAME); + TableImpl tableOnNode = tableImpl(node, TABLE_NAME); - waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000); + assertTrue(waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000)); } - CompletableFuture<?> insertFut = IgniteTestUtils.runAsync(() -> { + CompletableFuture<?> insertFut = runAsync(() -> { for (int i = 10; i < 20; i++) { table.recordView().insert( null, @@ -340,8 +325,10 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { } ); - IgniteException ex = 
assertThrows(IgniteException.class, () -> await(insertFut)); - assertThat(ex.getMessage(), containsString("Replication is timed out")); + assertThat( + insertFut, + willThrow(IgniteException.class, 30, TimeUnit.SECONDS, "Replication is timed out") + ); } /** @@ -365,4 +352,12 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest { } return rs; } + + private static TableImpl tableImpl(Ignite ignite, String tableName) { + CompletableFuture<Table> tableFuture = ignite.tables().tableAsync(tableName); + + assertThat(tableFuture, willCompleteSuccessfully()); + + return (TableImpl) tableFuture.join(); + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java index 60aa23c679..2425e508af 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItBuildIndexTest.java @@ -96,7 +96,7 @@ public class ItBuildIndexTest extends ClusterPerClassIntegrationTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-20330") + @Disabled("https://issues.apache.org/jira/browse/IGNITE-20525") void testChangePrimaryReplicaOnMiddleBuildIndex() throws Exception { prepareBuildIndexToChangePrimaryReplica(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index e9210418d7..9b2ef9eaf1 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -84,6 +84,7 @@ import org.apache.ignite.internal.deployunit.metastore.DeploymentUnitStoreImpl; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import 
org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.index.IndexBuildController; import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; @@ -131,6 +132,7 @@ import org.apache.ignite.internal.storage.DataStorageModules; import org.apache.ignite.internal.systemview.SystemViewManagerImpl; import org.apache.ignite.internal.table.distributed.TableManager; import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest; import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries; @@ -304,6 +306,9 @@ public class IgniteImpl implements Ignite { /** System views manager. */ private final SystemViewManagerImpl systemViewManager; + /** Index build controller. */ + private final IndexBuildController indexBuildController; + /** * The Constructor. 
* @@ -462,7 +467,6 @@ public class IgniteImpl implements Ignite { placementDriverMgr = new PlacementDriverManager( name, - registry, metaStorageMgr, MetastorageGroupId.INSTANCE, clusterSvc, @@ -569,6 +573,17 @@ public class IgniteImpl implements Ignite { indexManager = new IndexManager(schemaManager, distributedTblMgr, catalogManager, metaStorageMgr, registry); + IndexBuilder indexBuilder = new IndexBuilder(name, Runtime.getRuntime().availableProcessors(), replicaSvc); + + indexBuildController = new IndexBuildController( + indexBuilder, + indexManager, + catalogManager, + clusterSvc, + placementDriverMgr.placementDriver(), + clock + ); + qryEngine = new SqlQueryProcessor( registry, clusterSvc, @@ -766,10 +781,10 @@ public class IgniteImpl implements Ignite { // Start all other components after the join request has completed and the node has been validated. try { lifecycleManager.startComponents( + catalogManager, clusterCfgMgr, placementDriverMgr, metricManager, - catalogManager, distributionZoneManager, computeComponent, replicaMgr, @@ -780,6 +795,7 @@ public class IgniteImpl implements Ignite { outgoingSnapshotsManager, distributedTblMgr, indexManager, + indexBuildController, qryEngine, clientHandlerModule, deploymentManager @@ -867,11 +883,6 @@ public class IgniteImpl implements Ignite { return qryEngine; } - @TestOnly - public IndexManager indexManager() { - return indexManager; - } - @TestOnly public MetaStorageManager metaStorageManager() { return metaStorageMgr; diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java index 03083b3ce6..8a1c6edc7b 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import 
org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.lang.ErrorGroups; +import org.apache.ignite.lang.ErrorGroups.Common; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -103,13 +104,16 @@ public class IgnitionImpl implements Ignition { ); } - /** {@inheritDoc} */ @Override public void stop(String nodeName) { readyForInitNodes.remove(nodeName); nodes.computeIfPresent(nodeName, (name, node) -> { - node.stop(); + try { + node.stop(); + } catch (Exception e) { + throw new IgniteException(Common.NODE_STOPPING_ERR, e); + } return null; }); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index 4c36f81d61..08c7bcee7a 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -40,11 +40,9 @@ import org.apache.ignite.internal.replicator.ReplicaResult; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.storage.MvPartitionStorage; -import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.distributed.IndexLocker; import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; @@ -135,8 +133,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut StorageUpdateHandler storageUpdateHandler, Schemas schemas, ClusterNode localNode, - MvTableStorage mvTableStorage, - IndexBuilder indexBuilder, SchemaSyncService schemaSyncService, CatalogService catalogService, PlacementDriver placementDriver @@ -159,8 +155,6 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage extends ItTxDistribut storageUpdateHandler, schemas, localNode, - mvTableStorage, - indexBuilder, schemaSyncService, catalogService, placementDriver diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 4a441e8c72..30cb2fcbc0 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -137,7 +137,6 @@ import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.gc.MvGc; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.message.HasDataRequest; import org.apache.ignite.internal.table.distributed.message.HasDataResponse; @@ -337,8 +336,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private final LowWatermark lowWatermark; - private final IndexBuilder indexBuilder; - private final Marshaller raftCommandsMarshaller; private final HybridTimestampTracker 
observableTimestampTracker; @@ -477,8 +474,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc); - indexBuilder = new IndexBuilder(nodeName, cpus, replicaSvc); - raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); startVv = new IncrementalVersionedValue<>(registry); @@ -951,8 +946,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { partitionUpdateHandlers.storageUpdateHandler, new NonHistoricSchemas(schemaManager), localNode(), - table.internalTable().storage(), - indexBuilder, schemaSyncService, catalogService, placementDriver @@ -1062,7 +1055,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { cleanUpTablesResources(tablesToStop); try { - IgniteUtils.closeAllManually(lowWatermark, mvGc, indexBuilder); + IgniteUtils.closeAllManually(lowWatermark, mvGc); } catch (Throwable t) { LOG.error("Failed to close internal components", t); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java index 533fee7115..b50f51e991 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexBuilder.java @@ -81,7 +81,7 @@ public class IndexBuilder implements ManuallyCloseable { } /** - * Starts building the index if it is not already built or is not yet in progress. + * Schedules building the index if it is not already built or is not yet in progress. 
* * <p>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand} on the * replica, batches are sent sequentially.</p> @@ -96,7 +96,7 @@ public class IndexBuilder implements ManuallyCloseable { * @param node Node to which requests to build the index will be sent. */ // TODO: IGNITE-19498 Perhaps we need to start building the index only once - public void startBuildIndex( + public void scheduleBuildIndex( int tableId, int partitionId, int indexId, diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java index 83e895f592..957bffe72f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BuildIndexReplicaRequest.java @@ -19,11 +19,16 @@ package org.apache.ignite.internal.table.distributed.replication.request; import java.util.List; import java.util.UUID; +import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.network.annotations.Transferable; -/** Replica request to build a table index. */ +/** + * Replica request to build a table index. + * + * <p>It is possible to receive a {@link PrimaryReplicaMissException} in response to message processing if the leaseholder changes.</p> + */ @Transferable(TableMessageGroup.BUILD_INDEX_REPLICA_REQUEST) public interface BuildIndexReplicaRequest extends ReplicaRequest { /** Returns index ID. 
*/ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index e01f6a98d5..ee6b2183a8 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -24,8 +24,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE; -import static org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DROP; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.tx.TxState.ABANDONED; @@ -61,7 +59,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -69,11 +66,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; import org.apache.ignite.internal.catalog.CatalogService; -import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; -import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; -import 
org.apache.ignite.internal.catalog.events.DropIndexEventParameters; -import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; @@ -104,13 +97,11 @@ import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.PartitionTimestampCursor; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; -import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.storage.index.BinaryTupleComparator; import org.apache.ignite.internal.storage.index.IndexRow; import org.apache.ignite.internal.storage.index.IndexRowImpl; import org.apache.ignite.internal.storage.index.IndexStorage; import org.apache.ignite.internal.storage.index.SortedIndexStorage; -import org.apache.ignite.internal.storage.index.StorageIndexDescriptor; import org.apache.ignite.internal.table.distributed.IndexLocker; import org.apache.ignite.internal.table.distributed.SortedIndexLocker; import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; @@ -123,7 +114,6 @@ import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; @@ -240,31 +230,16 @@ public class PartitionReplicaListener implements ReplicaListener { /** Instance of 
the local node. */ private final ClusterNode localNode; - /** Table storage. */ - private final MvTableStorage mvTableStorage; - - /** Index builder. */ - private final IndexBuilder indexBuilder; - private final SchemaSyncService schemaSyncService; private final CatalogService catalogService; - /** Listener for creating an index in catalog, {@code null} if the replica is not the leader. */ - private final AtomicReference<EventListener<CreateIndexEventParameters>> createIndexListener = new AtomicReference<>(); - - /** Listener for dropping an index in catalog, {@code null} if the replica is not the leader. */ - private final AtomicReference<EventListener<DropIndexEventParameters>> dropIndexListener = new AtomicReference<>(); - /** Busy lock to stop synchronously. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); /** Prevents double stopping. */ private final AtomicBoolean stopGuard = new AtomicBoolean(); - /** Flag indicates whether the current replica is the primary. */ - private volatile boolean primary; - /** Placement driver. */ private final PlacementDriver placementDriver; @@ -286,8 +261,6 @@ public class PartitionReplicaListener implements ReplicaListener { * @param transactionStateResolver Transaction state resolver. * @param storageUpdateHandler Handler that processes updates writing them to storage. * @param localNode Instance of the local node. - * @param mvTableStorage Table storage. - * @param indexBuilder Index builder. * @param catalogService Catalog service. * @param placementDriver Placement driver. 
*/ @@ -309,8 +282,6 @@ public class PartitionReplicaListener implements ReplicaListener { StorageUpdateHandler storageUpdateHandler, Schemas schemas, ClusterNode localNode, - MvTableStorage mvTableStorage, - IndexBuilder indexBuilder, SchemaSyncService schemaSyncService, CatalogService catalogService, PlacementDriver placementDriver @@ -329,8 +300,6 @@ public class PartitionReplicaListener implements ReplicaListener { this.transactionStateResolver = transactionStateResolver; this.storageUpdateHandler = storageUpdateHandler; this.localNode = localNode; - this.mvTableStorage = mvTableStorage; - this.indexBuilder = indexBuilder; this.schemaSyncService = schemaSyncService; this.catalogService = catalogService; this.placementDriver = placementDriver; @@ -352,7 +321,7 @@ public class PartitionReplicaListener implements ReplicaListener { for (UUID txId : txCleanupReadyFutures.keySet()) { txCleanupReadyFutures.compute(txId, (id, txOps) -> { - if (txOps == null || TxState.isFinalState(txOps.state)) { + if (txOps == null || isFinalState(txOps.state)) { return null; } @@ -2638,9 +2607,7 @@ public class PartitionReplicaListener implements ReplicaListener { return placementDriver.getPrimaryReplica(replicationGroupId, now) .thenApply(primaryReplica -> (primaryReplica != null && isLocalPeer(primaryReplica.getLeaseholder()))); } else if (request instanceof BuildIndexReplicaRequest) { - // TODO: IGNITE-20330 Possibly replaced by placementDriver#getPrimaryReplica and should also be added to the documentation - // about PrimaryReplicaMissException - return placementDriver.awaitPrimaryReplica(replicationGroupId, now, 30, SECONDS) + return placementDriver.awaitPrimaryReplica(replicationGroupId, now, AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS) .thenCompose(replicaMeta -> { if (isLocalPeer(replicaMeta.getLeaseholder())) { return completedFuture(null); @@ -3036,31 +3003,6 @@ public class PartitionReplicaListener implements ReplicaListener { TxState state = PENDING; } - @Override - public void 
onBecomePrimary(ClusterNode clusterNode) { - inBusyLockNoException(() -> { - if (clusterNode.equals(localNode)) { - if (primary) { - // Current replica has already become the primary, we do not need to do anything. - return; - } - - primary = true; - - startBuildIndexes(); - } else { - if (!primary) { - // Current replica was not the primary replica, we do not need to do anything. - return; - } - - primary = false; - - stopBuildIndexes(); - } - }); - } - @Override public void onShutdown() { if (!stopGuard.compareAndSet(false, true)) { @@ -3068,50 +3010,6 @@ public class PartitionReplicaListener implements ReplicaListener { } busyLock.block(); - - stopBuildIndexes(); - } - - private void registerIndexesListener() { - // TODO: IGNITE-19498 Might need to listen to something else - EventListener<CreateIndexEventParameters> createIndexListener = (parameters, exception) -> inBusyLockAsync(busyLock, () -> { - assert exception == null : parameters; - - int tableId = parameters.indexDescriptor().tableId(); - - if (tableId() == tableId) { - CatalogTableDescriptor tableDescriptor = getTableDescriptor(tableId, parameters.catalogVersion()); - - startBuildIndex(StorageIndexDescriptor.create(tableDescriptor, parameters.indexDescriptor())); - } - - return completedFuture(false); - }); - - EventListener<DropIndexEventParameters> dropIndexListener = (parameters, exception) -> inBusyLockAsync(busyLock, () -> { - assert exception == null : parameters; - - if (tableId() == parameters.tableId()) { - indexBuilder.stopBuildIndex(tableId(), partId(), parameters.indexId()); - } - - return completedFuture(false); - }); - - boolean casResult = this.createIndexListener.compareAndSet(null, createIndexListener) - && this.dropIndexListener.compareAndSet(null, dropIndexListener); - - assert casResult : replicationGroupId; - - catalogService.listen(INDEX_CREATE, createIndexListener); - catalogService.listen(INDEX_DROP, dropIndexListener); - } - - private void startBuildIndex(StorageIndexDescriptor 
indexDescriptor) { - // TODO: IGNITE-19112 We only need to create the index storage once - IndexStorage indexStorage = mvTableStorage.getOrCreateIndex(partId(), indexDescriptor); - - indexBuilder.startBuildIndex(tableId(), partId(), indexDescriptor.id(), indexStorage, mvDataStorage, localNode); } private int partId() { @@ -3126,53 +3024,6 @@ public class PartitionReplicaListener implements ReplicaListener { return localNode.name().equals(nodeName); } - private void inBusyLockNoException(Runnable runnable) { - if (!busyLock.enterBusy()) { - // This method does not throw a NodeStoppingException to avoid killing JRaft. - // It's expected that the code will be rewritten together with index creation redesign. - // TODO: https://issues.apache.org/jira/browse/IGNITE-20330 - return; - } - - try { - runnable.run(); - } finally { - busyLock.leaveBusy(); - } - } - - private void startBuildIndexes() { - registerIndexesListener(); - - // Let's try to build an index for the previously created indexes for the table. 
- int catalogVersion = catalogService.latestCatalogVersion(); - - for (CatalogIndexDescriptor indexDescriptor : catalogService.indexes(catalogVersion)) { - if (indexDescriptor.tableId() != tableId()) { - continue; - } - - CatalogTableDescriptor tableDescriptor = getTableDescriptor(indexDescriptor.tableId(), catalogVersion); - - startBuildIndex(StorageIndexDescriptor.create(tableDescriptor, indexDescriptor)); - } - } - - private void stopBuildIndexes() { - EventListener<CreateIndexEventParameters> createIndexListener = this.createIndexListener.getAndSet(null); - EventListener<DropIndexEventParameters> dropIndexListener = this.dropIndexListener.getAndSet(null); - - if (createIndexListener != null) { - catalogService.removeListener(INDEX_CREATE, createIndexListener); - } - - if (dropIndexListener != null) { - catalogService.removeListener(INDEX_DROP, dropIndexListener); - } - - indexBuilder.stopBuildingIndexes(tableId(), partId()); - } - /** * Marks the transaction as finished in local tx state map. 
* diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index e652b2cab2..c972553a1d 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.table.distributed.replication; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; import static org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener.tablePartitionId; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static org.hamcrest.MatcherAssert.assertThat; @@ -66,7 +65,6 @@ import org.apache.ignite.internal.schema.marshaller.KvMarshaller; import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; -import org.apache.ignite.internal.storage.impl.TestMvTableStorage; import org.apache.ignite.internal.storage.index.SortedIndexStorage; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor; @@ -82,7 +80,6 @@ import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.TableMessagesFactory; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; 
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; @@ -239,8 +236,6 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest ), new DummySchemas(schemaManager), localNode, - new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT), - mock(IndexBuilder.class), new AlwaysSyncedSchemaSyncService(), catalogService, new TestPlacementDriver(localNode.name()) diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 6ddd3274af..9985d15830 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -21,7 +21,6 @@ import static java.util.Collections.singletonList; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN; -import static org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow; import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast; @@ -112,7 +111,6 @@ import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshal import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; -import org.apache.ignite.internal.storage.impl.TestMvTableStorage; import org.apache.ignite.internal.storage.index.IndexRowImpl; import org.apache.ignite.internal.storage.index.SortedIndexStorage; import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; @@ -135,7 +133,6 @@ import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMess import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand; import org.apache.ignite.internal.table.distributed.command.UpdateCommand; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage; @@ -487,8 +484,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { ), schemas, localNode, - new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT), - mock(IndexBuilder.class), schemaSyncService, catalogService, new TestPlacementDriver(localNode.name()) diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index eaab89c75b..7ec68e8fab 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -100,7 +100,6 @@ import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import 
org.apache.ignite.internal.table.distributed.TableMessageGroup; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; @@ -518,8 +517,6 @@ public class ItTxTestCluster { storageUpdateHandler, new DummySchemas(schemaManager), consistentIdToNode.apply(assignment), - mvTableStorage, - mock(IndexBuilder.class), new AlwaysSyncedSchemaSyncService(), catalogService, placementDriver @@ -611,11 +608,10 @@ public class ItTxTestCluster { StorageUpdateHandler storageUpdateHandler, Schemas schemas, ClusterNode localNode, - MvTableStorage mvTableStorage, - IndexBuilder indexBuilder, SchemaSyncService schemaSyncService, CatalogService catalogService, - PlacementDriver placementDriver) { + PlacementDriver placementDriver + ) { return new PartitionReplicaListener( mvDataStorage, raftClient, @@ -634,8 +630,6 @@ public class ItTxTestCluster { storageUpdateHandler, schemas, localNode, - mvTableStorage, - indexBuilder, schemaSyncService, catalogService, placementDriver diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java index c011a766f7..dcc78df7cd 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TableTestUtils.java @@ -28,7 +28,9 @@ import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.catalog.commands.ColumnParams; import 
org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand; import org.apache.ignite.internal.catalog.commands.CreateTableCommand; +import org.apache.ignite.internal.catalog.commands.DropIndexCommand; import org.apache.ignite.internal.catalog.commands.DropTableCommand; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; import org.jetbrains.annotations.Nullable; @@ -79,6 +81,20 @@ public class TableTestUtils { ); } + /** + * Drops index in the catalog. + * + * @param catalogManager Catalog manager. + * @param schemaName Schema name. + * @param indexName Index name. + */ + public static void dropIndex(CatalogManager catalogManager, String schemaName, String indexName) { + assertThat( + catalogManager.execute(DropIndexCommand.builder().schemaName(schemaName).indexName(indexName).build()), + willCompleteSuccessfully() + ); + } + /** * Creates hash index in the catalog. * @@ -159,4 +175,33 @@ public class TableTestUtils { public static int getTableIdStrict(CatalogService catalogService, String tableName, long timestamp) { return getTableStrict(catalogService, tableName, timestamp).id(); } + + /** + * Returns index ID from catalog, {@code null} if index is absent. + * + * @param catalogService Catalog service. + * @param indexName Index name. + * @param timestamp Timestamp. + */ + public static @Nullable Integer getIndexId(CatalogService catalogService, String indexName, long timestamp) { + CatalogIndexDescriptor index = catalogService.index(indexName, timestamp); + + return index == null ? null : index.id(); + } + + /** + * Returns index ID from catalog. + * + * @param catalogService Catalog service. + * @param indexName Index name. + * @param timestamp Timestamp. + * @throws AssertionError If index is absent. 
+ */ + public static int getIndexIdStrict(CatalogService catalogService, String indexName, long timestamp) { + Integer indexId = getIndexId(catalogService, indexName, timestamp); + + assertNotNull(indexId, "indexName=" + indexName + ", timestamp=" + timestamp); + + return indexId; + } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 1346769f4d..8b6abd99cf 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -79,7 +79,6 @@ import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier; import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; -import org.apache.ignite.internal.table.distributed.index.IndexBuilder; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; @@ -383,8 +382,6 @@ public class DummyInternalTableImpl extends InternalTableImpl { storageUpdateHandler, new DummySchemas(schemaManager), LOCAL_NODE, - mock(MvTableStorage.class), - mock(IndexBuilder.class), new AlwaysSyncedSchemaSyncService(), catalogService, new TestPlacementDriver(LOCAL_NODE.name())