This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 60b76cf961 IGNITE-22682 Use IndexMeta in FullStateTransferIndexChooser
(#4067)
60b76cf961 is described below
commit 60b76cf96101cee1ac3d686c3f93cfd0e24b5914
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Jul 11 11:49:28 2024 +0300
IGNITE-22682 Use IndexMeta in FullStateTransferIndexChooser (#4067)
---
.../internal/catalog/events/CatalogEvent.java | 6 +-
.../apache/ignite/internal/util/IgniteUtils.java | 8 +-
.../internal/table/distributed/TableManager.java | 2 +-
.../table/distributed/index/IndexMetaStorage.java | 9 +-
.../snapshot/FullStateTransferIndexChooser.java | 191 +++++----------------
.../distributed/TableManagerRecoveryTest.java | 48 +++---
.../table/distributed/TableManagerTest.java | 35 ++--
.../index/BaseIndexMetaStorageTest.java | 2 +-
.../distributed/index/IndexMetaStorageTest.java | 21 +++
.../FullStateTransferIndexChooserTest.java | 37 +++-
10 files changed, 158 insertions(+), 201 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
index 8157cd4a51..f4c41bd5ad 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
@@ -70,9 +70,9 @@ public enum CatalogEvent implements Event {
INDEX_STOPPING,
/**
- * Fired when an index is removed from the Catalog. This happens when an
index that never was {@link CatalogIndexStatus#AVAILABLE}
- * gets dropped, or when an index that is {@link
CatalogIndexStatus#STOPPING} is finished with and we don't need to keep it in
- * the Catalog anymore, or when an index gets dropped because its table
gets dropped.
+ * Fired when an index is removed from the Catalog. This happens when an
index that never was {@link CatalogIndexStatus#AVAILABLE}
+ * gets dropped, or when an index that is {@link
CatalogIndexStatus#STOPPING} is finished with and we don't need to keep it in
+ * the Catalog anymore, or when an index gets dropped because its table
gets dropped.
*
* @see RemoveIndexEventParameters
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b92a4f2c41..2f9eed2924 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1187,10 +1187,10 @@ public class IgniteUtils {
* Asynchronously starts all ignite components.
*
* @param componentContext Ignite component context.
- * @param components Array of ignite components to start.
+ * @param components Array of ignite components to start, may contain
{@code null} elements.
* @return CompletableFuture that will be completed when all components
are started.
*/
- public static CompletableFuture<Void> startAsync(ComponentContext
componentContext, IgniteComponent... components) {
+ public static CompletableFuture<Void> startAsync(ComponentContext
componentContext, @Nullable IgniteComponent... components) {
return startAsync(componentContext, Stream.of(components));
}
@@ -1243,10 +1243,10 @@ public class IgniteUtils {
* Asynchronously stops all ignite components.
*
* @param componentContext Ignite component context.
- * @param components Array of ignite components to stop.
+ * @param components Array of ignite components to stop, may contain
{@code null} elements.
* @return CompletableFuture that will be completed when all components
are stopped.
*/
- public static CompletableFuture<Void> stopAsync(ComponentContext
componentContext, IgniteComponent... components) {
+ public static CompletableFuture<Void> stopAsync(ComponentContext
componentContext, @Nullable IgniteComponent... components) {
return stopAsync(componentContext, Stream.of(components));
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index da8639b6a8..f93b4f6f2c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -574,7 +574,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER
);
- fullStateTransferIndexChooser = new
FullStateTransferIndexChooser(catalogService, lowWatermark);
+ fullStateTransferIndexChooser = new
FullStateTransferIndexChooser(catalogService, lowWatermark, indexMetaStorage);
}
@Override
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index 88122388a6..34feaed67e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.index;
+import static java.util.Collections.unmodifiableCollection;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toMap;
@@ -82,7 +83,6 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
/**
* Local storage of {@link IndexMeta index metadata}, based on a {@link
CatalogIndexDescriptor} and stored in metastore, is responsible for
@@ -201,10 +201,9 @@ public class IndexMetaStorage implements IgniteComponent {
return indexMetaByIndexId.get(indexId);
}
- /** Returns a snapshot of all {@link IndexMeta}s. */
- @TestOnly
- Collection<IndexMeta> indexMetasSnapshot() {
- return List.copyOf(indexMetaByIndexId.values());
+ /** Returns a view of all index meta. */
+ public Collection<IndexMeta> indexMetas() {
+ return unmodifiableCollection(indexMetaByIndexId.values());
}
private CompletableFuture<Boolean>
onCatalogIndexCreateEvent(CreateIndexEventParameters parameters) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
index e2ff50cc89..427e768a35 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooser.java
@@ -20,42 +20,27 @@ package
org.apache.ignite.internal.table.distributed.raft.snapshot;
import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.REGISTERED;
-import static
org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.STOPPING;
-import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
-import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
-import static org.apache.ignite.internal.util.CollectionUtils.difference;
import static org.apache.ignite.internal.util.CollectionUtils.view;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
-import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
-import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -63,6 +48,9 @@ import org.apache.ignite.internal.lowwatermark.LowWatermark;
import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import org.apache.ignite.internal.table.distributed.index.MetaIndexStatus;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
/** Index chooser for full state transfer. */
@@ -79,16 +67,17 @@ public class FullStateTransferIndexChooser implements
ManuallyCloseable {
.thenComparingInt(ReadOnlyIndexInfo::indexId)
);
- private final Map<Integer, Integer> tableVersionByIndexId = new
ConcurrentHashMap<>();
-
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean closeGuard = new AtomicBoolean();
+ private final IndexMetaStorage indexMetaStorage;
+
/** Constructor. */
- public FullStateTransferIndexChooser(CatalogService catalogService,
LowWatermark lowWatermark) {
+ public FullStateTransferIndexChooser(CatalogService catalogService,
LowWatermark lowWatermark, IndexMetaStorage indexMetaStorage) {
this.catalogService = catalogService;
this.lowWatermark = lowWatermark;
+ this.indexMetaStorage = indexMetaStorage;
}
/** Starts the component. */
@@ -257,7 +246,6 @@ public class FullStateTransferIndexChooser implements
ManuallyCloseable {
}
private void addListenersBusy() {
- catalogService.listen(INDEX_CREATE,
fromConsumer(this::onIndexCreated));
catalogService.listen(INDEX_REMOVED,
fromConsumer(this::onIndexRemoved));
lowWatermark.listen(LOW_WATERMARK_CHANGED,
fromConsumer(this::onLwmChanged));
@@ -273,137 +261,37 @@ public class FullStateTransferIndexChooser implements
ManuallyCloseable {
if (catalogVersion <= lwmCatalogVersion) {
// There is no need to add a read-only indexes, since the
index should be destroyed under the updated low watermark.
- tableVersionByIndexId.remove(indexId);
- } else {
- CatalogIndexDescriptor index = indexBusy(indexId,
catalogVersion - 1);
-
- if (index.status() == AVAILABLE) {
- // On drop table event.
- readOnlyIndexes.add(new ReadOnlyIndexInfo(index,
catalogActivationTimestampBusy(catalogVersion), catalogVersion));
- } else if (index.status() == STOPPING) {
- readOnlyIndexes.add(
- new ReadOnlyIndexInfo(index,
findStoppingActivationTsBusy(indexId, catalogVersion - 1), catalogVersion)
- );
- } else {
- // Index that is dropped before even becoming
available.
- tableVersionByIndexId.remove(indexId);
- }
+ return;
}
- });
- });
- }
- private void onIndexCreated(CreateIndexEventParameters parameters) {
- inBusyLock(busyLock, () -> {
- CatalogIndexDescriptor index = parameters.indexDescriptor();
-
- int tableVersion = tableVersionBusy(index,
parameters.catalogVersion());
+ IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
- tableVersionByIndexId.put(index.id(), tableVersion);
- });
- }
+ assert indexMeta != null : "indexId=" + indexId + ",
catalogVersion=" + catalogVersion;
- private long catalogActivationTimestampBusy(int catalogVersion) {
- Catalog catalog = catalogService.catalog(catalogVersion);
-
- assert catalog != null : catalogVersion;
-
- return catalog.time();
- }
-
- // TODO: IGNITE-21771 Deal with catalog compaction
- private void recoverStructuresBusy() {
- int earliestCatalogVersion = catalogService.earliestCatalogVersion();
- int latestCatalogVersion = catalogService.latestCatalogVersion();
- int lwmCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
-
- var tableVersionByIndexId = new HashMap<Integer, Integer>();
- var readOnlyIndexes = new HashSet<ReadOnlyIndexInfo>();
-
- var stoppingActivationTsByIndexId = new HashMap<Integer, Long>();
- var previousCatalogVersionIndexIds = Set.<Integer>of();
-
- for (int catalogVersion = earliestCatalogVersion; catalogVersion <=
latestCatalogVersion; catalogVersion++) {
- int finalCatalogVersion = catalogVersion;
-
- var indexIds = new HashSet<Integer>();
-
- catalogService.indexes(finalCatalogVersion).forEach(index -> {
- tableVersionByIndexId.computeIfAbsent(index.id(), i ->
tableVersionBusy(index, finalCatalogVersion));
-
- if (index.status() == STOPPING) {
- stoppingActivationTsByIndexId.computeIfAbsent(index.id(),
i -> catalogActivationTimestampBusy(finalCatalogVersion));
+ if (indexMeta.status() == MetaIndexStatus.READ_ONLY) {
+ readOnlyIndexes.add(toReadOnlyIndexInfo(indexMeta));
}
-
- indexIds.add(index.id());
});
-
- // We are looking for removed indexes.
- difference(previousCatalogVersionIndexIds, indexIds).stream()
- .map(indexId -> catalogService.index(indexId,
finalCatalogVersion - 1))
- .forEach(index -> {
- if (index.status() == STOPPING && finalCatalogVersion
> lwmCatalogVersion) {
- readOnlyIndexes.add(
- new ReadOnlyIndexInfo(index,
stoppingActivationTsByIndexId.get(index.id()), finalCatalogVersion)
- );
- } else if (index.status() == AVAILABLE &&
finalCatalogVersion > lwmCatalogVersion) {
- // Drop table case.
- readOnlyIndexes.add(
- new ReadOnlyIndexInfo(index,
catalogActivationTimestampBusy(finalCatalogVersion), finalCatalogVersion)
- );
- } else {
- tableVersionByIndexId.remove(index.id());
- }
- });
-
- previousCatalogVersionIndexIds = indexIds;
- }
-
- this.tableVersionByIndexId.putAll(tableVersionByIndexId);
- this.readOnlyIndexes.addAll(readOnlyIndexes);
- }
-
- private CatalogIndexDescriptor indexBusy(int indexId, int catalogVersion) {
- CatalogIndexDescriptor index = catalogService.index(indexId,
catalogVersion);
-
- assert index != null : "indexId=" + indexId + ", catalogVersion=" +
catalogVersion;
-
- return index;
- }
-
- private long findStoppingActivationTsBusy(int indexId, int
toCatalogVersionIncluded) {
- int earliestCatalogVersion = catalogService.earliestCatalogVersion();
-
- for (int catalogVersion = toCatalogVersionIncluded; catalogVersion >=
earliestCatalogVersion; catalogVersion--) {
- if (indexBusy(indexId, catalogVersion).status() == AVAILABLE) {
- return catalogActivationTimestampBusy(catalogVersion + 1);
- }
- }
-
- throw new AssertionError(format(
- "{} status activation timestamp was not found for index:
[indexId={}, toCatalogVersionIncluded={}]",
- STOPPING, indexId, toCatalogVersionIncluded
- ));
- }
-
- private Set<Integer> tableIds(int catalogVersion) {
- return
catalogService.tables(catalogVersion).stream().map(CatalogObjectDescriptor::id).collect(toSet());
+ });
}
- private int tableVersionBusy(CatalogIndexDescriptor index, int
catalogVersion) {
- CatalogTableDescriptor table = catalogService.table(index.tableId(),
catalogVersion);
-
- assert table != null : "indexId=" + index.id() + ", tableId=" +
index.tableId() + ", catalogVersion=" + catalogVersion;
-
- return table.tableVersion();
+ private void recoverStructuresBusy() {
+ indexMetaStorage.indexMetas().stream()
+ .filter(indexMeta -> indexMeta.status() ==
MetaIndexStatus.READ_ONLY)
+ .map(FullStateTransferIndexChooser::toReadOnlyIndexInfo)
+ .forEach(readOnlyIndexes::add);
}
private List<IndexIdAndTableVersion> enrichWithTableVersions(List<Integer>
indexIds) {
return indexIds.stream()
.map(indexId -> {
- Integer tableVersion = tableVersionByIndexId.get(indexId);
+ IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+
+ if (indexMeta == null || indexMeta.status() ==
MetaIndexStatus.REMOVED) {
+ return null;
+ }
- return tableVersion == null ? null : new
IndexIdAndTableVersion(indexId, tableVersion);
+ return new IndexIdAndTableVersion(indexId,
indexMeta.tableVersion());
})
.filter(Objects::nonNull)
.collect(toCollection(() -> new ArrayList<>(indexIds.size())));
@@ -413,17 +301,30 @@ public class FullStateTransferIndexChooser implements
ManuallyCloseable {
inBusyLock(busyLock, () -> {
int lwmCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
- Iterator<ReadOnlyIndexInfo> it = readOnlyIndexes.iterator();
+ readOnlyIndexes.removeIf(readOnlyIndexInfo ->
readOnlyIndexInfo.indexRemovalCatalogVersion() <= lwmCatalogVersion);
+ });
+ }
- while (it.hasNext()) {
- ReadOnlyIndexInfo readOnlyIndexInfo = it.next();
+ private static long activationTs(IndexMeta indexMeta, MetaIndexStatus
status) {
+ return indexMeta.statusChange(status).activationTimestamp();
+ }
- if (readOnlyIndexInfo.indexRemovalCatalogVersion() <=
lwmCatalogVersion) {
- it.remove();
+ private static ReadOnlyIndexInfo toReadOnlyIndexInfo(IndexMeta indexMeta) {
+ assert indexMeta.status() == MetaIndexStatus.READ_ONLY : "indexId=" +
indexMeta.indexId() + ", status=" + indexMeta.status();
- tableVersionByIndexId.remove(readOnlyIndexInfo.indexId());
- }
- }
- });
+ long activationTs;
+
+ if (indexMeta.statusChanges().containsKey(MetaIndexStatus.STOPPING)) {
+ activationTs = activationTs(indexMeta, MetaIndexStatus.STOPPING);
+ } else {
+ activationTs = activationTs(indexMeta, MetaIndexStatus.READ_ONLY);
+ }
+
+ return new ReadOnlyIndexInfo(
+ indexMeta.tableId(),
+ activationTs,
+ indexMeta.indexId(),
+
indexMeta.statusChange(MetaIndexStatus.READ_ONLY).catalogVersion()
+ );
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index a4127f5c7c..2a3d718913 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
@@ -29,7 +30,9 @@ import static
org.apache.ignite.internal.util.CompletableFutures.emptySetComplet
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.apache.ignite.sql.ColumnType.INT64;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -53,7 +56,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -120,7 +122,6 @@ import
org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
@@ -148,7 +149,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
private static final String INDEXED_COLUMN_NAME = "columnName";
private static final int PARTITIONS = 8;
private static final ClusterNode node = new ClusterNodeImpl("nodeid",
NODE_NAME, new NetworkAddress("127.0.0.1", 2245));
- private static final long WAIT_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
+ private static final long WAIT_TIMEOUT = SECONDS.toMillis(10);
// Configuration
@InjectConfiguration("mock.profiles.default = {engine = \"aipersist\"}")
@@ -172,6 +173,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
private HybridClockImpl clock;
private TestLowWatermark lowWatermark;
+ private IndexMetaStorage indexMetaStorage;
+
// Table internal components
@Mock
private ReplicaManager replicaMgr;
@@ -311,6 +314,9 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
lowWatermark = new TestLowWatermark();
lowWatermark.updateWithoutNotify(savedWatermark);
ClockService clockService = new TestClockService(clock);
+
+ indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark,
metaStorageManager);
+
tableManager = new TableManager(
NODE_NAME,
revisionUpdater,
@@ -343,7 +349,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
new RemotelyTriggeredResourceRegistry(),
lowWatermark,
new TransactionInflights(placementDriver, clockService),
- mock(IndexMetaStorage.class)
+ indexMetaStorage
) {
@Override
@@ -364,36 +370,32 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
}
};
- ComponentContext componentContext = new ComponentContext();
+ var componentContext = new ComponentContext();
+
assertThat(
metaStorageManager.startAsync(componentContext)
.thenCompose(unused ->
metaStorageManager.recoveryFinishedFuture())
- .thenCompose(unused -> startAsync(componentContext,
catalogManager, sm, tableManager))
+ .thenCompose(unused -> startAsync(componentContext,
catalogManager, sm, indexMetaStorage, tableManager))
.thenCompose(unused -> ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart())
.thenCompose(unused ->
metaStorageManager.deployWatches()),
willCompleteSuccessfully()
);
}
- /**
- * Stops TableManager and dependencies.
- */
+ /** Stops TableManager and dependencies. */
private void stopComponents() throws Exception {
- ComponentContext componentContext = new ComponentContext();
- if (tableManager != null) {
- tableManager.beforeNodeStop();
- assertThat(tableManager.stopAsync(componentContext),
willCompleteSuccessfully());
- }
-
closeAll(
- dsm == null ? null : () ->
assertThat(dsm.stopAsync(componentContext), willCompleteSuccessfully()),
- sm == null ? null : () ->
assertThat(sm.stopAsync(componentContext), willCompleteSuccessfully()),
- catalogManager == null ? null :
- () ->
assertThat(catalogManager.stopAsync(componentContext),
willCompleteSuccessfully()),
- metaStorageManager == null ? null :
- () ->
assertThat(metaStorageManager.stopAsync(componentContext),
willCompleteSuccessfully()),
- partitionOperationsExecutor == null ? null
- : () ->
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10,
TimeUnit.SECONDS)
+ tableManager == null ? null : tableManager::beforeNodeStop,
+ dsm == null ? null : dsm::beforeNodeStop,
+ sm == null ? null : sm::beforeNodeStop,
+ indexMetaStorage == null ? null :
indexMetaStorage::beforeNodeStop,
+ catalogManager == null ? null : catalogManager::beforeNodeStop,
+ metaStorageManager == null ? null :
metaStorageManager::beforeNodeStop,
+ () -> assertThat(
+ stopAsync(new ComponentContext(), tableManager, dsm,
sm, indexMetaStorage, catalogManager, metaStorageManager),
+ willCompleteSuccessfully()
+ ),
+ partitionOperationsExecutor == null ? null : () ->
shutdownAndAwaitTermination(partitionOperationsExecutor, 10, SECONDS)
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index ecb6c2016b..1ab612e2d5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -31,6 +31,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.apache.ignite.sql.ColumnType.INT64;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -255,13 +256,16 @@ public class TableManagerTest extends IgniteAbstractTest {
private TestLowWatermark lowWatermark;
+ private IndexMetaStorage indexMetaStorage;
+
@BeforeEach
void before() throws NodeStoppingException {
lowWatermark = new TestLowWatermark();
catalogMetastore = StandaloneMetaStorageManager.create(new
SimpleInMemoryKeyValueStorage(NODE_NAME));
catalogManager = CatalogTestUtils.createTestCatalogManager(NODE_NAME,
clock, catalogMetastore);
+ indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark,
catalogMetastore);
- assertThat(startAsync(new ComponentContext(), catalogMetastore,
catalogManager), willCompleteSuccessfully());
+ assertThat(startAsync(new ComponentContext(), catalogMetastore,
catalogManager, indexMetaStorage), willCompleteSuccessfully());
revisionUpdater = (LongFunction<CompletableFuture<?>> function) ->
catalogMetastore.registerRevisionUpdateListener(function::apply);
@@ -306,7 +310,8 @@ public class TableManagerTest extends IgniteAbstractTest {
@AfterEach
void after() throws Exception {
- ComponentContext componentContext = new ComponentContext();
+ var componentContext = new ComponentContext();
+
closeAll(
() -> {
assertTrue(tblManagerFut.isDone());
@@ -314,12 +319,15 @@ public class TableManagerTest extends IgniteAbstractTest {
tblManagerFut.join().beforeNodeStop();
assertThat(tblManagerFut.join().stopAsync(componentContext),
willCompleteSuccessfully());
},
- dsm == null ? null : () ->
assertThat(dsm.stopAsync(componentContext), willCompleteSuccessfully()),
- sm == null ? null : () ->
assertThat(sm.stopAsync(componentContext), willCompleteSuccessfully()),
- catalogManager == null ? null :
- () ->
assertThat(catalogManager.stopAsync(componentContext),
willCompleteSuccessfully()),
- catalogMetastore == null ? null :
- () ->
assertThat(catalogMetastore.stopAsync(componentContext),
willCompleteSuccessfully()),
+ dsm == null ? null : dsm::beforeNodeStop,
+ sm == null ? null : sm::beforeNodeStop,
+ indexMetaStorage == null ? null :
indexMetaStorage::beforeNodeStop,
+ catalogManager == null ? null : catalogManager::beforeNodeStop,
+ catalogMetastore == null ? null :
catalogMetastore::beforeNodeStop,
+ () -> assertThat(
+ stopAsync(componentContext, dsm, sm, indexMetaStorage,
catalogManager, catalogMetastore),
+ willCompleteSuccessfully()
+ ),
partitionOperationsExecutor == null ? null
: () ->
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10,
TimeUnit.SECONDS)
);
@@ -781,9 +789,12 @@ public class TableManagerTest extends IgniteAbstractTest {
*
* @return Table manager.
*/
- private TableManager createTableManager(CompletableFuture<TableManager>
tblManagerFut, Consumer<MvTableStorage> tableStorageDecorator,
- Consumer<TxStateTableStorage> txStateTableStorageDecorator) {
- TableManager tableManager = new TableManager(
+ private TableManager createTableManager(
+ CompletableFuture<TableManager> tblManagerFut,
+ Consumer<MvTableStorage> tableStorageDecorator,
+ Consumer<TxStateTableStorage> txStateTableStorageDecorator
+ ) {
+ var tableManager = new TableManager(
NODE_NAME,
revisionUpdater,
gcConfig,
@@ -815,7 +826,7 @@ public class TableManagerTest extends IgniteAbstractTest {
new RemotelyTriggeredResourceRegistry(),
lowWatermark,
mock(TransactionInflights.class),
- mock(IndexMetaStorage.class)
+ indexMetaStorage
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/BaseIndexMetaStorageTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/BaseIndexMetaStorageTest.java
index 4e0139e771..ea97cbba65 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/BaseIndexMetaStorageTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/BaseIndexMetaStorageTest.java
@@ -155,7 +155,7 @@ abstract class BaseIndexMetaStorageTest extends
BaseIgniteAbstractTest {
}
List<String> allIndexNamesFromSnapshotIndexMetas() {
- return indexMetaStorage.indexMetasSnapshot().stream()
+ return indexMetaStorage.indexMetas().stream()
.map(IndexMeta::indexName)
.collect(toList());
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageTest.java
index a390a6ae61..a1b7cc039b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageTest.java
@@ -34,12 +34,17 @@ import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REMOVED;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.STOPPING;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import java.util.Collection;
import java.util.Map;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -483,4 +488,20 @@ public class IndexMetaStorageTest extends
BaseIndexMetaStorageTest {
fail("Unknown status: " + metaIndexStatus);
}
}
+
+ @Test
+ void testIndexMetas() {
+ createSimpleHashIndex(catalogManager, TABLE_NAME, INDEX_NAME);
+
+ int pkIndexId = indexId(PK_INDEX_NAME);
+ int indexId = indexId(INDEX_NAME);
+
+ Collection<IndexMeta> indexMetas = indexMetaStorage.indexMetas();
+
+ assertThat(indexMetas,
containsInAnyOrder(indexMetaStorage.indexMeta(pkIndexId),
indexMetaStorage.indexMeta(indexId)));
+
+ assertThrows(UnsupportedOperationException.class, () ->
indexMetas.add(mock(IndexMeta.class)));
+ assertThrows(UnsupportedOperationException.class, () ->
indexMetas.remove(mock(IndexMeta.class)));
+ assertThrows(UnsupportedOperationException.class, () ->
indexMetas.iterator().remove());
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
index c87371cb66..ab01252e6a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/FullStateTransferIndexChooserTest.java
@@ -36,6 +36,8 @@ import static
org.apache.ignite.internal.table.TableTestUtils.startBuildingIndex
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.view;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -55,8 +57,12 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -66,6 +72,8 @@ import org.junit.jupiter.params.provider.ValueSource;
/** For {@link FullStateTransferIndexChooser} testing. */
public class FullStateTransferIndexChooserTest extends BaseIgniteAbstractTest {
+ private static final String NODE_NAME = "test";
+
private static final String REGISTERED_INDEX_NAME = INDEX_NAME + "_" +
REGISTERED;
private static final String BUILDING_INDEX_NAME = INDEX_NAME + "_" +
BUILDING;
@@ -82,18 +90,28 @@ public class FullStateTransferIndexChooserTest extends
BaseIgniteAbstractTest {
private final TestLowWatermark lowWatermark = new TestLowWatermark();
+ private MetaStorageManager metaStorageManager;
+
+ private IndexMetaStorage indexMetaStorage;
+
private FullStateTransferIndexChooser indexChooser;
@BeforeEach
void setUp() {
- catalogManager =
CatalogTestUtils.createCatalogManagerWithTestUpdateLog("test", clock);
+ metaStorageManager = StandaloneMetaStorageManager.create(new
SimpleInMemoryKeyValueStorage(NODE_NAME), clock);
- indexChooser = new FullStateTransferIndexChooser(catalogManager,
lowWatermark);
+ catalogManager =
CatalogTestUtils.createCatalogManagerWithTestUpdateLog(NODE_NAME, clock);
- assertThat(catalogManager.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark,
metaStorageManager);
+
+ indexChooser = new FullStateTransferIndexChooser(catalogManager,
lowWatermark, indexMetaStorage);
+
+ assertThat(startAsync(new ComponentContext(), metaStorageManager,
catalogManager, indexMetaStorage), willCompleteSuccessfully());
indexChooser.start();
+ assertThat(metaStorageManager.deployWatches(),
willCompleteSuccessfully());
+
createSimpleTable(catalogManager, TABLE_NAME);
}
@@ -101,8 +119,13 @@ public class FullStateTransferIndexChooserTest extends
BaseIgniteAbstractTest {
void tearDown() throws Exception {
closeAllManually(
indexChooser,
- catalogManager::beforeNodeStop,
- () -> assertThat(catalogManager.stopAsync(new
ComponentContext()), willCompleteSuccessfully())
+ indexMetaStorage == null ? null :
indexMetaStorage::beforeNodeStop,
+ catalogManager == null ? null : catalogManager::beforeNodeStop,
+ metaStorageManager == null ? null :
metaStorageManager::beforeNodeStop,
+ () -> assertThat(
+ stopAsync(new ComponentContext(), indexMetaStorage,
catalogManager, metaStorageManager),
+ willCompleteSuccessfully()
+ )
);
}
@@ -444,7 +467,7 @@ public class FullStateTransferIndexChooserTest extends
BaseIgniteAbstractTest {
}
private void dropIndex(String indexName) {
- TableTestUtils.dropIndex(catalogManager,
SqlCommon.DEFAULT_SCHEMA_NAME, indexName);
+ TableTestUtils.dropSimpleIndex(catalogManager, indexName);
}
private int latestCatalogVersion() {
@@ -471,7 +494,7 @@ public class FullStateTransferIndexChooserTest extends
BaseIgniteAbstractTest {
private void recoverIndexChooser() {
indexChooser.close();
- indexChooser = new FullStateTransferIndexChooser(catalogManager,
lowWatermark);
+ indexChooser = new FullStateTransferIndexChooser(catalogManager,
lowWatermark, indexMetaStorage);
indexChooser.start();
}