This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bbdb28a993 IGNITE-21760 Remove destroyed RocksDB tables on recovery (#3488)
bbdb28a993 is described below
commit bbdb28a993556c42c201f7f899fa3d525e16f398
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Mar 27 10:47:31 2024 +0200
IGNITE-21760 Remove destroyed RocksDB tables on recovery (#3488)
---
check-rules/spotbugs-excludes.xml | 10 +++
.../storage/engine/AbstractStorageEngineTest.java | 2 +-
.../ignite/internal/storage/rocksdb/HashIndex.java | 13 ++-
.../ignite/internal/storage/rocksdb/Index.java | 20 ++---
.../storage/rocksdb/RocksDbDataRegion.java | 16 ++--
.../internal/storage/rocksdb/RocksDbIndexes.java | 49 +++---------
.../storage/rocksdb/RocksDbMetaStorage.java | 22 ++---
.../storage/rocksdb/RocksDbStorageEngine.java | 93 ++++++++++++----------
.../storage/rocksdb/RocksDbTableStorage.java | 29 +------
.../internal/storage/rocksdb/SortedIndex.java | 19 +----
.../rocksdb/index/AbstractRocksDbIndexStorage.java | 11 ++-
.../rocksdb/index/RocksDbHashIndexStorage.java | 2 +-
.../rocksdb/index/RocksDbSortedIndexStorage.java | 2 +-
.../rocksdb/instance/IndexColumnFamily.java | 40 ++++++++++
.../rocksdb/instance/SharedRocksDbInstance.java | 78 +++++++++++++++---
.../rocksdb/engine/RocksDbStorageEngineTest.java | 7 --
.../instance/SharedRocksDbInstanceTest.java | 41 +++++++---
17 files changed, 262 insertions(+), 192 deletions(-)
diff --git a/check-rules/spotbugs-excludes.xml b/check-rules/spotbugs-excludes.xml
index 3d4554aca2..a25c91874a 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -206,6 +206,16 @@
<Class
name="org.apache.ignite.internal.tx.impl.TransactionInflights$TxContext"/>
<Field name="inflights"/>
</Match>
+ <Match>
+ <!-- Public byte array constants, not expected to be modified. -->
+ <Bug pattern="MS_MUTABLE_ARRAY"/>
+ <Class
name="org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage"/>
+ <Or>
+ <Field name="PARTITION_META_PREFIX"/>
+ <Field name="PARTITION_CONF_PREFIX"/>
+ <Field name="INDEX_ROW_ID_PREFIX"/>
+ </Or>
+ </Match>
<!-- end of false-positive exclusions -->
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
index d7752d3fe6..f8eab538bb 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -112,7 +112,7 @@ public abstract class AbstractStorageEngineTest extends
BaseMvStoragesTest {
}
@Test
- protected void testDropMvTableOnRecovery() throws Exception {
+ void testDropMvTableOnRecovery() throws Exception {
assumeFalse(storageEngine.isVolatile());
int tableId = 1;
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
index 571e0f5853..bd45c252f3 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/HashIndex.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.storage.rocksdb;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbHashIndexStorage;
-import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
/**
* Class that represents a Hash Index defined for all partitions of a Table.
@@ -29,8 +29,13 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
private final RocksDbMetaStorage indexMetaStorage;
- HashIndex(SharedRocksDbInstance rocksDb, int tableId,
StorageHashIndexDescriptor descriptor, RocksDbMetaStorage indexMetaStorage) {
- super(tableId, descriptor.id(), rocksDb.hashIndexCf());
+ HashIndex(
+ int tableId,
+ ColumnFamily indexCf,
+ StorageHashIndexDescriptor descriptor,
+ RocksDbMetaStorage indexMetaStorage
+ ) {
+ super(tableId, descriptor.id(), indexCf);
this.descriptor = descriptor;
this.indexMetaStorage = indexMetaStorage;
@@ -38,6 +43,6 @@ class HashIndex extends Index<RocksDbHashIndexStorage> {
@Override
RocksDbHashIndexStorage createStorage(int partitionId) {
- return new RocksDbHashIndexStorage(descriptor, tableId, partitionId,
indexCf, indexMetaStorage);
+ return new RocksDbHashIndexStorage(descriptor, tableId, partitionId,
indexColumnFamily().columnFamily(), indexMetaStorage);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
index c184cf9a70..58c23aefaa 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/Index.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.StorageException;
import
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
@@ -37,16 +38,17 @@ import org.rocksdb.WriteBatch;
abstract class Index<S extends AbstractRocksDbIndexStorage> {
final int tableId;
- final int indexId;
-
- final ColumnFamily indexCf;
+ private final IndexColumnFamily indexColumnFamily;
private final ConcurrentMap<Integer, S> storageByPartitionId = new
ConcurrentHashMap<>();
- Index(int tableId, int indexId, ColumnFamily indexCf) {
+ Index(int tableId, int indexId, ColumnFamily cf) {
this.tableId = tableId;
- this.indexId = indexId;
- this.indexCf = indexCf;
+ this.indexColumnFamily = new IndexColumnFamily(indexId, cf);
+ }
+
+ IndexColumnFamily indexColumnFamily() {
+ return indexColumnFamily;
}
/**
@@ -74,7 +76,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
try {
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index ->
index::close));
} catch (Exception e) {
- throw new StorageException("Failed to close index storages: " +
indexId, e);
+ throw new StorageException("Failed to close index storages: " +
indexColumnFamily.indexId(), e);
}
}
@@ -85,7 +87,7 @@ abstract class Index<S extends AbstractRocksDbIndexStorage> {
try {
IgniteUtils.closeAll(storageByPartitionId.values().stream().map(index ->
index::transitionToDestroyedState));
} catch (Exception e) {
- throw new StorageException("Failed to transition index storages to
the DESTROYED state: " + indexId, e);
+ throw new StorageException("Failed to transition index storages to
the DESTROYED state: " + indexColumnFamily.indexId(), e);
}
}
@@ -111,6 +113,6 @@ abstract class Index<S extends AbstractRocksDbIndexStorage>
{
void destroy(WriteBatch writeBatch) throws RocksDBException {
transitionToDestroyedState();
- deleteByPrefix(writeBatch, indexCf, indexPrefix(tableId, indexId));
+ deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(),
indexPrefix(tableId, indexColumnFamily().indexId()));
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
index fb7a73512f..20581f50a5 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.Ro
import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
import java.util.Locale;
-import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.Cache;
@@ -34,7 +33,7 @@ import org.rocksdb.WriteBufferManager;
*/
public class RocksDbDataRegion {
/** Region configuration. */
- private final RocksDbDataRegionConfiguration cfg;
+ private final RocksDbDataRegionView dataRegionView;
/** RocksDB cache instance. */
private Cache cache;
@@ -45,18 +44,16 @@ public class RocksDbDataRegion {
/**
* Constructor.
*
- * @param cfg Data region configuration.
+ * @param dataRegionView Data region configuration.
*/
- public RocksDbDataRegion(RocksDbDataRegionConfiguration cfg) {
- this.cfg = cfg;
+ public RocksDbDataRegion(RocksDbDataRegionView dataRegionView) {
+ this.dataRegionView = dataRegionView;
}
/**
* Start the rocksDb data region.
*/
public void start() {
- RocksDbDataRegionView dataRegionView = cfg.value();
-
long writeBufferSize = dataRegionView.writeBufferSize();
long totalCacheSize = dataRegionView.size() + writeBufferSize;
@@ -73,7 +70,10 @@ public class RocksDbDataRegion {
break;
default:
- assert false : dataRegionView.cache();
+ throw new AssertionError(String.format(
+ "Unknown data region cache type: [dataRegion=%s,
cacheType=%s]",
+ dataRegionView.name(), dataRegionView.cache()
+ ));
}
writeBufferManager = new WriteBufferManager(writeBufferSize, cache);
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
index dee24e0369..7ff1b893a1 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbIndexes.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbI
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
@@ -39,6 +38,7 @@ import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.rocksdb.index.AbstractRocksDbIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.instance.IndexColumnFamily;
import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.RocksDBException;
@@ -69,41 +69,32 @@ class RocksDbIndexes {
if (descriptor == null) {
deleteByPrefix(writeBatch, rocksDb.hashIndexCf(),
indexPrefix(tableId, indexId));
} else {
- hashIndices.put(indexId, new HashIndex(rocksDb, tableId,
descriptor, rocksDb.meta));
+ hashIndices.put(indexId, new HashIndex(tableId,
rocksDb.hashIndexCf(), descriptor, rocksDb.meta));
}
}
- var indexCfsToDestroy = new ArrayList<Map.Entry<Integer,
ColumnFamily>>();
+ var indexCfsToDestroy = new ArrayList<IndexColumnFamily>();
- for (Map.Entry<Integer, ColumnFamily> e :
rocksDb.sortedIndexes(tableId).entrySet()) {
- int indexId = e.getKey();
+ for (IndexColumnFamily indexColumnFamily :
rocksDb.sortedIndexes(tableId)) {
+ int indexId = indexColumnFamily.indexId();
- ColumnFamily indexCf = e.getValue();
+ ColumnFamily cf = indexColumnFamily.columnFamily();
var descriptor = (StorageSortedIndexDescriptor)
indexDescriptorSupplier.get(indexId);
if (descriptor == null) {
- deleteByPrefix(writeBatch, indexCf, indexPrefix(tableId,
indexId));
+ deleteByPrefix(writeBatch, cf, indexPrefix(tableId,
indexId));
- indexCfsToDestroy.add(e);
+ indexCfsToDestroy.add(indexColumnFamily);
} else {
- sortedIndices.put(indexId,
SortedIndex.restoreExisting(rocksDb, tableId, indexCf, descriptor,
rocksDb.meta));
+ sortedIndices.put(indexId,
SortedIndex.restoreExisting(tableId, cf, descriptor, rocksDb.meta));
}
}
rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
if (!indexCfsToDestroy.isEmpty()) {
- rocksDb.flusher.awaitFlush(false)
- .thenRunAsync(() -> {
- for (Map.Entry<Integer, ColumnFamily> e :
indexCfsToDestroy) {
- int indexId = e.getKey();
-
- ColumnFamily indexCf = e.getValue();
-
-
rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
- }
- }, rocksDb.engine.threadPool());
+ rocksDb.scheduleIndexCfsDestroy(indexCfsToDestroy);
}
}
}
@@ -120,7 +111,7 @@ class RocksDbIndexes {
HashIndexStorage getOrCreateHashIndex(int partitionId,
StorageHashIndexDescriptor indexDescriptor) {
HashIndex hashIndex = hashIndices.computeIfAbsent(
indexDescriptor.id(),
- id -> new HashIndex(rocksDb, tableId, indexDescriptor,
rocksDb.meta)
+ id -> new HashIndex(tableId, rocksDb.hashIndexCf(),
indexDescriptor, rocksDb.meta)
);
return hashIndex.getOrCreateStorage(partitionId);
@@ -190,8 +181,7 @@ class RocksDbIndexes {
}
if (sortedIdx != null) {
- rocksDb.flusher.awaitFlush(false)
- .thenRunAsync(sortedIdx::destroySortedIndexCfIfNeeded,
rocksDb.engine.threadPool());
+
rocksDb.scheduleIndexCfsDestroy(List.of(sortedIdx.indexColumnFamily()));
}
}
@@ -205,21 +195,6 @@ class RocksDbIndexes {
}
}
- void destroyAllIndexes(WriteBatch writeBatch) throws RocksDBException {
- for (HashIndex hashIndex : hashIndices.values()) {
- hashIndex.destroy(writeBatch);
- }
-
- for (SortedIndex sortedIndex : sortedIndices.values()) {
- sortedIndex.destroy(writeBatch);
- }
- }
-
- void scheduleAllIndexCfDestroy() {
- rocksDb.flusher.awaitFlush(false)
- .thenRunAsync(() ->
sortedIndices.values().forEach(SortedIndex::destroySortedIndexCfIfNeeded),
rocksDb.engine.threadPool());
- }
-
Stream<AbstractRocksDbIndexStorage> getAllStorages(int partitionId) {
return Stream.concat(
hashIndices.values().stream().map(index ->
index.getStorage(partitionId)),
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
index 193fcfc8cf..72e71e6c56 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMetaStorage.java
@@ -42,17 +42,17 @@ public class RocksDbMetaStorage {
* Prefix to store partition meta information, such as last applied index
and term.
* Key format is {@code [prefix, tableId, partitionId]} in BE.
*/
- static final byte[] PARTITION_META_PREFIX = {0};
+ public static final byte[] PARTITION_META_PREFIX = {0};
/**
* Prefix to store partition configuration. Key format is {@code [prefix,
tableId, partitionId]} in BE.
*/
- static final byte[] PARTITION_CONF_PREFIX = {1};
+ public static final byte[] PARTITION_CONF_PREFIX = {1};
/**
- * Prefix to store next row id to build in index. Key format is {@code
[prefix, indexId, partitionId]} in BE.
+ * Prefix to store next row id to build in index. Key format is {@code
[prefix, tableId, indexId, partitionId]} in BE.
*/
- private static final byte[] INDEX_ROW_ID_PREFIX = {2};
+ public static final byte[] INDEX_ROW_ID_PREFIX = {2};
private final ColumnFamily metaColumnFamily;
@@ -73,9 +73,9 @@ public class RocksDbMetaStorage {
* @param indexId Index ID.
* @param partitionId Partition ID.
*/
- public @Nullable RowId getNextRowIdToBuild(int indexId, int partitionId) {
+ public @Nullable RowId getNextRowIdToBuild(int tableId, int indexId, int
partitionId) {
try {
- byte[] lastBuiltRowIdBytes =
metaColumnFamily.get(createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId));
+ byte[] lastBuiltRowIdBytes =
metaColumnFamily.get(createKey(INDEX_ROW_ID_PREFIX, tableId, indexId,
partitionId));
if (lastBuiltRowIdBytes == null) {
return initialRowIdToBuild(partitionId);
@@ -103,9 +103,11 @@ public class RocksDbMetaStorage {
* @param indexId Index ID.
* @param rowId Row ID.
*/
- public void putNextRowIdToBuild(AbstractWriteBatch writeBatch, int
indexId, int partitionId, @Nullable RowId rowId) {
+ public void putNextRowIdToBuild(AbstractWriteBatch writeBatch, int
tableId, int indexId, int partitionId, @Nullable RowId rowId) {
try {
- writeBatch.put(metaColumnFamily.handle(),
createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId),
indexLastBuildRowId(rowId));
+ byte[] key = createKey(INDEX_ROW_ID_PREFIX, tableId, indexId,
partitionId);
+
+ writeBatch.put(metaColumnFamily.handle(), key,
indexLastBuildRowId(rowId));
} catch (RocksDBException e) {
throw new StorageException(
"Failed to save next row ID to build: [partitionId={},
indexId={}, rowId={}]",
@@ -118,9 +120,9 @@ public class RocksDbMetaStorage {
/**
* Removes the "next row ID to build" information for the given
partition's index.
*/
- public void removeNextRowIdToBuild(AbstractWriteBatch writeBatch, int
indexId, int partitionId) {
+ public void removeNextRowIdToBuild(AbstractWriteBatch writeBatch, int
tableId, int indexId, int partitionId) {
try {
- writeBatch.delete(metaColumnFamily.handle(),
createKey(INDEX_ROW_ID_PREFIX, indexId, partitionId));
+ writeBatch.delete(metaColumnFamily.handle(),
createKey(INDEX_ROW_ID_PREFIX, tableId, indexId, partitionId));
} catch (RocksDBException e) {
throw new StorageException(
"Failed to remove next row ID to build: [partitionId={},
indexId={}]",
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index b2eb6c9e36..134262982b 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.nio.file.Path;
@@ -28,16 +27,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
-import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionConfiguration;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataRegionView;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance;
@@ -55,6 +53,22 @@ public class RocksDbStorageEngine implements StorageEngine {
private static final IgniteLogger LOG =
Loggers.forClass(RocksDbStorageEngine.class);
+ private static class RocksDbStorage implements ManuallyCloseable {
+ final RocksDbDataRegion dataRegion;
+
+ final SharedRocksDbInstance rocksDbInstance;
+
+ RocksDbStorage(RocksDbDataRegion dataRegion, SharedRocksDbInstance
rocksDbInstance) {
+ this.dataRegion = dataRegion;
+ this.rocksDbInstance = rocksDbInstance;
+ }
+
+ @Override
+ public void close() throws Exception {
+ IgniteUtils.closeAllManually(rocksDbInstance::stop,
dataRegion::stop);
+ }
+ }
+
static {
RocksDB.loadLibrary();
}
@@ -67,14 +81,12 @@ public class RocksDbStorageEngine implements StorageEngine {
private final ScheduledExecutorService scheduledPool;
- private final Map<String, RocksDbDataRegion> regions = new
ConcurrentHashMap<>();
-
/**
- * Mapping from the data region name to the shared RocksDB instance. Map
is filled lazily.
- * Most likely, the association of shared instances with regions will be
removed/revisited in the future.
+ * Mapping from the data region name to the shared RocksDB instance. Most
likely, the association of shared
+ * instances with regions will be removed/revisited in the future.
*/
// TODO IGNITE-19762 Think of proper way to use regions and storages.
- private final Map<String, SharedRocksDbInstance> sharedInstances = new
ConcurrentHashMap<>();
+ private final Map<String, RocksDbStorage> storageByRegionName = new
ConcurrentHashMap<>();
/**
* Constructor.
@@ -125,40 +137,49 @@ public class RocksDbStorageEngine implements
StorageEngine {
@Override
public void start() throws StorageException {
- registerDataRegion(DEFAULT_DATA_REGION_NAME);
+ registerDataRegion(engineConfig.defaultRegion().value());
// TODO: IGNITE-17066 Add handling deleting/updating data regions
configuration
engineConfig.regions().listenElements(new
ConfigurationNamedListListener<>() {
@Override
public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<RocksDbDataRegionView> ctx) {
- registerDataRegion(ctx.newName(RocksDbDataRegionView.class));
+ RocksDbDataRegionView newValue = ctx.newValue();
+
+ assert newValue != null;
+
+ registerDataRegion(newValue);
return nullCompletedFuture();
}
});
}
- private void registerDataRegion(String name) {
- RocksDbDataRegionConfiguration dataRegionConfig =
DEFAULT_DATA_REGION_NAME.equals(name)
- ? engineConfig.defaultRegion()
- : engineConfig.regions().get(name);
+ private void registerDataRegion(RocksDbDataRegionView dataRegionView) {
+ String regionName = dataRegionView.name();
- var region = new RocksDbDataRegion(dataRegionConfig);
+ var region = new RocksDbDataRegion(dataRegionView);
region.start();
- RocksDbDataRegion previousRegion =
regions.put(dataRegionConfig.name().value(), region);
+ SharedRocksDbInstance rocksDbInstance = newRocksDbInstance(regionName,
region);
- assert previousRegion == null : dataRegionConfig.name().value();
+ RocksDbStorage previousStorage = storageByRegionName.put(regionName,
new RocksDbStorage(region, rocksDbInstance));
+
+ assert previousStorage == null : regionName;
+ }
+
+ private SharedRocksDbInstance newRocksDbInstance(String regionName,
RocksDbDataRegion region) {
+ try {
+ return new SharedRocksDbInstanceCreator().create(this, region,
storagePath.resolve("rocksdb-" + regionName));
+ } catch (Exception e) {
+ throw new StorageException("Failed to create new RocksDB
instance", e);
+ }
}
@Override
public void stop() throws StorageException {
try {
- IgniteUtils.closeAll(Stream.concat(
- regions.values().stream().map(region -> region::stop),
- sharedInstances.values().stream().map(instance ->
instance::stop)
- ));
+ IgniteUtils.closeAllManually(storageByRegionName.values());
} catch (Exception e) {
throw new StorageException("Error when stopping the rocksdb
engine", e);
}
@@ -177,34 +198,24 @@ public class RocksDbStorageEngine implements
StorageEngine {
StorageTableDescriptor tableDescriptor,
StorageIndexDescriptorSupplier indexDescriptorSupplier
) throws StorageException {
- RocksDbDataRegion dataRegion =
regions.get(tableDescriptor.getDataRegion());
+ String regionName = tableDescriptor.getDataRegion();
- int tableId = tableDescriptor.getId();
+ RocksDbStorage storage = storageByRegionName.get(regionName);
- assert dataRegion != null : "tableId=" + tableId + ", dataRegion=" +
tableDescriptor.getDataRegion();
+ assert storage != null :
+ String.format("RocksDB instance has not yet been created for
[tableId=%d, region=%s]", tableDescriptor.getId(), regionName);
- SharedRocksDbInstance sharedInstance =
sharedInstances.computeIfAbsent(tableDescriptor.getDataRegion(), name -> {
- try {
- return new SharedRocksDbInstanceCreator().create(
- this,
- dataRegion,
- storagePath.resolve("rocksdb-" + name)
- );
- } catch (Exception e) {
- throw new StorageException("Failed to create new RocksDB data
region", e);
- }
- });
+ var tableStorage = new RocksDbTableStorage(storage.rocksDbInstance,
tableDescriptor, indexDescriptorSupplier);
- var storage = new RocksDbTableStorage(sharedInstance, tableDescriptor,
indexDescriptorSupplier);
+ tableStorage.start();
- storage.start();
-
- return storage;
+ return tableStorage;
}
@Override
- // TODO: IGNITE-21760 Implement
public void dropMvTable(int tableId) {
- throw new
UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-21760");
+ for (RocksDbStorage rocksDbStorage : storageByRegionName.values()) {
+ rocksDbStorage.rocksDbInstance.destroyTable(tableId);
+ }
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index b0aa9de923..3f7cd6ac93 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -18,13 +18,8 @@
package org.apache.ignite.internal.storage.rocksdb;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
-import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
-import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.createKey;
import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.DFLT_WRITE_OPTS;
-import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance.deleteByPrefix;
import static
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
-import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -179,7 +174,7 @@ public class RocksDbTableStorage implements MvTableStorage {
}
if (destroy) {
- destroyTableData();
+ rocksDb.destroyTable(getTableId());
}
});
}
@@ -199,28 +194,6 @@ public class RocksDbTableStorage implements MvTableStorage
{
return stop(true);
}
- private void destroyTableData() {
- try (WriteBatch writeBatch = new WriteBatch()) {
- int tableId = getTableId();
-
- byte[] tablePrefix = createKey(BYTE_EMPTY_ARRAY, tableId);
-
- deleteByPrefix(writeBatch, rocksDb.partitionCf, tablePrefix);
- deleteByPrefix(writeBatch, rocksDb.gcQueueCf, tablePrefix);
-
- indexes.destroyAllIndexes(writeBatch);
-
- deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(),
createKey(PARTITION_META_PREFIX, tableId));
- deleteByPrefix(writeBatch, rocksDb.meta.columnFamily(),
createKey(PARTITION_CONF_PREFIX, tableId));
-
- rocksDb.db.write(DFLT_WRITE_OPTS, writeBatch);
-
- indexes.scheduleAllIndexCfDestroy();
- } catch (RocksDBException e) {
- throw new StorageException("Failed to destroy table data.
[tableId={}]", e, getTableId());
- }
- }
-
@Override
public CompletableFuture<MvPartitionStorage> createMvPartition(int
partitionId) throws StorageException {
return inBusyLock(busyLock, () ->
mvPartitionStorages.create(partitionId, partId -> {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
index c54d2914e4..c683d971c3 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/SortedIndex.java
@@ -30,12 +30,9 @@ import
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstance
class SortedIndex extends Index<RocksDbSortedIndexStorage> {
private final StorageSortedIndexDescriptor descriptor;
- private final SharedRocksDbInstance rocksDb;
-
private final RocksDbMetaStorage indexMetaStorage;
private SortedIndex(
- SharedRocksDbInstance rocksDb,
int tableId,
ColumnFamily indexCf,
StorageSortedIndexDescriptor descriptor,
@@ -43,7 +40,6 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
) {
super(tableId, descriptor.id(), indexCf);
- this.rocksDb = rocksDb;
this.descriptor = descriptor;
this.indexMetaStorage = indexMetaStorage;
}
@@ -56,29 +52,20 @@ class SortedIndex extends Index<RocksDbSortedIndexStorage> {
) {
ColumnFamily indexCf =
rocksDb.getOrCreateSortedIndexCf(sortedIndexCfName(descriptor.columns()),
descriptor.id(), tableId);
- return new SortedIndex(rocksDb, tableId, indexCf, descriptor,
indexMetaStorage);
+ return new SortedIndex(tableId, indexCf, descriptor, indexMetaStorage);
}
static SortedIndex restoreExisting(
- SharedRocksDbInstance rocksDb,
int tableId,
ColumnFamily indexCf,
StorageSortedIndexDescriptor descriptor,
RocksDbMetaStorage indexMetaStorage
) {
- return new SortedIndex(rocksDb, tableId, indexCf, descriptor,
indexMetaStorage);
+ return new SortedIndex(tableId, indexCf, descriptor, indexMetaStorage);
}
@Override
RocksDbSortedIndexStorage createStorage(int partitionId) {
- return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId,
indexCf, indexMetaStorage);
- }
-
- /**
- * Signals the shared RocksDB instance that this index has been destroyed
and all shared resources (like the Column Family) can be
- * de-allocated.
- */
- void destroySortedIndexCfIfNeeded() {
- rocksDb.destroySortedIndexCfIfNeeded(indexCf.nameBytes(), indexId);
+ return new RocksDbSortedIndexStorage(descriptor, tableId, partitionId,
indexColumnFamily().columnFamily(), indexMetaStorage);
}
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
index 11943fde99..deff59acaa 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
@@ -61,6 +61,8 @@ public abstract class AbstractRocksDbIndexStorage implements
IndexStorage {
/** Common prefix for keys in all index storages, containing IDs of
different entities. */
public static final int PREFIX_WITH_IDS_LENGTH = TABLE_ID_SIZE +
INDEX_ID_SIZE + PARTITION_ID_SIZE;
+ private final int tableId;
+
protected final int indexId;
protected final int partitionId;
@@ -76,12 +78,13 @@ public abstract class AbstractRocksDbIndexStorage
implements IndexStorage {
/** Row ID for which the index needs to be built, {@code null} means that
the index building has completed. */
private volatile @Nullable RowId nextRowIdToBuild;
- AbstractRocksDbIndexStorage(int indexId, int partitionId,
RocksDbMetaStorage indexMetaStorage) {
+ AbstractRocksDbIndexStorage(int tableId, int indexId, int partitionId,
RocksDbMetaStorage indexMetaStorage) {
+ this.tableId = tableId;
this.indexId = indexId;
this.indexMetaStorage = indexMetaStorage;
this.partitionId = partitionId;
- nextRowIdToBuild = indexMetaStorage.getNextRowIdToBuild(indexId,
partitionId);
+ nextRowIdToBuild = indexMetaStorage.getNextRowIdToBuild(tableId,
indexId, partitionId);
}
@Override
@@ -100,7 +103,7 @@ public abstract class AbstractRocksDbIndexStorage
implements IndexStorage {
WriteBatchWithIndex writeBatch =
PartitionDataHelper.requireWriteBatch();
- indexMetaStorage.putNextRowIdToBuild(writeBatch, indexId,
partitionId, rowId);
+ indexMetaStorage.putNextRowIdToBuild(writeBatch, tableId, indexId,
partitionId, rowId);
nextRowIdToBuild = rowId;
@@ -229,7 +232,7 @@ public abstract class AbstractRocksDbIndexStorage
implements IndexStorage {
public final void destroyData(WriteBatch writeBatch) throws
RocksDBException {
clearIndex(writeBatch);
- indexMetaStorage.removeNextRowIdToBuild(writeBatch, indexId,
partitionId);
+ indexMetaStorage.removeNextRowIdToBuild(writeBatch, tableId, indexId,
partitionId);
nextRowIdToBuild = initialRowIdToBuild(partitionId);
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index cdcf971341..1e18c03d3d 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -82,7 +82,7 @@ public class RocksDbHashIndexStorage extends
AbstractRocksDbIndexStorage impleme
ColumnFamily indexCf,
RocksDbMetaStorage indexMetaStorage
) {
- super(descriptor.id(), partitionId, indexMetaStorage);
+ super(tableId, descriptor.id(), partitionId, indexMetaStorage);
this.descriptor = descriptor;
this.indexCf = indexCf;
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 271702816f..b0b4c7316d 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -82,7 +82,7 @@ public class RocksDbSortedIndexStorage extends
AbstractRocksDbIndexStorage imple
ColumnFamily indexCf,
RocksDbMetaStorage indexMetaStorage
) {
- super(descriptor.id(), partitionId, indexMetaStorage);
+ super(tableId, descriptor.id(), partitionId, indexMetaStorage);
this.descriptor = descriptor;
this.indexCf = indexCf;
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
new file mode 100644
index 0000000000..c7c0c85256
--- /dev/null
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/IndexColumnFamily.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.instance;
+
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+
+/** A container class for an index ID and its Column Family instance. */
+public class IndexColumnFamily {
+ private final int indexId;
+
+ private final ColumnFamily columnFamily;
+
+ public IndexColumnFamily(int indexId, ColumnFamily columnFamily) {
+ this.indexId = indexId;
+ this.columnFamily = columnFamily;
+ }
+
+ public int indexId() {
+ return indexId;
+ }
+
+ public ColumnFamily columnFamily() {
+ return columnFamily;
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 0132462a1c..552646b01c 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -20,9 +20,14 @@ package org.apache.ignite.internal.storage.rocksdb.instance;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.toStringName;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.INDEX_ROW_ID_PREFIX;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_CONF_PREFIX;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage.PARTITION_META_PREFIX;
+import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static
org.apache.ignite.internal.storage.rocksdb.instance.SharedRocksDbInstanceCreator.sortedIndexCfOptions;
import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
+import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
@@ -30,6 +35,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,7 +51,6 @@ import
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -241,13 +246,13 @@ public final class SharedRocksDbInstance {
/**
* Returns an "index ID - Column Family" mapping for all sorted indexes that currently exist in the storage.
*/
- public Map<Integer, ColumnFamily> sortedIndexes(int targetTableId) {
- var result = new HashMap<Integer, ColumnFamily>();
+ public List<IndexColumnFamily> sortedIndexes(int targetTableId) {
+ var result = new ArrayList<IndexColumnFamily>();
for (SortedIndexColumnFamily indexCf : sortedIndexCfsByName.values()) {
indexCf.indexIdToTableId.forEach((indexId, tableId) -> {
if (tableId == targetTableId) {
- result.put(indexId, indexCf.columnFamily);
+ result.add(new IndexColumnFamily(indexId,
indexCf.columnFamily));
}
});
}
@@ -282,16 +287,25 @@ public final class SharedRocksDbInstance {
}
/**
- * Possibly drops the column family after destroying the index.
+ * Schedules a drop of a column family after destroying an index, if it was the last index managed by that CF.
*/
- public void destroySortedIndexCfIfNeeded(byte[] cfName, int indexId) {
+ public CompletableFuture<Void>
scheduleIndexCfsDestroy(List<IndexColumnFamily> indexColumnFamilies) {
+ assert !indexColumnFamilies.isEmpty();
+
+ return flusher.awaitFlush(false)
+ .thenRunAsync(() ->
indexColumnFamilies.forEach(this::destroySortedIndexCfIfNeeded),
engine.threadPool());
+ }
+
+ void destroySortedIndexCfIfNeeded(IndexColumnFamily indexColumnFamily) {
if (!busyLock.enterBusy()) {
throw new StorageClosedException();
}
+ var cfNameBytes = new
ByteArray(indexColumnFamily.columnFamily().nameBytes());
+
try {
- sortedIndexCfsByName.computeIfPresent(new ByteArray(cfName),
(unused, indexCf) -> {
- indexCf.indexIdToTableId.remove(indexId);
+ sortedIndexCfsByName.computeIfPresent(cfNameBytes, (unused,
indexCf) -> {
+ indexCf.indexIdToTableId.remove(indexColumnFamily.indexId());
if (!indexCf.indexIdToTableId.isEmpty()) {
return indexCf;
@@ -306,6 +320,48 @@ public final class SharedRocksDbInstance {
}
}
+ /**
+ * Removes all data associated with the given table ID in this storage.
+ */
+ public void destroyTable(int tableId) {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ byte[] tableIdBytes = ByteBuffer.allocate(Integer.BYTES)
+ .order(KEY_BYTE_ORDER)
+ .putInt(tableId)
+ .array();
+
+ deleteByPrefix(writeBatch, partitionCf, tableIdBytes);
+ deleteByPrefix(writeBatch, gcQueueCf, tableIdBytes);
+ deleteByPrefix(writeBatch, hashIndexCf, tableIdBytes);
+
+ List<IndexColumnFamily> sortedIndexCfs = sortedIndexes(tableId);
+
+ for (IndexColumnFamily indexColumnFamily : sortedIndexCfs) {
+ deleteByPrefix(writeBatch, indexColumnFamily.columnFamily(),
tableIdBytes);
+ }
+
+ deleteByPrefix(writeBatch, meta.columnFamily(),
metaPrefix(PARTITION_META_PREFIX, tableIdBytes));
+ deleteByPrefix(writeBatch, meta.columnFamily(),
metaPrefix(PARTITION_CONF_PREFIX, tableIdBytes));
+ deleteByPrefix(writeBatch, meta.columnFamily(),
metaPrefix(INDEX_ROW_ID_PREFIX, tableIdBytes));
+
+ db.write(DFLT_WRITE_OPTS, writeBatch);
+
+ if (!sortedIndexCfs.isEmpty()) {
+ scheduleIndexCfsDestroy(sortedIndexCfs);
+ }
+ } catch (RocksDBException e) {
+ throw new StorageException("Failed to destroy table data.
[tableId={}]", e, tableId);
+ }
+ }
+
+ private static byte[] metaPrefix(byte[] metaPrefix, byte[] tableIdBytes) {
+ return ByteBuffer.allocate(metaPrefix.length + tableIdBytes.length)
+ .order(KEY_BYTE_ORDER)
+ .put(metaPrefix)
+ .put(tableIdBytes)
+ .array();
+ }
+
private ColumnFamily createSortedIndexCf(byte[] cfName) {
ColumnFamilyDescriptor cfDescriptor = new
ColumnFamilyDescriptor(cfName, sortedIndexCfOptions(cfName));
@@ -313,7 +369,7 @@ public final class SharedRocksDbInstance {
try {
columnFamily = ColumnFamily.create(db, cfDescriptor);
} catch (RocksDBException e) {
- throw new StorageException("Failed to create new RocksDB column
family: " + toStringName(cfDescriptor.getName()), e);
+ throw new StorageException("Failed to create new RocksDB column
family: " + toStringName(cfName), e);
}
flusher.addColumnFamily(columnFamily.handle());
@@ -322,9 +378,7 @@ public final class SharedRocksDbInstance {
}
private void destroyColumnFamily(ColumnFamily columnFamily) {
- ColumnFamilyHandle columnFamilyHandle = columnFamily.handle();
-
- flusher.removeColumnFamily(columnFamilyHandle);
+ flusher.removeColumnFamily(columnFamily.handle());
try {
columnFamily.destroy();
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
index c376e6d326..f908c5d121 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -25,7 +25,6 @@ import
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -47,10 +46,4 @@ public class RocksDbStorageEngineTest extends
AbstractStorageEngineTest {
workDir
);
}
-
- @Override
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-21760")
- protected void testDropMvTableOnRecovery() throws Exception {
- super.testDropMvTableOnRecovery();
- }
}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
index e0c85f22ec..fcd8b23454 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstanceTest.java
@@ -17,15 +17,14 @@
package org.apache.ignite.internal.storage.rocksdb.instance;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
@@ -34,7 +33,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
@@ -68,7 +66,7 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
engine.start();
- dataRegion = new RocksDbDataRegion(engineConfig.defaultRegion());
+ dataRegion = new
RocksDbDataRegion(engineConfig.defaultRegion().value());
dataRegion.start();
@@ -118,19 +116,19 @@ class SharedRocksDbInstanceTest extends
IgniteAbstractTest {
assertThat(foo, is(not(sameInstance(bar))));
assertThat(quux, is((sameInstance(baz))));
- rocksDb.destroySortedIndexCfIfNeeded(fooName, 1);
+ rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(1, foo));
assertTrue(cfExists(fooName));
- rocksDb.destroySortedIndexCfIfNeeded(barName, 2);
+ rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(2, bar));
assertFalse(cfExists(barName));
- rocksDb.destroySortedIndexCfIfNeeded(bazName, 3);
+ rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(3, baz));
assertTrue(cfExists(fooName));
- rocksDb.destroySortedIndexCfIfNeeded(quuxName, 4);
+ rocksDb.destroySortedIndexCfIfNeeded(new IndexColumnFamily(4, quux));
assertFalse(cfExists(fooName));
}
@@ -161,8 +159,25 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest
{
// Same index CF, different table.
ColumnFamily quux = rocksDb.getOrCreateSortedIndexCf(quuxName, 4, 1);
- assertThat(rocksDb.sortedIndexes(0), is(Map.of(1, foo, 2, bar, 3,
baz)));
- assertThat(rocksDb.sortedIndexes(1), is(Map.of(4, quux)));
+ assertThat(
+
rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()),
+ containsInAnyOrder(1, 2, 3)
+ );
+
+ assertThat(
+
rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
+ containsInAnyOrder(foo, bar, baz)
+ );
+
+ assertThat(
+
rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::indexId).collect(toList()),
+ containsInAnyOrder(4)
+ );
+
+ assertThat(
+
rocksDb.sortedIndexes(1).stream().map(IndexColumnFamily::columnFamily).collect(toList()),
+ containsInAnyOrder(quux)
+ );
// Put some data in the CF. We then check that the non-empty CF is restored upon DB restart but the empty one is dropped.
byte[] key = ByteBuffer.allocate(Integer.BYTES * 2)
@@ -177,8 +192,8 @@ class SharedRocksDbInstanceTest extends IgniteAbstractTest {
rocksDb = createDb();
- assertThat(rocksDb.sortedIndexes(0), allOf(hasKey(1), not(hasKey(2)),
not(hasKey(3))));
- assertThat(rocksDb.sortedIndexes(1), is(anEmptyMap()));
+
assertThat(rocksDb.sortedIndexes(0).stream().map(IndexColumnFamily::indexId).collect(toList()),
contains(1));
+ assertThat(rocksDb.sortedIndexes(1), is(empty()));
assertTrue(cfExists(fooName));
assertFalse(cfExists(barName));