This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 28e6186147 IGNITE-22442 Improve recovery of local index metadata (#3928)
28e6186147 is described below

commit 28e61861476c5bae883d9f39d580d9d61b0d93f9
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jun 17 11:02:03 2024 +0300

    IGNITE-22442 Improve recovery of local index metadata (#3928)
---
 .../table/distributed/index/IndexMetaStorage.java  | 200 +++++++++++++--------
 .../index/IndexMetaStorageRecoveryTest.java        |  31 ++++
 2 files changed, 155 insertions(+), 76 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index dc00d5c9ad..3fbd87cd0c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.index;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_AVAILABLE;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_BUILDING;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
@@ -39,15 +40,18 @@ import static org.apache.ignite.internal.table.distributed.index.MetaIndexStatus
 import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.statusOnRemoveIndex;
 import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -65,6 +69,8 @@ import org.apache.ignite.internal.catalog.events.RenameTableEventParameters;
 import 
org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.TableEventParameters;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
 import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
@@ -117,6 +123,20 @@ public class IndexMetaStorage implements IgniteComponent {
 
     private final Map<Integer, IndexMeta> indexMetaByIndexId = new 
ConcurrentHashMap<>();
 
+    private final EventListener<CreateIndexEventParameters> 
onCatalogIndexCreateEventListener;
+
+    private final EventListener<RemoveIndexEventParameters> 
onCatalogIndexRemovedEventListener;
+
+    private final EventListener<StartBuildingIndexEventParameters> 
onCatalogIndexBuildingEventListener;
+
+    private final EventListener<MakeIndexAvailableEventParameters> 
onCatalogIndexAvailableEventListener;
+
+    private final EventListener<StoppingIndexEventParameters> 
onCatalogIndexStoppingEventListener;
+
+    private final EventListener<TableEventParameters> 
onCatalogTableAlterEventListener;
+
+    private final EventListener<ChangeLowWatermarkEventParameters> 
onLwmChangedListener;
+
     /** Constructor. */
     public IndexMetaStorage(
             CatalogService catalogService,
@@ -126,20 +146,28 @@ public class IndexMetaStorage implements IgniteComponent {
         this.catalogService = catalogService;
         this.lowWatermark = lowWatermark;
         this.metaStorageManager = metaStorageManager;
+
+        onCatalogIndexCreateEventListener = 
fromFunction(this::onCatalogIndexCreateEvent);
+        onCatalogIndexRemovedEventListener = 
fromFunction(this::onCatalogIndexRemovedEvent);
+        onCatalogIndexBuildingEventListener = 
fromFunction(this::onCatalogIndexBuildingEvent);
+        onCatalogIndexAvailableEventListener = 
fromFunction(this::onCatalogIndexAvailableEvent);
+        onCatalogIndexStoppingEventListener = 
fromFunction(this::onCatalogIndexStoppingEvent);
+        onCatalogTableAlterEventListener = 
fromFunction(this::onCatalogTableAlterEvent);
+        onLwmChangedListener = fromFunction(this::onLwmChanged);
     }
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
         try {
-            catalogService.listen(INDEX_CREATE, 
fromFunction(this::onCatalogIndexCreateEvent));
-            catalogService.listen(INDEX_REMOVED, 
fromFunction(this::onCatalogIndexRemovedEvent));
-            catalogService.listen(INDEX_BUILDING, 
fromFunction(this::onCatalogIndexBuildingEvent));
-            catalogService.listen(INDEX_AVAILABLE, 
fromFunction(this::onCatalogIndexAvailableEvent));
-            catalogService.listen(INDEX_STOPPING, 
fromFunction(this::onCatalogIndexStoppingEvent));
+            catalogService.listen(INDEX_CREATE, 
onCatalogIndexCreateEventListener);
+            catalogService.listen(INDEX_REMOVED, 
onCatalogIndexRemovedEventListener);
+            catalogService.listen(INDEX_BUILDING, 
onCatalogIndexBuildingEventListener);
+            catalogService.listen(INDEX_AVAILABLE, 
onCatalogIndexAvailableEventListener);
+            catalogService.listen(INDEX_STOPPING, 
onCatalogIndexStoppingEventListener);
 
-            catalogService.listen(TABLE_ALTER, 
fromFunction(this::onCatalogTableAlterEvent));
+            catalogService.listen(TABLE_ALTER, 
onCatalogTableAlterEventListener);
 
-            lowWatermark.listen(LOW_WATERMARK_CHANGED, 
fromFunction(this::onLwmChanged));
+            lowWatermark.listen(LOW_WATERMARK_CHANGED, onLwmChangedListener);
 
             return recoverIndexMetas();
         } catch (Throwable t) {
@@ -149,6 +177,16 @@ public class IndexMetaStorage implements IgniteComponent {
 
     @Override
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+        catalogService.removeListener(INDEX_CREATE, 
onCatalogIndexCreateEventListener);
+        catalogService.removeListener(INDEX_REMOVED, 
onCatalogIndexRemovedEventListener);
+        catalogService.removeListener(INDEX_BUILDING, 
onCatalogIndexBuildingEventListener);
+        catalogService.removeListener(INDEX_AVAILABLE, 
onCatalogIndexAvailableEventListener);
+        catalogService.removeListener(INDEX_STOPPING, 
onCatalogIndexStoppingEventListener);
+
+        catalogService.removeListener(TABLE_ALTER, 
onCatalogTableAlterEventListener);
+
+        lowWatermark.removeListener(LOW_WATERMARK_CHANGED, 
onLwmChangedListener);
+
         indexMetaByIndexId.clear();
 
         return nullCompletedFuture();
@@ -191,7 +229,7 @@ public class IndexMetaStorage implements IgniteComponent {
         CompletableFuture<?>[] resultFuture = new CompletableFuture<?>[1];
 
         lowWatermark.getLowWatermarkSafe(lwm -> {
-            int lwmCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
+            int lwmCatalogVersion = lwmCatalogVersion(lwm);
 
             if (eventCatalogVersion <= lwmCatalogVersion) {
                 // There is no need to add a read-only index, since the index 
should be destroyed under the updated low watermark.
@@ -269,91 +307,72 @@ public class IndexMetaStorage implements IgniteComponent {
     }
 
     private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
-        int lwmCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
-
-        Iterator<IndexMeta> it = indexMetaByIndexId.values().iterator();
-
-        var futures = new ArrayList<CompletableFuture<?>>();
+        int lwmCatalogVersion = 
lwmCatalogVersion(parameters.newLowWatermark());
 
-        while (it.hasNext()) {
-            IndexMeta indexMeta = it.next();
-
-            if (shouldBeRemoved(indexMeta, lwmCatalogVersion)) {
-                it.remove();
-
-                futures.add(removeFromMetastore(indexMeta));
-            }
-        }
-
-        return futures.isEmpty() ? falseCompletedFuture() : 
allOf(futures.toArray(CompletableFuture[]::new)).thenApply(unused -> false);
+        return 
removeIndexMetasFromMetastore(lwmCatalogVersion).thenApply(unused -> false);
     }
 
-    // TODO: IGNITE-22442 Improve recovery
     private CompletableFuture<Void> recoverIndexMetas() {
-        Catalog catalog = catalog(catalogService.latestCatalogVersion());
+        int lwmCatalogVersion = 
lwmCatalogVersion(lowWatermark.getLowWatermark());
+        int latestCatalogVersion = catalogService.latestCatalogVersion();
+        int startCatalogVersion = Math.max(lwmCatalogVersion, 
catalogService.earliestCatalogVersion());
 
-        Map<Integer, IndexMeta> fromMetastore = 
readAllFromMetastoreOnRecovery();
-        Map<Integer, CatalogIndexDescriptor> fromCatalogLatest = 
readAllFromCatalogOnRecovery(catalog.version());
+        indexMetaByIndexId.putAll(readAllFromMetastoreOnRecovery());
 
         var futures = new ArrayList<CompletableFuture<?>>();
 
-        for (CatalogIndexDescriptor catalogIndexDescriptor : 
fromCatalogLatest.values()) {
-            int indexId = catalogIndexDescriptor.id();
-
-            IndexMeta indexMetaFromMetastore = fromMetastore.get(indexId);
-
-            if (indexMetaFromMetastore == null) {
-                // We did not have time to save at the index creation event.
-                futures.add(updateAndSaveIndexMetaToMetastore(indexId, 
indexMeta -> IndexMeta.of(catalogIndexDescriptor, catalog)));
-            } else if 
(!catalogIndexDescriptor.name().equals(indexMetaFromMetastore.indexName())) {
-                // We did not have time to process the index renaming event.
-                futures.add(updateAndSaveIndexMetaToMetastore(
-                        indexId,
-                        indexMeta -> 
indexMetaFromMetastore.indexName(catalog.version(), 
catalogIndexDescriptor.name())
-                ));
-            } else if 
(MetaIndexStatus.convert(catalogIndexDescriptor.status()) != 
indexMetaFromMetastore.status()) {
-                // We did not have time to process the index status change 
event.
-                futures.add(updateAndSaveIndexMetaToMetastore(
-                        indexId,
-                        indexMeta -> setNewStatus(indexMetaFromMetastore, 
MetaIndexStatus.convert(catalogIndexDescriptor.status()), catalog)
-                ));
-            } else {
-                indexMetaByIndexId.put(indexId, indexMetaFromMetastore);
+        Set<Integer> previousCatalogVersionIndexIds = 
indexIdsForCatalogVersion(indexMetaByIndexId.values(), startCatalogVersion - 1);
+
+        for (int catalogVersion = startCatalogVersion; catalogVersion <= 
latestCatalogVersion; catalogVersion++) {
+            Catalog catalog = catalog(catalogVersion);
+
+            var indexIds = new HashSet<Integer>();
+
+            for (CatalogIndexDescriptor catalogIndexDescriptor : 
catalog.indexes()) {
+                int indexId = catalogIndexDescriptor.id();
+
+                indexIds.add(indexId);
+
+                IndexMeta fromMetastore = indexMetaByIndexId.get(indexId);
+
+                if (fromMetastore == null) {
+                    // We did not have time to save at the index creation 
event.
+                    futures.add(updateAndSaveIndexMetaToMetastore(indexId, 
indexMeta -> IndexMeta.of(catalogIndexDescriptor, catalog)));
+                } else if (fromMetastore.catalogVersion() < catalog.version()) 
{
+                    if 
(!catalogIndexDescriptor.name().equals(fromMetastore.indexName())) {
+                        // We did not have time to process the index renaming 
event.
+                        futures.add(updateAndSaveIndexMetaToMetastore(
+                                indexId,
+                                indexMeta -> 
indexMeta.indexName(catalog.version(), catalogIndexDescriptor.name())
+                        ));
+                    } else if 
(MetaIndexStatus.convert(catalogIndexDescriptor.status()) != 
fromMetastore.status()) {
+                        // We did not have time to process the index status 
change event.
+                        futures.add(updateAndSaveIndexMetaToMetastore(
+                                indexId,
+                                indexMeta -> setNewStatus(fromMetastore, 
MetaIndexStatus.convert(catalogIndexDescriptor.status()), catalog)
+                        ));
+                    }
+                }
             }
-        }
-
-        // Let’s deal with read-only indexes.
-        int lwmCatalogVersion = 
catalogService.activeCatalogVersion(hybridTimestampToLong(lowWatermark.getLowWatermark()));
 
-        for (IndexMeta indexMeta : fromMetastore.values()) {
-            int indexId = indexMeta.indexId();
+            // Let's identify the indexes that were removed from the catalog 
in this version.
+            for (Integer removedIndexId : 
difference(previousCatalogVersionIndexIds, indexIds)) {
+                IndexMeta fromMetastore = 
indexMetaByIndexId.get(removedIndexId);
 
-            if (indexMetaByIndexId.containsKey(indexId)) {
-                continue;
-            }
-
-            if (indexMeta.status() == READ_ONLY || indexMeta.status() == 
REMOVED) {
-                if (shouldBeRemoved(indexMeta, lwmCatalogVersion)) {
-                    // We did not have time to process the lwm change event.
-                    futures.add(removeFromMetastore(indexMeta));
-                } else {
-                    indexMetaByIndexId.put(indexId, indexMeta);
-                }
-            } else {
-                // We did not have time to process the index removal event.
-                if (catalog.version() <= lwmCatalogVersion) {
-                    // There is no need to add a read-only indexes, since the 
index should be destroyed under the updated low watermark.
-                    futures.add(removeFromMetastore(indexMeta));
-                } else {
+                if (fromMetastore.catalogVersion() < catalog.version()) {
                     futures.add(updateAndSaveIndexMetaToMetastore(
-                            indexId,
-                            meta -> setNewStatus(indexMeta, 
statusOnRemoveIndex(indexMeta.status()), catalog)
+                            removedIndexId,
+                            indexMeta -> setNewStatus(fromMetastore, 
statusOnRemoveIndex(fromMetastore.status()), catalog)
                     ));
                 }
             }
+
+            previousCatalogVersionIndexIds = indexIds;
         }
 
-        return futures.isEmpty() ? nullCompletedFuture() : 
allOf(futures.toArray(CompletableFuture[]::new));
+        futures.add(removeIndexMetasFromMetastore(lwmCatalogVersion));
+
+        return allOf(futures.toArray(CompletableFuture[]::new));
     }
 
     private CatalogIndexDescriptor catalogIndexDescriptor(int indexId, int 
catalogVersion) {
@@ -450,4 +469,33 @@ public class IndexMetaStorage implements IgniteComponent {
 
         return saveToMetastore(newMeta);
     }
+
+    private int lwmCatalogVersion(@Nullable HybridTimestamp lwm) {
+        return catalogService.activeCatalogVersion(hybridTimestampToLong(lwm));
+    }
+
+    private static Set<Integer> 
indexIdsForCatalogVersion(Collection<IndexMeta> indexMetas, int catalogVersion) 
{
+        return indexMetas.stream()
+                .filter(indexMeta -> indexMeta.catalogVersion() <= 
catalogVersion)
+                .map(IndexMeta::indexId)
+                .collect(toSet());
+    }
+
+    private CompletableFuture<Void> removeIndexMetasFromMetastore(int 
lwmCatalogVersion) {
+        Iterator<IndexMeta> it = indexMetaByIndexId.values().iterator();
+
+        var futures = new ArrayList<CompletableFuture<?>>();
+
+        while (it.hasNext()) {
+            IndexMeta indexMeta = it.next();
+
+            if (shouldBeRemoved(indexMeta, lwmCatalogVersion)) {
+                it.remove();
+
+                futures.add(removeFromMetastore(indexMeta));
+            }
+        }
+
+        return futures.isEmpty() ? nullCompletedFuture() : 
allOf(futures.toArray(CompletableFuture[]::new));
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
index c7f2ebde69..6a59ff878c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
@@ -416,6 +416,37 @@ public class IndexMetaStorageRecoveryTest extends BaseIndexMetaStorageTest {
         assertNull(fromMetastore(indexId));
     }
 
+    @Test
+    void testMissingMultipleIndexUpdates() throws Exception {
+        assertThat(indexMetaStorage.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+        int registeredIndexCatalogVersion = executeCatalogUpdate(() -> 
createSimpleHashIndex(catalogManager, TABLE_NAME, INDEX_NAME));
+
+        int indexId = indexId(INDEX_NAME);
+        int tableId = tableId(TABLE_NAME);
+
+        int buildingIndexCatalogVersion = executeCatalogUpdate(() -> 
startBuildingIndex(catalogManager, indexId));
+        int availableIndexCatalogVersion = executeCatalogUpdate(() -> 
makeIndexAvailable(catalogManager, indexId));
+        int stoppingIndexCatalogVersion = executeCatalogUpdate(() -> 
dropSimpleIndex(catalogManager, INDEX_NAME));
+        int removingIndexCatalogVersion = executeCatalogUpdate(() -> 
removeIndex(catalogManager, indexId));
+
+        restartComponents();
+
+        IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+        IndexMeta fromMetastore = fromMetastore(indexId);
+
+        Map<MetaIndexStatus, MetaIndexStatusChange> expectedStatuses = Map.of(
+                REGISTERED, toChangeInfo(registeredIndexCatalogVersion),
+                BUILDING, toChangeInfo(buildingIndexCatalogVersion),
+                AVAILABLE, toChangeInfo(availableIndexCatalogVersion),
+                STOPPING, toChangeInfo(stoppingIndexCatalogVersion),
+                READ_ONLY, toChangeInfo(removingIndexCatalogVersion)
+        );
+
+        checkFields(indexMeta, indexId, tableId, INDEX_NAME, READ_ONLY, 
expectedStatuses, removingIndexCatalogVersion);
+        checkFields(fromMetastore, indexId, tableId, INDEX_NAME, READ_ONLY, 
expectedStatuses, removingIndexCatalogVersion);
+    }
+
     private void executeCatalogUpdateWithDropEvents(RunnableX task) {
         CompletableFuture<Void> startDropEventsFuture = 
interceptor.startDropEvents();
 

Reply via email to