This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 28e6186147 IGNITE-22442 Improve recovery of local index metadata
(#3928)
28e6186147 is described below
commit 28e61861476c5bae883d9f39d580d9d61b0d93f9
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jun 17 11:02:03 2024 +0300
IGNITE-22442 Improve recovery of local index metadata (#3928)
---
.../table/distributed/index/IndexMetaStorage.java | 200 +++++++++++++--------
.../index/IndexMetaStorageRecoveryTest.java | 31 ++++
2 files changed, 155 insertions(+), 76 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index dc00d5c9ad..3fbd87cd0c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.index;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_AVAILABLE;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
@@ -39,15 +40,18 @@ import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.statusOnRemoveIndex;
import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@@ -65,6 +69,8 @@ import
org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
import
org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
import org.apache.ignite.internal.catalog.events.TableEventParameters;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
@@ -117,6 +123,20 @@ public class IndexMetaStorage implements IgniteComponent {
private final Map<Integer, IndexMeta> indexMetaByIndexId = new
ConcurrentHashMap<>();
+ private final EventListener<CreateIndexEventParameters>
onCatalogIndexCreateEventListener;
+
+ private final EventListener<RemoveIndexEventParameters>
onCatalogIndexRemovedEventListener;
+
+ private final EventListener<StartBuildingIndexEventParameters>
onCatalogIndexBuildingEventListener;
+
+ private final EventListener<MakeIndexAvailableEventParameters>
onCatalogIndexAvailableEventListener;
+
+ private final EventListener<StoppingIndexEventParameters>
onCatalogIndexStoppingEventListener;
+
+ private final EventListener<TableEventParameters>
onCatalogTableAlterEventListener;
+
+ private final EventListener<ChangeLowWatermarkEventParameters>
onLwmChangedListener;
+
/** Constructor. */
public IndexMetaStorage(
CatalogService catalogService,
@@ -126,20 +146,28 @@ public class IndexMetaStorage implements IgniteComponent {
this.catalogService = catalogService;
this.lowWatermark = lowWatermark;
this.metaStorageManager = metaStorageManager;
+
+ onCatalogIndexCreateEventListener =
fromFunction(this::onCatalogIndexCreateEvent);
+ onCatalogIndexRemovedEventListener =
fromFunction(this::onCatalogIndexRemovedEvent);
+ onCatalogIndexBuildingEventListener =
fromFunction(this::onCatalogIndexBuildingEvent);
+ onCatalogIndexAvailableEventListener =
fromFunction(this::onCatalogIndexAvailableEvent);
+ onCatalogIndexStoppingEventListener =
fromFunction(this::onCatalogIndexStoppingEvent);
+ onCatalogTableAlterEventListener =
fromFunction(this::onCatalogTableAlterEvent);
+ onLwmChangedListener = fromFunction(this::onLwmChanged);
}
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
try {
- catalogService.listen(INDEX_CREATE,
fromFunction(this::onCatalogIndexCreateEvent));
- catalogService.listen(INDEX_REMOVED,
fromFunction(this::onCatalogIndexRemovedEvent));
- catalogService.listen(INDEX_BUILDING,
fromFunction(this::onCatalogIndexBuildingEvent));
- catalogService.listen(INDEX_AVAILABLE,
fromFunction(this::onCatalogIndexAvailableEvent));
- catalogService.listen(INDEX_STOPPING,
fromFunction(this::onCatalogIndexStoppingEvent));
+ catalogService.listen(INDEX_CREATE,
onCatalogIndexCreateEventListener);
+ catalogService.listen(INDEX_REMOVED,
onCatalogIndexRemovedEventListener);
+ catalogService.listen(INDEX_BUILDING,
onCatalogIndexBuildingEventListener);
+ catalogService.listen(INDEX_AVAILABLE,
onCatalogIndexAvailableEventListener);
+ catalogService.listen(INDEX_STOPPING,
onCatalogIndexStoppingEventListener);
- catalogService.listen(TABLE_ALTER,
fromFunction(this::onCatalogTableAlterEvent));
+ catalogService.listen(TABLE_ALTER,
onCatalogTableAlterEventListener);
- lowWatermark.listen(LOW_WATERMARK_CHANGED,
fromFunction(this::onLwmChanged));
+ lowWatermark.listen(LOW_WATERMARK_CHANGED, onLwmChangedListener);
return recoverIndexMetas();
} catch (Throwable t) {
@@ -149,6 +177,16 @@ public class IndexMetaStorage implements IgniteComponent {
@Override
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ catalogService.removeListener(INDEX_CREATE,
onCatalogIndexCreateEventListener);
+ catalogService.removeListener(INDEX_REMOVED,
onCatalogIndexRemovedEventListener);
+ catalogService.removeListener(INDEX_BUILDING,
onCatalogIndexBuildingEventListener);
+ catalogService.removeListener(INDEX_AVAILABLE,
onCatalogIndexAvailableEventListener);
+ catalogService.removeListener(INDEX_STOPPING,
onCatalogIndexStoppingEventListener);
+
+ catalogService.removeListener(TABLE_ALTER,
onCatalogTableAlterEventListener);
+
+ lowWatermark.removeListener(LOW_WATERMARK_CHANGED,
onLwmChangedListener);
+
indexMetaByIndexId.clear();
return nullCompletedFuture();
@@ -191,7 +229,7 @@ public class IndexMetaStorage implements IgniteComponent {
CompletableFuture<?>[] resultFuture = new CompletableFuture<?>[1];
lowWatermark.getLowWatermarkSafe(lwm -> {
- int lwmCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
+ int lwmCatalogVersion = lwmCatalogVersion(lwm);
if (eventCatalogVersion <= lwmCatalogVersion) {
// There is no need to add a read-only index, since the index
should be destroyed under the updated low watermark.
@@ -269,91 +307,72 @@ public class IndexMetaStorage implements IgniteComponent {
}
private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
- int lwmCatalogVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
-
- Iterator<IndexMeta> it = indexMetaByIndexId.values().iterator();
-
- var futures = new ArrayList<CompletableFuture<?>>();
+ int lwmCatalogVersion =
lwmCatalogVersion(parameters.newLowWatermark());
- while (it.hasNext()) {
- IndexMeta indexMeta = it.next();
-
- if (shouldBeRemoved(indexMeta, lwmCatalogVersion)) {
- it.remove();
-
- futures.add(removeFromMetastore(indexMeta));
- }
- }
-
- return futures.isEmpty() ? falseCompletedFuture() :
allOf(futures.toArray(CompletableFuture[]::new)).thenApply(unused -> false);
+ return
removeIndexMetasFromMetastore(lwmCatalogVersion).thenApply(unused -> false);
}
- // TODO: IGNITE-22442 Improve recovery
private CompletableFuture<Void> recoverIndexMetas() {
- Catalog catalog = catalog(catalogService.latestCatalogVersion());
+ int lwmCatalogVersion =
lwmCatalogVersion(lowWatermark.getLowWatermark());
+ int latestCatalogVersion = catalogService.latestCatalogVersion();
+ int startCatalogVersion = Math.max(lwmCatalogVersion,
catalogService.earliestCatalogVersion());
- Map<Integer, IndexMeta> fromMetastore =
readAllFromMetastoreOnRecovery();
- Map<Integer, CatalogIndexDescriptor> fromCatalogLatest =
readAllFromCatalogOnRecovery(catalog.version());
+ indexMetaByIndexId.putAll(readAllFromMetastoreOnRecovery());
var futures = new ArrayList<CompletableFuture<?>>();
- for (CatalogIndexDescriptor catalogIndexDescriptor :
fromCatalogLatest.values()) {
- int indexId = catalogIndexDescriptor.id();
-
- IndexMeta indexMetaFromMetastore = fromMetastore.get(indexId);
-
- if (indexMetaFromMetastore == null) {
- // We did not have time to save at the index creation event.
- futures.add(updateAndSaveIndexMetaToMetastore(indexId,
indexMeta -> IndexMeta.of(catalogIndexDescriptor, catalog)));
- } else if
(!catalogIndexDescriptor.name().equals(indexMetaFromMetastore.indexName())) {
- // We did not have time to process the index renaming event.
- futures.add(updateAndSaveIndexMetaToMetastore(
- indexId,
- indexMeta ->
indexMetaFromMetastore.indexName(catalog.version(),
catalogIndexDescriptor.name())
- ));
- } else if
(MetaIndexStatus.convert(catalogIndexDescriptor.status()) !=
indexMetaFromMetastore.status()) {
- // We did not have time to process the index status change
event.
- futures.add(updateAndSaveIndexMetaToMetastore(
- indexId,
- indexMeta -> setNewStatus(indexMetaFromMetastore,
MetaIndexStatus.convert(catalogIndexDescriptor.status()), catalog)
- ));
- } else {
- indexMetaByIndexId.put(indexId, indexMetaFromMetastore);
+ Set<Integer> previousCatalogVersionIndexIds =
indexIdsForCatalogVersion(indexMetaByIndexId.values(), startCatalogVersion - 1);
+
+ for (int catalogVersion = startCatalogVersion; catalogVersion <=
latestCatalogVersion; catalogVersion++) {
+ Catalog catalog = catalog(catalogVersion);
+
+ var indexIds = new HashSet<Integer>();
+
+ for (CatalogIndexDescriptor catalogIndexDescriptor :
catalog.indexes()) {
+ int indexId = catalogIndexDescriptor.id();
+
+ indexIds.add(indexId);
+
+ IndexMeta fromMetastore = indexMetaByIndexId.get(indexId);
+
+ if (fromMetastore == null) {
+ // We did not have time to save at the index creation
event.
+ futures.add(updateAndSaveIndexMetaToMetastore(indexId,
indexMeta -> IndexMeta.of(catalogIndexDescriptor, catalog)));
+ } else if (fromMetastore.catalogVersion() < catalog.version())
{
+ if
(!catalogIndexDescriptor.name().equals(fromMetastore.indexName())) {
+ // We did not have time to process the index renaming
event.
+ futures.add(updateAndSaveIndexMetaToMetastore(
+ indexId,
+ indexMeta ->
indexMeta.indexName(catalog.version(), catalogIndexDescriptor.name())
+ ));
+ } else if
(MetaIndexStatus.convert(catalogIndexDescriptor.status()) !=
fromMetastore.status()) {
+ // We did not have time to process the index status
change event.
+ futures.add(updateAndSaveIndexMetaToMetastore(
+ indexId,
+ indexMeta -> setNewStatus(fromMetastore,
MetaIndexStatus.convert(catalogIndexDescriptor.status()), catalog)
+ ));
+ }
+ }
}
- }
-
- // Let’s deal with read-only indexes.
- int lwmCatalogVersion =
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
- for (IndexMeta indexMeta : fromMetastore.values()) {
- int indexId = indexMeta.indexId();
+ // Let's identify the indexes that were removed from the catalog
in this version.
+ for (Integer removedIndexId :
difference(previousCatalogVersionIndexIds, indexIds)) {
+ IndexMeta fromMetastore =
indexMetaByIndexId.get(removedIndexId);
- if (indexMetaByIndexId.containsKey(indexId)) {
- continue;
- }
-
- if (indexMeta.status() == READ_ONLY || indexMeta.status() ==
REMOVED) {
- if (shouldBeRemoved(indexMeta, lwmCatalogVersion)) {
- // We did not have time to process the lwm change event.
- futures.add(removeFromMetastore(indexMeta));
- } else {
- indexMetaByIndexId.put(indexId, indexMeta);
- }
- } else {
- // We did not have time to process the index removal event.
- if (catalog.version() <= lwmCatalogVersion) {
- // There is no need to add a read-only indexes, since the
index should be destroyed under the updated low watermark.
- futures.add(removeFromMetastore(indexMeta));
- } else {
+ if (fromMetastore.catalogVersion() < catalog.version()) {
futures.add(updateAndSaveIndexMetaToMetastore(
- indexId,
- meta -> setNewStatus(indexMeta,
statusOnRemoveIndex(indexMeta.status()), catalog)
+ removedIndexId,
+ indexMeta -> setNewStatus(fromMetastore,
statusOnRemoveIndex(fromMetastore.status()), catalog)
));
}
}
+
+ previousCatalogVersionIndexIds = indexIds;
}
- return futures.isEmpty() ? nullCompletedFuture() :
allOf(futures.toArray(CompletableFuture[]::new));
+ futures.add(removeIndexMetasFromMetastore(lwmCatalogVersion));
+
+ return allOf(futures.toArray(CompletableFuture[]::new));
}
private CatalogIndexDescriptor catalogIndexDescriptor(int indexId, int
catalogVersion) {
@@ -450,4 +469,33 @@ public class IndexMetaStorage implements IgniteComponent {
return saveToMetastore(newMeta);
}
+
+ private int lwmCatalogVersion(@Nullable HybridTimestamp lwm) {
+ return catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
+ }
+
+ private static Set<Integer>
indexIdsForCatalogVersion(Collection<IndexMeta> indexMetas, int catalogVersion)
{
+ return indexMetas.stream()
+ .filter(indexMeta -> indexMeta.catalogVersion() <=
catalogVersion)
+ .map(IndexMeta::indexId)
+ .collect(toSet());
+ }
+
+ private CompletableFuture<Void> removeIndexMetasFromMetastore(int
lwmCatalogVersion) {
+ Iterator<IndexMeta> it = indexMetaByIndexId.values().iterator();
+
+ var futures = new ArrayList<CompletableFuture<?>>();
+
+ while (it.hasNext()) {
+ IndexMeta indexMeta = it.next();
+
+ if (shouldBeRemoved(indexMeta, lwmCatalogVersion)) {
+ it.remove();
+
+ futures.add(removeFromMetastore(indexMeta));
+ }
+ }
+
+ return futures.isEmpty() ? nullCompletedFuture() :
allOf(futures.toArray(CompletableFuture[]::new));
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
index c7f2ebde69..6a59ff878c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
@@ -416,6 +416,37 @@ public class IndexMetaStorageRecoveryTest extends
BaseIndexMetaStorageTest {
assertNull(fromMetastore(indexId));
}
+ @Test
+ void testMissingMultipleIndexUpdates() throws Exception {
+ assertThat(indexMetaStorage.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ int registeredIndexCatalogVersion = executeCatalogUpdate(() ->
createSimpleHashIndex(catalogManager, TABLE_NAME, INDEX_NAME));
+
+ int indexId = indexId(INDEX_NAME);
+ int tableId = tableId(TABLE_NAME);
+
+ int buildingIndexCatalogVersion = executeCatalogUpdate(() ->
startBuildingIndex(catalogManager, indexId));
+ int availableIndexCatalogVersion = executeCatalogUpdate(() ->
makeIndexAvailable(catalogManager, indexId));
+ int stoppingIndexCatalogVersion = executeCatalogUpdate(() ->
dropSimpleIndex(catalogManager, INDEX_NAME));
+ int removingIndexCatalogVersion = executeCatalogUpdate(() ->
removeIndex(catalogManager, indexId));
+
+ restartComponents();
+
+ IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+ IndexMeta fromMetastore = fromMetastore(indexId);
+
+ Map<MetaIndexStatus, MetaIndexStatusChange> expectedStatuses = Map.of(
+ REGISTERED, toChangeInfo(registeredIndexCatalogVersion),
+ BUILDING, toChangeInfo(buildingIndexCatalogVersion),
+ AVAILABLE, toChangeInfo(availableIndexCatalogVersion),
+ STOPPING, toChangeInfo(stoppingIndexCatalogVersion),
+ READ_ONLY, toChangeInfo(removingIndexCatalogVersion)
+ );
+
+ checkFields(indexMeta, indexId, tableId, INDEX_NAME, READ_ONLY,
expectedStatuses, removingIndexCatalogVersion);
+ checkFields(fromMetastore, indexId, tableId, INDEX_NAME, READ_ONLY,
expectedStatuses, removingIndexCatalogVersion);
+ }
+
private void executeCatalogUpdateWithDropEvents(RunnableX task) {
CompletableFuture<Void> startDropEventsFuture =
interceptor.startDropEvents();