This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dbb46c8c9 IGNITE-19085 Fix waiting for indexes on recovery when building indexes (#1855)
8dbb46c8c9 is described below

commit 8dbb46c8c98c349f24f86662ec3b8fca7dd6a832
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Mar 29 17:15:01 2023 +0300

    IGNITE-19085 Fix waiting for indexes on recovery when building indexes (#1855)
---
 .../apache/ignite/internal/index/IndexManager.java |  1 +
 .../ignite/internal/index/IndexManagerTest.java    |  2 +-
 .../apache/ignite/internal/table/TableImpl.java    | 88 ++++++++++------------
 .../internal/table/distributed/TableManager.java   | 29 +++++--
 .../org/apache/ignite/internal/table/Example.java  |  4 +-
 5 files changed, 66 insertions(+), 58 deletions(-)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 47a0ec6a3c..ed514ba196 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -73,6 +73,7 @@ import org.jetbrains.annotations.NotNull;
  * An Ignite component that is responsible for handling index-related commands 
like CREATE or DROP
  * as well as managing indexes' lifecycle.
  */
+// TODO: IGNITE-19082 Delete this class
 public class IndexManager extends Producer<IndexEvent, IndexEventParameters> 
implements IgniteComponent {
     private static final IgniteLogger LOG = 
Loggers.forClass(IndexManager.class);
 
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index a072fb3caa..99e8ff769b 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -85,7 +85,7 @@ public class IndexManagerTest {
 
             Mockito.doReturn(inv.getArgument(1)).when(tbl).tableId();
 
-            return completedFuture(new TableImpl(tbl, new HeapLockManager(), 
() -> completedFuture(List.of())));
+            return completedFuture(new TableImpl(tbl, new HeapLockManager()));
         });
 
         SchemaManager schManager = mock(SchemaManager.class);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 35b5c5f456..560c0b0543 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table;
 import static java.util.concurrent.CompletableFuture.allOf;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +52,6 @@ import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -63,8 +63,6 @@ public class TableImpl implements Table {
 
     private final LockManager lockManager;
 
-    // private final Supplier<List<UUID>> activeIndexIds;
-
     /** Schema registry. Should be set either in constructor or via {@link 
#schemaView(SchemaRegistry)} before start of using the table. */
     private volatile SchemaRegistry schemaReg;
 
@@ -80,12 +78,10 @@ public class TableImpl implements Table {
      *
      * @param tbl       The table.
      * @param lockManager Lock manager.
-     * @param activeIndexIds Supplier of index ids which considered active on 
the moment of invocation.
      */
-    public TableImpl(InternalTable tbl, LockManager lockManager, 
Supplier<CompletableFuture<List<UUID>>> activeIndexIds) {
+    public TableImpl(InternalTable tbl, LockManager lockManager) {
         this.tbl = tbl;
         this.lockManager = lockManager;
-        // this.activeIndexIds = activeIndexIds;
     }
 
     /**
@@ -100,8 +96,6 @@ public class TableImpl implements Table {
         this.tbl = tbl;
         this.schemaReg = schemaReg;
         this.lockManager = lockManager;
-
-        // activeIndexIds = List::of;
     }
 
     /**
@@ -109,7 +103,7 @@ public class TableImpl implements Table {
      *
      * @return Table id as UUID.
      */
-    public @NotNull UUID tableId() {
+    public UUID tableId() {
         return tbl.tableId();
     }
 
@@ -132,8 +126,7 @@ public class TableImpl implements Table {
         return tbl;
     }
 
-    /** {@inheritDoc} */
-    @Override public @NotNull String name() {
+    @Override public String name() {
         return tbl.name();
     }
 
@@ -149,7 +142,7 @@ public class TableImpl implements Table {
     /**
      * Sets a schema view for the table.
      */
-    public void schemaView(@NotNull SchemaRegistry schemaReg) {
+    public void schemaView(SchemaRegistry schemaReg) {
         assert this.schemaReg == null : "Schema registry is already set 
[tableName=" + name() + ']';
 
         Objects.requireNonNull(schemaReg, "Schema registry must not be null 
[tableName=" + name() + ']');
@@ -157,25 +150,21 @@ public class TableImpl implements Table {
         this.schemaReg = schemaReg;
     }
 
-    /** {@inheritDoc} */
     @Override
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
         return new RecordViewImpl<>(tbl, schemaReg, recMapper);
     }
 
-    /** {@inheritDoc} */
     @Override
     public RecordView<Tuple> recordView() {
         return new RecordBinaryViewImpl(tbl, schemaReg);
     }
 
-    /** {@inheritDoc} */
     @Override
     public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, 
Mapper<V> valMapper) {
         return new KeyValueViewImpl<>(tbl, schemaReg, keyMapper, valMapper);
     }
 
-    /** {@inheritDoc} */
     @Override
     public KeyValueView<Tuple, Tuple> keyValueView() {
         return new KeyValueBinaryViewImpl(tbl, schemaReg);
@@ -311,11 +300,7 @@ public class TableImpl implements Table {
                 )
         );
 
-        CompletableFuture<?> indexFuture = indexesToWait.remove(indexId);
-
-        if (indexFuture != null) {
-            indexFuture.complete(null);
-        }
+        completeWaitIndex(indexId);
     }
 
     /**
@@ -346,11 +331,7 @@ public class TableImpl implements Table {
                 )
         );
 
-        CompletableFuture<?> indexFuture = indexesToWait.remove(indexId);
-
-        if (indexFuture != null) {
-            indexFuture.complete(null);
-        }
+        completeWaitIndex(indexId);
     }
 
     /**
@@ -361,33 +342,18 @@ public class TableImpl implements Table {
     public void unregisterIndex(UUID indexId) {
         indexLockerFactories.remove(indexId);
         indexStorageAdapterFactories.remove(indexId);
+
+        completeWaitIndex(indexId);
+
+        // TODO: IGNITE-19150 Also need to destroy the index storages
     }
 
     private void awaitIndexes() {
-        // TODO: replace with actual call to ids supplier
-        List<UUID> indexIds = List.of(pkId()); // activeIndexIds.get();
-
         List<CompletableFuture<?>> toWait = new ArrayList<>();
 
-        for (UUID indexId : indexIds) {
-            if (indexLockerFactories.containsKey(indexId) && 
indexStorageAdapterFactories.containsKey(indexId)) {
-                continue;
-            }
-
-            CompletableFuture<?> indexFuture = 
indexesToWait.computeIfAbsent(indexId, k -> new CompletableFuture<>());
-
-            // there is no synchronization between modification of 
index*Factories collections
-            // and indexesToWait collection, thus we may run into situation, 
when index was
-            // registered in the between of index existence check and 
registering a wait future.
-            // This second check aimed to resolve this race
-            if (indexLockerFactories.containsKey(indexId) && 
indexStorageAdapterFactories.containsKey(indexId)) {
-                indexesToWait.remove(indexId);
-
-                continue;
-            }
+        toWait.add(pkId);
 
-            toWait.add(indexFuture);
-        }
+        toWait.addAll(indexesToWait.values());
 
         allOf(toWait.toArray(CompletableFuture[]::new)).join();
     }
@@ -396,10 +362,14 @@ public class TableImpl implements Table {
      * Prepares this table for being closed.
      */
     public void beforeClose() {
-        pkId.completeExceptionally(new IgniteInternalException(
+        IgniteInternalException closeTableException = new 
IgniteInternalException(
                 ErrorGroups.Table.TABLE_STOPPING_ERR,
                 "Table is being stopped: tableId=" + tableId()
-        ));
+        );
+
+        pkId.completeExceptionally(closeTableException);
+
+        indexesToWait.values().forEach(future -> 
future.completeExceptionally(closeTableException));
     }
 
     @FunctionalInterface
@@ -413,4 +383,24 @@ public class TableImpl implements Table {
         /** Creates the index decorator for given partition. */
         TableSchemaAwareIndexStorage create(int partitionId);
     }
+
+    /**
+     * Adds indexes to wait before inserting data into the table.
+     *
+     * @param indexIds Index IDs.
+     */
+    // TODO: IGNITE-19082 Needs to be redone/improved
+    public void addIndexesToWait(Collection<UUID> indexIds) {
+        for (UUID indexId : indexIds) {
+            indexesToWait.computeIfAbsent(indexId, uuid -> new 
CompletableFuture<>());
+        }
+    }
+
+    private void completeWaitIndex(UUID indexId) {
+        CompletableFuture<?> indexToWaitFuture = indexesToWait.remove(indexId);
+
+        if (indexToWaitFuture != null) {
+            indexToWaitFuture.complete(null);
+        }
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 24f028aff4..03136e6def 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
@@ -1130,8 +1131,10 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 distributionZoneConfiguration.partitions().value(), 
clusterNodeResolver, txManager, tableStorage,
                 txStateStorage, replicaSvc, clock);
 
-        // TODO: IGNITE-16288 directIndexIds should use async configuration API
-        var table = new TableImpl(internalTable, lockMgr, () -> 
CompletableFuture.supplyAsync(this::directIndexIds));
+        var table = new TableImpl(internalTable, lockMgr);
+
+        // TODO: IGNITE-19082 Need another way to wait for indexes
+        table.addIndexesToWait(collectTableIndexes(tblId));
 
         tablesByIdVv.update(causalityToken, (previous, e) -> 
inBusyLock(busyLock, () -> {
             if (e != null) {
@@ -1560,7 +1563,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      */
     private CompletableFuture<List<Table>> tablesAsyncInternal() {
         // TODO: IGNITE-16288 directTableIds should use async configuration API
-        return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, 
this::directTableIds))
+        return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), 
ioExecutor)
                 .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
                     var tableFuts = new CompletableFuture[tableIds.size()];
 
@@ -1709,7 +1712,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         }
         try {
             // TODO: IGNITE-16288 directTableId should use async configuration 
API
-            return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, () 
-> directTableId(name)))
+            return supplyAsync(() -> inBusyLock(busyLock, () -> 
directTableId(name)), ioExecutor)
                     .thenCompose(tableId -> inBusyLock(busyLock, () -> {
                         if (tableId == null) {
                             return completedFuture(null);
@@ -1733,7 +1736,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     public CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean 
checkConfiguration) {
         CompletableFuture<Boolean> tblCfgFut = checkConfiguration
                 // TODO: IGNITE-16288 isTableConfigured should use async 
configuration API
-                ? CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, () 
-> isTableConfigured(id)))
+                ? supplyAsync(() -> inBusyLock(busyLock, () -> 
isTableConfigured(id)), ioExecutor)
                 : completedFuture(true);
 
         return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
@@ -2380,4 +2383,20 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             nodeStoppingEx.set(true);
         }
     }
+
+    private Collection<UUID> collectTableIndexes(UUID tableId) {
+        NamedListView<? extends TableIndexView> indexes = 
tablesCfg.value().indexes();
+
+        List<UUID> indexIds = new ArrayList<>();
+
+        for (int i = 0; i < indexes.size(); i++) {
+            TableIndexView indexConfig = indexes.get(i);
+
+            if (indexConfig.tableId().equals(tableId)) {
+                indexIds.add(indexConfig.id());
+            }
+        }
+
+        return indexIds;
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
index 3035d21993..26224fea9a 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
@@ -21,7 +21,6 @@ import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjects;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -53,8 +52,7 @@ public class Example {
      */
     private static List<Table> tableFactory() {
         return Collections.singletonList(
-                new TableImpl(new 
DummyInternalTableImpl(Mockito.mock(ReplicaService.class)), new 
HeapLockManager(),
-                        () -> CompletableFuture.completedFuture(List.of())));
+                new TableImpl(new 
DummyInternalTableImpl(Mockito.mock(ReplicaService.class)), new 
HeapLockManager()));
     }
 
     /**

Reply via email to