This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 952c97b2d5 IGNITE-21909 Fix race on getting and destroying an index in
SharedRocksDbInstance (#3544)
952c97b2d5 is described below
commit 952c97b2d5ccf532fb8181c8a8cefaa783ea4a5d
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Apr 4 09:57:51 2024 +0300
IGNITE-21909 Fix race on getting and destroying an index in
SharedRocksDbInstance (#3544)
---
.../ignite/internal/storage/rocksdb/HashIndex.java | 2 +-
.../ignite/internal/storage/rocksdb/Index.java | 28 ++++--
.../internal/storage/rocksdb/RocksDbIndexes.java | 12 ++-
.../internal/storage/rocksdb/SortedIndex.java | 2 +-
.../rocksdb/instance/SharedRocksDbInstance.java | 106 +++++++++++++--------
.../instance/SharedRocksDbInstanceTest.java | 44 ++++++++-
6 files changed, 134 insertions(+), 60 deletions(-)
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index bd45c252f3..29b204014c 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -43,6 +43,6 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
@Override
RocksDbHashIndexStorage createStorage(int partitionId) {
- return new RocksDbHashIndexStorage(descriptor, tableId, partitionId,
indexColumnFamily().columnFamily(), indexMetaStorage);
+ return new RocksDbHashIndexStorage(descriptor, tableId(), partitionId,
columnFamily(), indexMetaStorage);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index 58c23aefaa..0625ceaf09 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.StorageException;
import
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
-import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
@@ -36,19 +35,30 @@ import org.rocksdb.WriteBatch;
* Represents an index for all its partitions.
*/
abstract class Index<S extends AbstractRocksDbIndexStorage> {
- final int tableId;
+ private final int tableId;
- private final IndexColumnFamily indexColumnFamily;
+ private final int indexId;
+
+ private final ColumnFamily columnFamily;
private final ConcurrentMap<Integer, S> storageByPartitionId = new
ConcurrentHashMap<>();
Index(int tableId, int indexId, ColumnFamily cf) {
this.tableId = tableId;
- this.indexColumnFamily = new IndexColumnFamily(indexId, cf);
+ this.indexId = indexId;
+ this.columnFamily = cf;
+ }
+
+ int tableId() {
+ return tableId;
+ }
+
+ int indexId() {
+ return indexId;
}
- IndexColumnFamily indexColumnFamily() {
- return indexColumnFamily;
+ ColumnFamily columnFamily() {
+ return columnFamily;
}
/**
@@ -76,7 +86,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
try {
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index ->
index::close));
} catch (Exception e) {
- throw new StorageException("Failed to close index storages: " +
indexColumnFamily.indexId(), e);
+ throw new StorageException("Failed to close index storages: " +
indexId, e);
}
}
@@ -87,7 +97,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
try {
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index ->
index::transitionToDestroyedState));
} catch (Exception e) {
- throw new StorageException("Failed to transition index storages to
the DESTROYED state: " + indexColumnFamily.indexId(), e);
+ throw new StorageException("Failed to transition index storages to
the DESTROYED state: " + indexId, e);
}
}
@@ -113,6 +123,6 @@ abstract class Index<S extends AbstractRocksDbIndexStorage>
{
void destroy(WriteBatch writeBatch) throws RocksDBException {
transitionToDestroyedState();
- deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(),
indexPrefix(tableId, indexColumnFamily().indexId()));
+ deleteByPrefix(writeBatch, columnFamily, indexPrefix(tableId,
indexId));
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
index 7ff1b893a1..152d241e55 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -73,7 +73,7 @@ class RocksDbIndexes {
}
}
- var indexCfsToDestroy = new ArrayList<IndexColumnFamily>();
+ var indexCfsToDestroy = new ArrayList<ColumnFamily>();
for (IndexColumnFamily indexColumnFamily :
rocksDb.sortedIndexes(tableId)) {
int indexId = indexColumnFamily.indexId();
@@ -83,9 +83,11 @@ class RocksDbIndexes {
var descriptor = (StorageSortedIndexDescriptor)
indexDescriptorSupplier.get(indexId);
if (descriptor == null) {
+ rocksDb.removeSortedIndex(indexId, cf);
+
deleteByPrefix(writeBatch, cf, indexPrefix(tableId,
indexId));
- indexCfsToDestroy.add(indexColumnFamily);
+ indexCfsToDestroy.add(cf);
} else {
sortedIndices.put(indexId,
SortedIndex.restoreExisting(tableId, cf, descriptor, rocksDb.meta));
}
@@ -94,7 +96,7 @@ class RocksDbIndexes {
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
if (!indexCfsToDestroy.isEmpty()) {
- rocksDb.scheduleIndexCfsDestroy(indexCfsToDestroy);
+ rocksDb.scheduleIndexCfsDestroyIfNeeded(indexCfsToDestroy);
}
}
}
@@ -174,6 +176,8 @@ class RocksDbIndexes {
}
if (sortedIdx != null) {
+ rocksDb.removeSortedIndex(indexId, sortedIdx.columnFamily());
+
sortedIdx.destroy(writeBatch);
}
@@ -181,7 +185,7 @@ class RocksDbIndexes {
}
if (sortedIdx != null) {
-
rocksDb.scheduleIndexCfsDestroy(List.of(sortedIdx.indexColumnFamily()));
+
rocksDb.scheduleIndexCfsDestroyIfNeeded(List.of(sortedIdx.columnFamily()));
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index c683d971c3..df97ab2c54 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -66,6 +66,6 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
@Override
RocksDbSortedIndexStorage createStorage(int partitionId) {
- return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId,
indexColumnFamily().columnFamily(), indexMetaStorage);
+ return new RocksDbSortedIndexStorage(descriptor, tableId(),
partitionId, columnFamily(), indexMetaStorage);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 8d31ae7477..26dad1528e 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -77,10 +78,8 @@ public final class SharedRocksDbInstance {
final Map<Integer, Integer> indexIdToTableId = new
ConcurrentHashMap<>();
- SortedIndexColumnFamily(ColumnFamily columnFamily, int indexId, int
tableId) {
+ SortedIndexColumnFamily(ColumnFamily columnFamily) {
this.columnFamily = columnFamily;
-
- indexIdToTableId.put(indexId, tableId);
}
SortedIndexColumnFamily(ColumnFamily columnFamily, Map<Integer,
Integer> indexIdToTableId) {
@@ -272,12 +271,12 @@ public final class SharedRocksDbInstance {
try {
SortedIndexColumnFamily result = sortedIndexCfsByName.compute(new
ByteArray(cfName), (unused, sortedIndexCf) -> {
if (sortedIndexCf == null) {
- return new
SortedIndexColumnFamily(createSortedIndexCf(cfName), indexId, tableId);
- } else {
- sortedIndexCf.indexIdToTableId.put(indexId, tableId);
-
- return sortedIndexCf;
+ sortedIndexCf = new
SortedIndexColumnFamily(createSortedIndexCf(cfName));
}
+
+ sortedIndexCf.indexIdToTableId.put(indexId, tableId);
+
+ return sortedIndexCf;
});
return result.columnFamily;
@@ -287,70 +286,95 @@ public final class SharedRocksDbInstance {
}
/**
- * Schedules a drop of a column family after destroying an index, if it
was the last index managed by that CF.
+ * Removes the given sorted index from this instance. This prevents this
index from being returned by {@link #sortedIndexes} calls.
*/
- public CompletableFuture<Void>
scheduleIndexCfsDestroy(List<IndexColumnFamily> indexColumnFamilies) {
- assert !indexColumnFamilies.isEmpty();
+ public void removeSortedIndex(int indexId, ColumnFamily cf) {
+ var cfNameBytes = new ByteArray(cf.nameBytes());
- return flusher.awaitFlush(false)
- .thenRunAsync(() ->
indexColumnFamilies.forEach(this::destroySortedIndexCfIfNeeded),
engine.threadPool());
+ sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf)
-> {
+ indexCf.indexIdToTableId.remove(indexId);
+
+ return indexCf;
+ });
}
- void destroySortedIndexCfIfNeeded(IndexColumnFamily indexColumnFamily) {
- if (!busyLock.enterBusy()) {
- throw new StorageClosedException();
- }
+ /**
+ * Schedules a drop of a column family after destroying an index, if it
was the last index managed by that CF.
+ */
+ public CompletableFuture<Void>
scheduleIndexCfsDestroyIfNeeded(List<ColumnFamily> columnFamilies) {
+ assert !columnFamilies.isEmpty();
- var cfNameBytes = new
ByteArray(indexColumnFamily.columnFamily().nameBytes());
+ return flusher.awaitFlush(false)
+ .thenRunAsync(() -> {
+ if (!busyLock.enterBusy()) {
+ throw new StorageClosedException();
+ }
+
+ try {
+
columnFamilies.forEach(this::destroySortedIndexCfIfNeeded);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }, engine.threadPool());
+ }
- try {
- sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused,
indexCf) -> {
- indexCf.indexIdToTableId.remove(indexColumnFamily.indexId());
+ void destroySortedIndexCfIfNeeded(ColumnFamily columnFamily) {
+ var cfNameBytes = new ByteArray(columnFamily.nameBytes());
- if (!indexCf.indexIdToTableId.isEmpty()) {
- return indexCf;
- }
+ sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused, indexCf)
-> {
+ if (!indexCf.indexIdToTableId.isEmpty()) {
+ return indexCf;
+ }
- destroyColumnFamily(indexCf.columnFamily);
+ destroyColumnFamily(indexCf.columnFamily);
- return null;
- });
- } finally {
- busyLock.leaveBusy();
- }
+ return null;
+ });
}
/**
* Removes all data associated with the given table ID in this storage.
*/
- public void destroyTable(int tableId) {
+ public void destroyTable(int targetTableId) {
try (WriteBatch writeBatch = new WriteBatch()) {
byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES)
.order(KEY_BYTE_ORDER)
- .putInt(tableId)
+ .putInt(targetTableId)
.array();
deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
- List<IndexColumnFamily> sortedIndexCfs = sortedIndexes(tableId);
-
- for (IndexColumnFamily indexColumnFamily : sortedIndexCfs) {
- deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(),
tableIdBytes);
- }
-
deleteByPrefix(writeBatch, meta.columnFamily(),
metaPrefix(PARTITION_META_PREFIX, tableIdBytes));
deleteByPrefix(writeBatch, meta.columnFamily(),
metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes));
deleteByPrefix(writeBatch, meta.columnFamily(),
metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes));
+ var cfsToRemove = new ArrayList<ColumnFamily>();
+
+ for (SortedIndexColumnFamily indexCf :
sortedIndexCfsByName.values()) {
+ Iterator<Integer> it =
indexCf.indexIdToTableId.values().iterator();
+
+ while (it.hasNext()) {
+ int tableId = it.next();
+
+ if (targetTableId == tableId) {
+ it.remove();
+
+ deleteByPrefix(writeBatch, indexCf.columnFamily,
tableIdBytes);
+
+ cfsToRemove.add(indexCf.columnFamily);
+ }
+ }
+ }
+
db.write(DFLT_WRITE_OPTS, writeBatch);
- if (!sortedIndexCfs.isEmpty()) {
- scheduleIndexCfsDestroy(sortedIndexCfs);
+ if (!cfsToRemove.isEmpty()) {
+ scheduleIndexCfsDestroyIfNeeded(cfsToRemove);
}
} catch (RocksDBException e) {
- throw new StorageException("Failed to destroy table data.
[tableId={}]", e, tableId);
+ throw new StorageException("Failed to destroy table data.
[tableId={}]", e, targetTableId);
}
}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
index b753b9c452..a3ee3ba3de 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -126,19 +126,23 @@ class SharedRocksDbInstanceTest extends
IgniteAbstractTest {
assertThat(foo, is(not(sameInstance(bar))));
assertThat(quux, is((sameInstance(baz))));
- rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(1, foo));
+ rocksDb.removeSortedIndex(1, foo);
+ rocksDb.destroySortedIndexCfIfNeeded(foo);
assertTrue(cfExists(fooName));
- rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(2, bar));
+ rocksDb.removeSortedIndex(2, bar);
+ rocksDb.destroySortedIndexCfIfNeeded(bar);
assertFalse(cfExists(barName));
- rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(3, baz));
+ rocksDb.removeSortedIndex(3, baz);
+ rocksDb.destroySortedIndexCfIfNeeded(baz);
assertTrue(cfExists(fooName));
- rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(4, quux));
+ rocksDb.removeSortedIndex(4, quux);
+ rocksDb.destroySortedIndexCfIfNeeded(quux);
assertFalse(cfExists(fooName));
}
@@ -277,6 +281,38 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest
{
assertThat(getIndexFuture.join().stream().map(IndexColumnFamily::indexId).collect(toList()),
contains(0));
}
+ @Test
+ void testRemoveSortedIndex() {
+ int tableId = 0;
+
+ int indexId = 0;
+
+ byte[] fooName = sortedIndexCfName(List.of(
+ new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64,
true, true)
+ ));
+
+ ColumnFamily cf = rocksDb.getOrCreateSortedIndexCf(fooName, indexId,
tableId);
+
+ rocksDb.removeSortedIndex(indexId, cf);
+
+ assertThat(rocksDb.sortedIndexes(tableId), is(empty()));
+ }
+
+ @Test
+ void testTableDestroyRemovesSortedIndexes() {
+ int tableId = 0;
+
+ byte[] fooName = sortedIndexCfName(List.of(
+ new StorageSortedIndexColumnDescriptor("a", NativeTypes.INT64,
true, true)
+ ));
+
+ rocksDb.getOrCreateSortedIndexCf(fooName, 0, tableId);
+
+ rocksDb.destroyTable(tableId);
+
+ assertThat(rocksDb.sortedIndexes(tableId), is(empty()));
+ }
+
private boolean cfExists(byte[] cfName) {
try {
// Check Column Family existence by trying to create a new one
with the same name.