This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8dbb46c8c9 IGNITE-19085 Fix waiting for indexes on recovery when
building indexes (#1855)
8dbb46c8c9 is described below
commit 8dbb46c8c98c349f24f86662ec3b8fca7dd6a832
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Mar 29 17:15:01 2023 +0300
IGNITE-19085 Fix waiting for indexes on recovery when building indexes
(#1855)
---
.../apache/ignite/internal/index/IndexManager.java | 1 +
.../ignite/internal/index/IndexManagerTest.java | 2 +-
.../apache/ignite/internal/table/TableImpl.java | 88 ++++++++++------------
.../internal/table/distributed/TableManager.java | 29 +++++--
.../org/apache/ignite/internal/table/Example.java | 4 +-
5 files changed, 66 insertions(+), 58 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 47a0ec6a3c..ed514ba196 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -73,6 +73,7 @@ import org.jetbrains.annotations.NotNull;
* An Ignite component that is responsible for handling index-related commands
like CREATE or DROP
* as well as managing indexes' lifecycle.
*/
+// TODO: IGNITE-19082 Delete this class
public class IndexManager extends Producer<IndexEvent, IndexEventParameters>
implements IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(IndexManager.class);
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index a072fb3caa..99e8ff769b 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -85,7 +85,7 @@ public class IndexManagerTest {
Mockito.doReturn(inv.getArgument(1)).when(tbl).tableId();
- return completedFuture(new TableImpl(tbl, new HeapLockManager(),
() -> completedFuture(List.of())));
+ return completedFuture(new TableImpl(tbl, new HeapLockManager()));
});
SchemaManager schManager = mock(SchemaManager.class);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 35b5c5f456..560c0b0543 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.allOf;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,7 +52,6 @@ import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
/**
@@ -63,8 +63,6 @@ public class TableImpl implements Table {
private final LockManager lockManager;
- // private final Supplier<List<UUID>> activeIndexIds;
-
/** Schema registry. Should be set either in constructor or via {@link
#schemaView(SchemaRegistry)} before start of using the table. */
private volatile SchemaRegistry schemaReg;
@@ -80,12 +78,10 @@ public class TableImpl implements Table {
*
* @param tbl The table.
* @param lockManager Lock manager.
- * @param activeIndexIds Supplier of index ids which considered active on
the moment of invocation.
*/
- public TableImpl(InternalTable tbl, LockManager lockManager,
Supplier<CompletableFuture<List<UUID>>> activeIndexIds) {
+ public TableImpl(InternalTable tbl, LockManager lockManager) {
this.tbl = tbl;
this.lockManager = lockManager;
- // this.activeIndexIds = activeIndexIds;
}
/**
@@ -100,8 +96,6 @@ public class TableImpl implements Table {
this.tbl = tbl;
this.schemaReg = schemaReg;
this.lockManager = lockManager;
-
- // activeIndexIds = List::of;
}
/**
@@ -109,7 +103,7 @@ public class TableImpl implements Table {
*
* @return Table id as UUID.
*/
- public @NotNull UUID tableId() {
+ public UUID tableId() {
return tbl.tableId();
}
@@ -132,8 +126,7 @@ public class TableImpl implements Table {
return tbl;
}
- /** {@inheritDoc} */
- @Override public @NotNull String name() {
+ @Override public String name() {
return tbl.name();
}
@@ -149,7 +142,7 @@ public class TableImpl implements Table {
/**
* Sets a schema view for the table.
*/
- public void schemaView(@NotNull SchemaRegistry schemaReg) {
+ public void schemaView(SchemaRegistry schemaReg) {
assert this.schemaReg == null : "Schema registry is already set
[tableName=" + name() + ']';
Objects.requireNonNull(schemaReg, "Schema registry must not be null
[tableName=" + name() + ']');
@@ -157,25 +150,21 @@ public class TableImpl implements Table {
this.schemaReg = schemaReg;
}
- /** {@inheritDoc} */
@Override
public <R> RecordView<R> recordView(Mapper<R> recMapper) {
return new RecordViewImpl<>(tbl, schemaReg, recMapper);
}
- /** {@inheritDoc} */
@Override
public RecordView<Tuple> recordView() {
return new RecordBinaryViewImpl(tbl, schemaReg);
}
- /** {@inheritDoc} */
@Override
public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper,
Mapper<V> valMapper) {
return new KeyValueViewImpl<>(tbl, schemaReg, keyMapper, valMapper);
}
- /** {@inheritDoc} */
@Override
public KeyValueView<Tuple, Tuple> keyValueView() {
return new KeyValueBinaryViewImpl(tbl, schemaReg);
@@ -311,11 +300,7 @@ public class TableImpl implements Table {
)
);
- CompletableFuture<?> indexFuture = indexesToWait.remove(indexId);
-
- if (indexFuture != null) {
- indexFuture.complete(null);
- }
+ completeWaitIndex(indexId);
}
/**
@@ -346,11 +331,7 @@ public class TableImpl implements Table {
)
);
- CompletableFuture<?> indexFuture = indexesToWait.remove(indexId);
-
- if (indexFuture != null) {
- indexFuture.complete(null);
- }
+ completeWaitIndex(indexId);
}
/**
@@ -361,33 +342,18 @@ public class TableImpl implements Table {
public void unregisterIndex(UUID indexId) {
indexLockerFactories.remove(indexId);
indexStorageAdapterFactories.remove(indexId);
+
+ completeWaitIndex(indexId);
+
+ // TODO: IGNITE-19150 Also need to destroy the index storages
}
private void awaitIndexes() {
- // TODO: replace with actual call to ids supplier
- List<UUID> indexIds = List.of(pkId()); // activeIndexIds.get();
-
List<CompletableFuture<?>> toWait = new ArrayList<>();
- for (UUID indexId : indexIds) {
- if (indexLockerFactories.containsKey(indexId) &&
indexStorageAdapterFactories.containsKey(indexId)) {
- continue;
- }
-
- CompletableFuture<?> indexFuture =
indexesToWait.computeIfAbsent(indexId, k -> new CompletableFuture<>());
-
- // there is no synchronization between modification of
index*Factories collections
- // and indexesToWait collection, thus we may run into situation,
when index was
- // registered in the between of index existence check and
registering a wait future.
- // This second check aimed to resolve this race
- if (indexLockerFactories.containsKey(indexId) &&
indexStorageAdapterFactories.containsKey(indexId)) {
- indexesToWait.remove(indexId);
-
- continue;
- }
+ toWait.add(pkId);
- toWait.add(indexFuture);
- }
+ toWait.addAll(indexesToWait.values());
allOf(toWait.toArray(CompletableFuture[]::new)).join();
}
@@ -396,10 +362,14 @@ public class TableImpl implements Table {
* Prepares this table for being closed.
*/
public void beforeClose() {
- pkId.completeExceptionally(new IgniteInternalException(
+ IgniteInternalException closeTableException = new
IgniteInternalException(
ErrorGroups.Table.TABLE_STOPPING_ERR,
"Table is being stopped: tableId=" + tableId()
- ));
+ );
+
+ pkId.completeExceptionally(closeTableException);
+
+ indexesToWait.values().forEach(future ->
future.completeExceptionally(closeTableException));
}
@FunctionalInterface
@@ -413,4 +383,24 @@ public class TableImpl implements Table {
/** Creates the index decorator for given partition. */
TableSchemaAwareIndexStorage create(int partitionId);
}
+
+ /**
+ * Adds indexes to wait before inserting data into the table.
+ *
+ * @param indexIds Indexes Index IDs.
+ */
+ // TODO: IGNITE-19082 Needs to be redone/improved
+ public void addIndexesToWait(Collection<UUID> indexIds) {
+ for (UUID indexId : indexIds) {
+ indexesToWait.computeIfAbsent(indexId, uuid -> new
CompletableFuture<>());
+ }
+ }
+
+ private void completeWaitIndex(UUID indexId) {
+ CompletableFuture<?> indexToWaitFuture = indexesToWait.remove(indexId);
+
+ if (indexToWaitFuture != null) {
+ indexToWaitFuture.complete(null);
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 24f028aff4..03136e6def 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
@@ -1130,8 +1131,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
distributionZoneConfiguration.partitions().value(),
clusterNodeResolver, txManager, tableStorage,
txStateStorage, replicaSvc, clock);
- // TODO: IGNITE-16288 directIndexIds should use async configuration API
- var table = new TableImpl(internalTable, lockMgr, () ->
CompletableFuture.supplyAsync(this::directIndexIds));
+ var table = new TableImpl(internalTable, lockMgr);
+
+ // TODO: IGNITE-19082 Need another way to wait for indexes
+ table.addIndexesToWait(collectTableIndexes(tblId));
tablesByIdVv.update(causalityToken, (previous, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
@@ -1560,7 +1563,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*/
private CompletableFuture<List<Table>> tablesAsyncInternal() {
// TODO: IGNITE-16288 directTableIds should use async configuration API
- return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock,
this::directTableIds))
+ return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds),
ioExecutor)
.thenCompose(tableIds -> inBusyLock(busyLock, () -> {
var tableFuts = new CompletableFuture[tableIds.size()];
@@ -1709,7 +1712,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
try {
// TODO: IGNITE-16288 directTableId should use async configuration
API
- return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, ()
-> directTableId(name)))
+ return supplyAsync(() -> inBusyLock(busyLock, () ->
directTableId(name)), ioExecutor)
.thenCompose(tableId -> inBusyLock(busyLock, () -> {
if (tableId == null) {
return completedFuture(null);
@@ -1733,7 +1736,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
public CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean
checkConfiguration) {
CompletableFuture<Boolean> tblCfgFut = checkConfiguration
// TODO: IGNITE-16288 isTableConfigured should use async
configuration API
- ? CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, ()
-> isTableConfigured(id)))
+ ? supplyAsync(() -> inBusyLock(busyLock, () ->
isTableConfigured(id)), ioExecutor)
: completedFuture(true);
return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> {
@@ -2380,4 +2383,20 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
nodeStoppingEx.set(true);
}
}
+
+ private Collection<UUID> collectTableIndexes(UUID tableId) {
+ NamedListView<? extends TableIndexView> indexes =
tablesCfg.value().indexes();
+
+ List<UUID> indexIds = new ArrayList<>();
+
+ for (int i = 0; i < indexes.size(); i++) {
+ TableIndexView indexConfig = indexes.get(i);
+
+ if (indexConfig.tableId().equals(tableId)) {
+ indexIds.add(indexConfig.id());
+ }
+ }
+
+ return indexIds;
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
index 3035d21993..26224fea9a 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
@@ -21,7 +21,6 @@ import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -53,8 +52,7 @@ public class Example {
*/
private static List<Table> tableFactory() {
return Collections.singletonList(
- new TableImpl(new
DummyInternalTableImpl(Mockito.mock(ReplicaService.class)), new
HeapLockManager(),
- () -> CompletableFuture.completedFuture(List.of())));
+ new TableImpl(new
DummyInternalTableImpl(Mockito.mock(ReplicaService.class)), new
HeapLockManager()));
}
/**