This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 84fd894ff72 IGNITE-25886 Implement method to get on-disk tables/zones
IDs in tx state storage (#6238)
84fd894ff72 is described below
commit 84fd894ff72c0850590654dc0abfbe5b0c21636d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Jul 15 14:01:03 2025 +0400
IGNITE-25886 Implement method to get on-disk tables/zones IDs in tx state
storage (#6238)
---
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../replicator/ItReplicaLifecycleTest.java | 4 +-
.../partition/replicator/ZoneResourcesManager.java | 2 +-
.../internal/storage/engine/StorageEngine.java | 11 +-
.../VolatilePageMemoryStorageEngine.java | 7 ++
.../rocksdb/instance/SharedRocksDbInstance.java | 3 +-
.../internal/table/distributed/TableManager.java | 2 +-
.../distributed/storage/BrokenTxStateStorage.java | 2 +-
modules/transactions/build.gradle | 1 +
.../state/ThreadAssertingTxStateStorage.java | 4 +-
.../tx/storage/state/TxStatePartitionStorage.java | 3 +
.../internal/tx/storage/state/TxStateStorage.java | 10 +-
.../TxStateMetaRocksDbPartitionStorage.java | 49 +++++---
.../rocksdb/TxStateRocksDbPartitionStorage.java | 12 +-
.../state/rocksdb/TxStateRocksDbSharedStorage.java | 66 +++++++++--
.../state/rocksdb/TxStateRocksDbStorage.java | 6 +-
.../RocksDbTxStatePartitionStorageTest.java | 4 +-
...ageTest.java => RocksDbTxStateStorageTest.java} | 44 +-------
.../rocksdb/TxStateRocksDbSharedStorageTest.java | 123 +++++++++++++++++++++
.../test/TestTxStatePartitionStorageTest.java | 4 +-
...torageTest.java => TestTxStateStorageTest.java} | 6 +-
.../state/AbstractTxStatePartitionStorageTest.java | 6 +-
.../storage/state/AbstractTxStateStorageTest.java | 117 ++++++++++++++++++++
.../tx/storage/state/test/TestTxStateStorage.java | 2 +-
24 files changed, 394 insertions(+), 98 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a6b17cf0b21..5bb0f6596ce 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1882,7 +1882,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
verify(internalTable.storage(),
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
.destroyPartition(partitionId);
verify(internalTable.txStateStorage(),
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
- .destroyTxStateStorage(partitionId);
+ .destroyPartitionStorage(partitionId);
}
}
@@ -1895,7 +1895,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
doAnswer(answer -> CompletableFuture.failedFuture(new
IgniteInternalException("From test")))
.when(internalTable.txStateStorage())
- .destroyTxStateStorage(partitionId);
+ .destroyPartitionStorage(partitionId);
}
private void prepareFinishHandleChangeStableAssignmentEventFuture(Node
node, String tableName, int partitionId) {
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 63ea22704f2..506dc816ee3 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -820,7 +820,7 @@ public class ItReplicaLifecycleTest extends
ItAbstractColocationTest {
verify(internalTable.storage(), never())
.destroyPartition(partitionId);
verify(internalTable.txStateStorage(), never())
- .destroyTxStateStorage(partitionId);
+ .destroyPartitionStorage(partitionId);
}
private static void checkDestroyPartitionStoragesInvokes(Node node, String
tableName, int partitionId) {
@@ -829,7 +829,7 @@ public class ItReplicaLifecycleTest extends
ItAbstractColocationTest {
verify(internalTable.storage(),
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
.destroyPartition(partitionId);
verify(internalTable.txStateStorage(), never())
- .destroyTxStateStorage(partitionId);
+ .destroyPartitionStorage(partitionId);
}
@Test
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index ebc2e4bb951..e2298bba0e0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -177,7 +177,7 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
if (resources != null) {
resources.resourcesByPartitionId.remove(zonePartitionId.partitionId());
-
resources.txStateStorage.destroyTxStateStorage(zonePartitionId.partitionId());
+
resources.txStateStorage.destroyPartitionStorage(zonePartitionId.partitionId());
}
});
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index 90b693885bf..d8b81334021 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.storage.engine;
-import static java.util.Collections.emptySet;
import static org.apache.ignite.internal.util.Constants.MiB;
import static
org.apache.ignite.internal.util.IgniteUtils.getTotalMemoryAvailable;
@@ -64,8 +63,10 @@ public interface StorageEngine {
MvTableStorage createMvTable(StorageTableDescriptor tableDescriptor,
StorageIndexDescriptorSupplier indexDescriptorSupplier);
/**
- * Destroys the table if it exists. The destruction is not guaranteed to
be durable (that is, if a node stops/crashes before
- * persisting this change to disk, the table storage might still be there
after node restart).
+ * Destroys the table if it exists.
+ *
+ * <p>The destruction is not guaranteed to be durable (that is, if a node
stops/crashes before persisting this change to disk,
+ * the table storage might still be there after node restart).
*
* @param tableId Table ID.
* @throws StorageException If an error has occurs while dropping the
table.
@@ -86,7 +87,5 @@ public interface StorageEngine {
* Returns IDs of tables for which there are MV partition storages on
disk. Those were created and flushed to disk; either
* destruction was not started for them, or it failed.
*/
- default Set<Integer> tableIdsOnDisk() {
- return emptySet();
- }
+ Set<Integer> tableIdsOnDisk();
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index 4bbe33cc7ea..26f587b5299 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.storage.pagememory;
+import static java.util.Collections.emptySet;
import static
org.apache.ignite.internal.storage.configurations.StorageProfileConfigurationSchema.UNSPECIFIED_SIZE;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -191,6 +193,11 @@ public class VolatilePageMemoryStorageEngine extends
AbstractPageMemoryStorageEn
// No-op.
}
+ @Override
+ public Set<Integer> tableIdsOnDisk() {
+ return emptySet();
+ }
+
/**
* Creates, starts and adds a new data region to the engine.
*/
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
index 384d7a6915a..5f4481a8636 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/instance/SharedRocksDbInstance.java
@@ -47,7 +47,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.lang.ByteArray;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.storage.StorageClosedException;
@@ -474,7 +473,7 @@ public final class SharedRocksDbInstance {
// the iteration space.
it.status();
} catch (RocksDBException e) {
- throw new IgniteInternalException(INTERNAL_ERR, "Cannot get table
IDs", e);
+ throw new StorageException(INTERNAL_ERR, "Cannot get table IDs",
e);
}
return Set.copyOf(tableIds);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 279aa1104bd..de4e67e5c50 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -3023,7 +3023,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
if (!nodeProperties.colocationEnabled()) {
if
(internalTable.txStateStorage().getPartitionStorage(partitionId) != null) {
- destroyFutures.add(runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor));
+ destroyFutures.add(runAsync(() ->
internalTable.txStateStorage().destroyPartitionStorage(partitionId),
ioExecutor));
}
destroyFutures.add(runAsync(() ->
destroyReplicationProtocolStorages(tablePartitionId, table), ioExecutor));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/BrokenTxStateStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/BrokenTxStateStorage.java
index 0c78d07aea5..731059c7851 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/BrokenTxStateStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/BrokenTxStateStorage.java
@@ -43,7 +43,7 @@ public class BrokenTxStateStorage implements TxStateStorage {
}
@Override
- public void destroyTxStateStorage(int partitionId) {
+ public void destroyPartitionStorage(int partitionId) {
// No-op.
}
diff --git a/modules/transactions/build.gradle
b/modules/transactions/build.gradle
index 2fbf15f8080..2b7e830734a 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -57,6 +57,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-storage-api'))
testImplementation testFixtures(project(':ignite-placement-driver-api'))
testImplementation testFixtures(project(':ignite-low-watermark'))
+ testImplementation testFixtures(project(':ignite-transactions'))
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-cluster-management')
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
index 03b4a58b3a6..103510931c5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
@@ -63,10 +63,10 @@ public class ThreadAssertingTxStateStorage implements
TxStateStorage {
}
@Override
- public void destroyTxStateStorage(int partitionId) {
+ public void destroyPartitionStorage(int partitionId) {
assertThreadAllowsToWrite();
- wrappedStorage.destroyTxStateStorage(partitionId);
+ wrappedStorage.destroyPartitionStorage(partitionId);
}
@Override
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
index 6f62a8e517e..71ffdea8b82 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStatePartitionStorage.java
@@ -141,6 +141,9 @@ public interface TxStatePartitionStorage extends
ManuallyCloseable {
/**
* Closes and removes all data from the storage.
*
+ * <p>The destruction is not guaranteed to be durable (that is, if a node
stops/crashes before persisting this change to disk,
+ * the storage might still be there after node restart).
+ *
* <p>REQUIRED: For background tasks for storage, such as rebalancing, to
be completed by the time the method is called.
*/
void destroy();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 3b64098a431..c513b8f7441 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -50,12 +50,15 @@ public interface TxStateStorage extends ManuallyCloseable {
TxStatePartitionStorage getPartitionStorage(int partitionId);
/**
- * Destroys transaction state storage.
+ * Destroys transaction state storage for a partition.
+ *
+ * <p>The destruction is not guaranteed to be durable (that is, if a node
stops/crashes before persisting this change to disk,
+ * the storage might still be there after node restart).
*
* @param partitionId Partition id.
* @throws IgniteInternalException In case when the operation has failed.
*/
- void destroyTxStateStorage(int partitionId);
+ void destroyPartitionStorage(int partitionId);
/**
* Starts the storage.
@@ -73,6 +76,9 @@ public interface TxStateStorage extends ManuallyCloseable {
/**
* Removes all data from the storage and frees all resources.
*
+ * <p>The destruction is not guaranteed to be durable (that is, if a node
stops/crashes before persisting this change to disk,
+ * the storage might still be there after node restart).
+ *
* @throws IgniteInternalException In case when the operation has failed.
*/
void destroy();
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
index 12ab2960cfc..13614760e93 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateMetaRocksDbPartitionStorage.java
@@ -17,15 +17,18 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
-import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage.TABLE_OR_ZONE_PREFIX_SIZE_BYTES;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage.BYTE_ORDER;
+import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage.TABLE_OR_ZONE_ID_SIZE_BYTES;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+import java.util.List;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.storage.lease.LeaseInfoSerializer;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -33,13 +36,16 @@ import org.rocksdb.WriteBatch;
* A wrapper around a RocksDB column family to store TX storage meta
information.
*/
class TxStateMetaRocksDbPartitionStorage {
- /** Key length for the payload. Consists of a 1-byte prefix, tableId (4
bytes) and partitionId (2 bytes), in Big Endian. */
- private static final int KEY_SIZE_BYTES = TABLE_OR_ZONE_PREFIX_SIZE_BYTES
+ Short.BYTES + 1;
+ /** Prefix of the meta keys including table/zone ID. Consists of a 1-byte
prefix, and tableId/zoneId (4 bytes), in Big Endian. */
+ static final int TABLE_OR_ZONE_PREFIX_SIZE_BYTES = 1 +
TABLE_OR_ZONE_ID_SIZE_BYTES;
+
+ /** Key length for the payload. Consists of a 1-byte prefix,
tableId/zoneId (4 bytes) and partitionId (2 bytes), in Big Endian. */
+ private static final int KEY_SIZE_BYTES = TABLE_OR_ZONE_PREFIX_SIZE_BYTES
+ Short.BYTES;
/**
* Prefix to store meta information, such as last applied index and term.
*/
- private static final byte LAST_APPLIED_PREFIX = 0;
+ static final byte LAST_APPLIED_PREFIX = 0;
/**
* Prefix to last committed replication group configuration.
@@ -58,7 +64,7 @@ class TxStateMetaRocksDbPartitionStorage {
private final ColumnFamily columnFamily;
- private final int tableId;
+ private final int tableOrZoneId;
private final int partitionId;
@@ -79,10 +85,10 @@ class TxStateMetaRocksDbPartitionStorage {
@Nullable
private volatile LeaseInfo leaseInfo;
- TxStateMetaRocksDbPartitionStorage(ColumnFamily columnFamily, int tableId,
int partitionId) {
+ TxStateMetaRocksDbPartitionStorage(ColumnFamily columnFamily, int
tableOrZoneId, int partitionId) {
this.columnFamily = columnFamily;
this.partitionId = partitionId;
- this.tableId = tableId;
+ this.tableOrZoneId = tableOrZoneId;
lastAppliedKey = createKey(LAST_APPLIED_PREFIX);
confKey = createKey(CONF_PREFIX);
@@ -92,18 +98,26 @@ class TxStateMetaRocksDbPartitionStorage {
private byte[] createKey(byte prefix) {
return ByteBuffer.allocate(KEY_SIZE_BYTES)
- .order(ByteOrder.BIG_ENDIAN)
+ .order(BYTE_ORDER)
.put(prefix)
- .putInt(tableId)
+ .putInt(tableOrZoneId)
.putShort((short) partitionId)
.array();
}
+ private static byte[] createKeyPrefixForTableOrZone(byte prefix, int
tableOrZoneId) {
+ return ByteBuffer.allocate(TABLE_OR_ZONE_PREFIX_SIZE_BYTES)
+ .order(BYTE_ORDER)
+ .put(prefix)
+ .putInt(tableOrZoneId)
+ .array();
+ }
+
void start() throws RocksDBException {
byte[] lastAppliedBytes = columnFamily.get(lastAppliedKey);
if (lastAppliedBytes != null) {
- ByteBuffer buf =
ByteBuffer.wrap(lastAppliedBytes).order(ByteOrder.BIG_ENDIAN);
+ ByteBuffer buf =
ByteBuffer.wrap(lastAppliedBytes).order(BYTE_ORDER);
lastAppliedIndex = buf.getLong();
lastAppliedTerm = buf.getLong();
@@ -127,7 +141,7 @@ class TxStateMetaRocksDbPartitionStorage {
byte[] lastAppliedBytes = columnFamily.get(lastAppliedKey);
if (lastAppliedBytes != null) {
- ByteBuffer buf =
ByteBuffer.wrap(lastAppliedBytes).order(ByteOrder.BIG_ENDIAN);
+ ByteBuffer buf =
ByteBuffer.wrap(lastAppliedBytes).order(BYTE_ORDER);
this.lastAppliedIndex = buf.getLong();
this.lastAppliedTerm = buf.getLong();
@@ -190,7 +204,7 @@ class TxStateMetaRocksDbPartitionStorage {
private static byte[] indexAndTermToBytes(long lastAppliedIndex, long
lastAppliedTerm) {
return ByteBuffer.allocate(2 * Long.BYTES)
- .order(ByteOrder.BIG_ENDIAN)
+ .order(BYTE_ORDER)
.putLong(lastAppliedIndex)
.putLong(lastAppliedTerm)
.array();
@@ -207,4 +221,13 @@ class TxStateMetaRocksDbPartitionStorage {
config = null;
leaseInfo = null;
}
+
+ static void clearForTableOrZone(WriteBatch writeBatch, ColumnFamilyHandle
cf, int tableOrZoneId) throws RocksDBException {
+ for (byte prefixByte : List.of(LAST_APPLIED_PREFIX, CONF_PREFIX,
LEASE_INFO_PREFIX, SNAPSHOT_INFO_PREFIX)) {
+ byte[] start = createKeyPrefixForTableOrZone(prefixByte,
tableOrZoneId);
+ byte[] end = incrementPrefix(start);
+
+ writeBatch.deleteRange(cf, start, end);
+ }
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
index 6dd6d3575e0..7e89785ec55 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbPartitionStorage.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
-import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.storage.util.StorageUtils.transitionToDestroyedState;
+import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage.BYTE_ORDER;
import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage.TABLE_OR_ZONE_PREFIX_SIZE_BYTES;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -284,7 +284,7 @@ public class TxStateRocksDbPartitionStorage implements
TxStatePartitionStorage {
throwExceptionIfStorageInProgressOfRebalance();
// This lower bound is the lowest possible key that goes after
"lastAppliedIndexAndTermKey".
- byte[] lowerBound = ByteBuffer.allocate(PREFIX_SIZE_BYTES +
1).order(BIG_ENDIAN)
+ byte[] lowerBound = ByteBuffer.allocate(PREFIX_SIZE_BYTES +
1).order(BYTE_ORDER)
.putInt(tableOrZoneId)
.putShort(shortPartitionId(partitionId))
.put((byte) 0)
@@ -368,7 +368,7 @@ public class TxStateRocksDbPartitionStorage implements
TxStatePartitionStorage {
}
private byte @Nullable [] readLastAppliedIndexAndTerm() throws
RocksDBException {
- byte[] lastAppliedIndexAndTermKey =
ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BIG_ENDIAN)
+ byte[] lastAppliedIndexAndTermKey =
ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BYTE_ORDER)
.putInt(tableOrZoneId)
.putShort(shortPartitionId(partitionId))
.array();
@@ -392,21 +392,21 @@ public class TxStateRocksDbPartitionStorage implements
TxStatePartitionStorage {
}
private byte[] partitionStartPrefix() {
- return ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BIG_ENDIAN)
+ return ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BYTE_ORDER)
.putInt(tableOrZoneId)
.putShort(shortPartitionId(partitionId))
.array();
}
private byte[] partitionEndPrefix() {
- return ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BIG_ENDIAN)
+ return ByteBuffer.allocate(PREFIX_SIZE_BYTES).order(BYTE_ORDER)
.putInt(tableOrZoneId)
.putShort(shortPartitionId(partitionId + 1))
.array();
}
private byte[] txIdToKey(UUID txId) {
- return ByteBuffer.allocate(FULL_KEY_SIZE_BYES).order(BIG_ENDIAN)
+ return ByteBuffer.allocate(FULL_KEY_SIZE_BYES).order(BYTE_ORDER)
.putInt(tableOrZoneId)
.putShort(shortPartitionId(partitionId))
.putLong(txId.getMostSignificantBits())
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
index bd8851de00a..f520362a4b3 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
@@ -20,15 +20,19 @@ package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage.TABLE_OR_ZONE_PREFIX_SIZE_BYTES;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -41,12 +45,17 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.flush.RocksDbFlusher;
import org.apache.ignite.internal.tx.storage.state.TxStateStorageException;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.lang.ErrorGroups.Common;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
/**
@@ -68,6 +77,8 @@ public class TxStateRocksDbSharedStorage implements
IgniteComponent {
private static final int TX_STATE_STORAGE_FLUSH_DELAY = 100;
private static final IntSupplier TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER =
() -> TX_STATE_STORAGE_FLUSH_DELAY;
+ static final ByteOrder BYTE_ORDER = BIG_ENDIAN;
+
/** Rocks DB instance. */
private volatile RocksDB db;
@@ -219,7 +230,7 @@ public class TxStateRocksDbSharedStorage implements
IgniteComponent {
flusher.init(db, cfHandles);
} catch (Exception e) {
- throw new TxStateStorageException(Common.INTERNAL_ERR, "Could not
create transaction state storage", e);
+ throw new TxStateStorageException(INTERNAL_ERR, "Could not create
transaction state storage", e);
}
}
@@ -274,13 +285,54 @@ public class TxStateRocksDbSharedStorage implements
IgniteComponent {
* @param tableOrZoneId ID of the table or zone.
*/
public void destroyStorage(int tableOrZoneId) {
- byte[] start =
ByteBuffer.allocate(TABLE_OR_ZONE_PREFIX_SIZE_BYTES).order(BIG_ENDIAN).putInt(tableOrZoneId).array();
- byte[] end =
ByteBuffer.allocate(TABLE_OR_ZONE_PREFIX_SIZE_BYTES).order(BIG_ENDIAN).putInt(tableOrZoneId
+ 1).array();
+ byte[] dataStart =
ByteBuffer.allocate(TxStateRocksDbStorage.TABLE_OR_ZONE_PREFIX_SIZE_BYTES)
+ .order(BYTE_ORDER)
+ .putInt(tableOrZoneId)
+ .array();
+ byte[] dataEnd = incrementPrefix(dataStart);
- try {
- db.deleteRange(start, end);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(txStateColumnFamily.handle(), dataStart,
dataEnd);
+
+ TxStateMetaRocksDbPartitionStorage.clearForTableOrZone(writeBatch,
txStateMetaColumnFamily().handle(), tableOrZoneId);
+
+ db.write(writeOptions, writeBatch);
} catch (Exception e) {
throw new TxStateStorageException("Failed to destroy the
transaction state storage [tableOrZoneId={}]", e, tableOrZoneId);
}
}
+
+ /**
+ * Returns IDs of tables/zones for which there are tx state partition
storages on disk. Those were created and flushed to disk; either
+ * destruction was not started for them, or it failed.
+ */
+ public Set<Integer> tableOrZoneIdsOnDisk() {
+ Set<Integer> ids = new HashSet<>();
+
+ byte[] lastAppliedGlobalPrefix =
{TxStateMetaRocksDbPartitionStorage.LAST_APPLIED_PREFIX};
+
+ try (
+ var upperBound = new
Slice(incrementPrefix(lastAppliedGlobalPrefix));
+ var readOptions = new
ReadOptions().setIterateUpperBound(upperBound);
+ RocksIterator it =
txStateMetaColumnFamily.newIterator(readOptions)
+ ) {
+ it.seek(lastAppliedGlobalPrefix);
+
+ while (it.isValid()) {
+ byte[] key = it.key();
+ int tableOrZoneId = ByteUtils.bytesToInt(key,
lastAppliedGlobalPrefix.length);
+ ids.add(tableOrZoneId);
+
+ it.next();
+ }
+
+ // Doing this to make an exception thrown if the iteration was
stopped due to an error and not due to exhausting
+ // the iteration space.
+ it.status();
+ } catch (RocksDBException e) {
+ throw new TxStateStorageException(INTERNAL_ERR, "Cannot get
table/zone IDs", e);
+ }
+
+ return Set.copyOf(ids);
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index dbe8c204a87..9d2f58adefe 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -35,8 +35,10 @@ import org.jetbrains.annotations.Nullable;
* RocksDb implementation of {@link TxStateStorage}.
*/
public class TxStateRocksDbStorage implements TxStateStorage {
+ static final int TABLE_OR_ZONE_ID_SIZE_BYTES = Integer.BYTES;
+
/** Prefix length for the payload within a table/zone. Consists of
tableId/zoneId (4 bytes) in Big Endian. */
- static final int TABLE_OR_ZONE_PREFIX_SIZE_BYTES = Integer.BYTES;
+ static final int TABLE_OR_ZONE_PREFIX_SIZE_BYTES =
TABLE_OR_ZONE_ID_SIZE_BYTES;
/** Partition storages. */
private final AtomicReferenceArray<TxStateRocksDbPartitionStorage>
storages;
@@ -113,7 +115,7 @@ public class TxStateRocksDbStorage implements
TxStateStorage {
}
@Override
- public void destroyTxStateStorage(int partitionId) {
+ public void destroyPartitionStorage(int partitionId) {
checkPartitionId(partitionId);
TxStatePartitionStorage storage = storages.getAndSet(partitionId,
null);
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
index 2c1fd629b25..6b53518b0d3 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
@@ -61,7 +61,7 @@ public class RocksDbTxStatePartitionStorageTest extends
AbstractTxStatePartition
private ExecutorService executor;
@Override
- protected TxStateRocksDbStorage createTableStorage() {
+ protected TxStateRocksDbStorage createTableOrZoneStorage() {
return new TxStateRocksDbStorage(
TABLE_ID,
3,
@@ -111,7 +111,7 @@ public class RocksDbTxStatePartitionStorageTest extends
AbstractTxStatePartition
tableStorage.close();
- tableStorage = createTableStorage();
+ tableStorage = createTableOrZoneStorage();
tableStorage.start();
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
similarity index 66%
copy from
modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
copy to
modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
index 2c1fd629b25..88528eb8b71 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStatePartitionStorageTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/RocksDbTxStateStorageTest.java
@@ -18,29 +18,22 @@
package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static
org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage.REBALANCE_IN_PROGRESS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import java.nio.file.Path;
-import java.util.List;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.failure.FailureProcessor;
-import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.tx.TxMeta;
-import
org.apache.ignite.internal.tx.storage.state.AbstractTxStatePartitionStorageTest;
-import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.internal.tx.storage.state.AbstractTxStateStorageTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -48,7 +41,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(ExecutorServiceExtension.class)
@ExtendWith(WorkDirectoryExtension.class)
-public class RocksDbTxStatePartitionStorageTest extends
AbstractTxStatePartitionStorageTest {
+public class RocksDbTxStateStorageTest extends AbstractTxStateStorageTest {
@WorkDirectory
private Path workDir;
@@ -61,9 +54,9 @@ public class RocksDbTxStatePartitionStorageTest extends
AbstractTxStatePartition
private ExecutorService executor;
@Override
- protected TxStateRocksDbStorage createTableStorage() {
+ protected TxStateRocksDbStorage createTableOrZoneStorage() {
return new TxStateRocksDbStorage(
- TABLE_ID,
+ ZONE_ID,
3,
sharedStorage
);
@@ -92,33 +85,4 @@ public class RocksDbTxStatePartitionStorageTest extends
AbstractTxStatePartition
assertThat(sharedStorage.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
}
-
- @Test
- void testRestartStorageInProgressOfRebalance() {
- TxStatePartitionStorage storage =
tableStorage.getOrCreatePartitionStorage(0);
-
- List<IgniteBiTuple<UUID, TxMeta>> rows = List.of(
- randomTxMetaTuple(1, UUID.randomUUID()),
- randomTxMetaTuple(1, UUID.randomUUID())
- );
-
- fillStorage(storage, rows);
-
- // We emulate the situation that the rebalancing did not have time to
end.
- storage.lastApplied(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
-
- assertThat(storage.flush(), willCompleteSuccessfully());
-
- tableStorage.close();
-
- tableStorage = createTableStorage();
-
- tableStorage.start();
-
- storage = tableStorage.getOrCreatePartitionStorage(0);
-
- checkMeta(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS,
GROUP_CONFIGURATION, LEASE_INFO, SNAPSHOT_INFO);
-
- checkStorageContainsRows(storage, rows);
- }
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorageTest.java
new file mode 100644
index 00000000000..93be50b6217
--- /dev/null
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorageTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state.rocksdb;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.nio.file.Path;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class TxStateRocksDbSharedStorageTest {
+ @WorkDirectory
+ private Path workDir;
+
+ @InjectExecutorService
+ private ScheduledExecutorService scheduler;
+
+ @InjectExecutorService
+ private ExecutorService executor;
+
+ private TxStateRocksDbSharedStorage sharedStorage;
+
+ @BeforeEach
+ void createAndStartSharedStorage() {
+ sharedStorage = new TxStateRocksDbSharedStorage(
+ workDir,
+ scheduler,
+ executor,
+ () -> {},
+ new FailureManager(new NoOpFailureHandler())
+ );
+
+ startSharedStorage();
+ }
+
+ private void startSharedStorage() {
+ assertThat(sharedStorage.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void stopSharedStorage() {
+ if (sharedStorage != null) {
+ assertThat(sharedStorage.stopAsync(), willCompleteSuccessfully());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void remembersCreatedTableOrZoneIdsOnDisk(boolean restart) {
+ createZoneTxStateStorageLeavingTraceOnDisk(1);
+ createZoneTxStateStorageLeavingTraceOnDisk(3);
+
+ if (restart) {
+ flushAndRestartSharedStorage();
+ }
+
+ assertThat(sharedStorage.tableOrZoneIdsOnDisk(), containsInAnyOrder(1,
3));
+ }
+
+ private void createZoneTxStateStorageLeavingTraceOnDisk(int zoneId) {
+ TxStateRocksDbStorage zoneStorage = new TxStateRocksDbStorage(zoneId,
2, sharedStorage);
+ zoneStorage.start();
+
+ TxStateRocksDbPartitionStorage partitionStorage =
zoneStorage.createPartitionStorage(0);
+ partitionStorage.start();
+
+ partitionStorage.committedGroupConfiguration(new byte[]{1, 2, 3}, 1,
1);
+ }
+
+ private void flushAndRestartSharedStorage() {
+ assertThat(sharedStorage.awaitFlush(true), willCompleteSuccessfully());
+
+ stopSharedStorage();
+ startSharedStorage();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void tableOrZoneIdsOnDiskGetRemovedOnPersistedDestruction(boolean restart)
{
+ createZoneTxStateStorageLeavingTraceOnDisk(1);
+ createZoneTxStateStorageLeavingTraceOnDisk(3);
+
+ sharedStorage.destroyStorage(1);
+
+ if (restart) {
+ flushAndRestartSharedStorage();
+ }
+
+ assertThat(sharedStorage.tableOrZoneIdsOnDisk(), contains(3));
+ }
+}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
index 9efb7759a0e..293cac400fd 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
@@ -22,10 +22,10 @@ import
org.apache.ignite.internal.tx.storage.state.AbstractTxStatePartitionStora
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
/**
- * Tx storage test for test implementation based on {@link ConcurrentHashMap}.
+ * Tx partition storage test for test implementation based on {@link
ConcurrentHashMap}.
*/
public class TestTxStatePartitionStorageTest extends
AbstractTxStatePartitionStorageTest {
- @Override protected TxStateStorage createTableStorage() {
+ @Override protected TxStateStorage createTableOrZoneStorage() {
return new TestTxStateStorage();
}
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorageTest.java
similarity index 86%
copy from
modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
copy to
modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorageTest.java
index 9efb7759a0e..2dd9c26410a 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStatePartitionStorageTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorageTest.java
@@ -18,14 +18,14 @@
package org.apache.ignite.internal.tx.storage.state.test;
import java.util.concurrent.ConcurrentHashMap;
-import
org.apache.ignite.internal.tx.storage.state.AbstractTxStatePartitionStorageTest;
+import org.apache.ignite.internal.tx.storage.state.AbstractTxStateStorageTest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
/**
* Tx storage test for test implementation based on {@link ConcurrentHashMap}.
*/
-public class TestTxStatePartitionStorageTest extends
AbstractTxStatePartitionStorageTest {
- @Override protected TxStateStorage createTableStorage() {
+public class TestTxStateStorageTest extends AbstractTxStateStorageTest {
+ @Override protected TxStateStorage createTableOrZoneStorage() {
return new TestTxStateStorage();
}
}
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
index 6fb009432e2..c7b1c3a1d90 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStatePartitionStorageTest.java
@@ -68,7 +68,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
/**
- * Abstract tx storage test.
+ * Abstract tx state partition storage test.
*/
public abstract class AbstractTxStatePartitionStorageTest extends
BaseIgniteAbstractTest {
protected static final int TABLE_ID = 1;
@@ -84,11 +84,11 @@ public abstract class AbstractTxStatePartitionStorageTest
extends BaseIgniteAbst
/**
* Creates {@link TxStatePartitionStorage} to test.
*/
- protected abstract TxStateStorage createTableStorage();
+ protected abstract TxStateStorage createTableOrZoneStorage();
@BeforeEach
protected void beforeTest() {
- tableStorage = createTableStorage();
+ tableStorage = createTableOrZoneStorage();
tableStorage.start();
}
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
new file mode 100644
index 00000000000..4c3a6419d84
--- /dev/null
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.storage.lease.LeaseInfo;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract tx state storage test.
+ */
+public abstract class AbstractTxStateStorageTest extends
BaseIgniteAbstractTest {
+ protected static final int ZONE_ID = 1;
+
+ protected static final byte[] GROUP_CONFIGURATION = {1, 2, 3};
+
+ protected static final byte[] SNAPSHOT_INFO = {4, 5, 6};
+
+ protected static final LeaseInfo LEASE_INFO = new LeaseInfo(1,
UUID.randomUUID(), "node");
+
+ protected TxStateStorage txStateStorage;
+
+ /**
+ * Creates {@link TxStateStorage} to test.
+ */
+ protected abstract TxStateStorage createTableOrZoneStorage();
+
+ @BeforeEach
+ protected void beforeTest() {
+ createAndStartStorage();
+ }
+
+ private void createAndStartStorage() {
+ txStateStorage = createTableOrZoneStorage();
+
+ txStateStorage.start();
+ }
+
+ @AfterEach
+ protected void afterTest() throws Exception {
+ txStateStorage.close();
+ }
+
+ @Test
+ public void partitionDestructionRemovesAllDataAndMetadata() {
+ int partitionIndex = 0;
+
+ TxStatePartitionStorage partitionStorage =
txStateStorage.getOrCreatePartitionStorage(partitionIndex);
+
+ partitionStorage.committedGroupConfiguration(GROUP_CONFIGURATION, 1,
1);
+ partitionStorage.leaseInfo(LEASE_INFO, 2, 1);
+ partitionStorage.snapshotInfo(SNAPSHOT_INFO);
+
+ TxMeta txMeta = new TxMeta(TxState.ABORTED, List.of(), null);
+ partitionStorage.putForRebalance(UUID.randomUUID(), txMeta);
+
+ txStateStorage.destroyPartitionStorage(partitionIndex);
+
+ TxStatePartitionStorage newPartitionStorage =
txStateStorage.createPartitionStorage(partitionIndex);
+
+ assertThat(newPartitionStorage.lastAppliedIndex(), is(0L));
+ assertThat(newPartitionStorage.lastAppliedTerm(), is(0L));
+ assertThat(newPartitionStorage.committedGroupConfiguration(),
is(nullValue()));
+ assertThat(newPartitionStorage.leaseInfo(), is(nullValue()));
+ assertThat(newPartitionStorage.snapshotInfo(), is(nullValue()));
+ }
+
+ @Test
+ public void wholeDestructionRemovesAllDataAndMetadata() {
+ int partitionIndex = 0;
+
+ TxStatePartitionStorage partitionStorage =
txStateStorage.getOrCreatePartitionStorage(partitionIndex);
+
+ partitionStorage.committedGroupConfiguration(GROUP_CONFIGURATION, 1,
1);
+ partitionStorage.leaseInfo(LEASE_INFO, 2, 1);
+ partitionStorage.snapshotInfo(SNAPSHOT_INFO);
+
+ TxMeta txMeta = new TxMeta(TxState.ABORTED, List.of(), null);
+ partitionStorage.putForRebalance(UUID.randomUUID(), txMeta);
+
+ txStateStorage.destroy();
+ createAndStartStorage();
+
+ TxStatePartitionStorage newPartitionStorage =
txStateStorage.createPartitionStorage(partitionIndex);
+
+ assertThat(newPartitionStorage.lastAppliedIndex(), is(0L));
+ assertThat(newPartitionStorage.lastAppliedTerm(), is(0L));
+ assertThat(newPartitionStorage.committedGroupConfiguration(),
is(nullValue()));
+ assertThat(newPartitionStorage.leaseInfo(), is(nullValue()));
+ assertThat(newPartitionStorage.snapshotInfo(), is(nullValue()));
+ }
+}
diff --git
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index 3c0dc23de7e..c012c333b93 100644
---
a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++
b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -46,7 +46,7 @@ public class TestTxStateStorage implements TxStateStorage {
}
@Override
- public void destroyTxStateStorage(int partitionId) {
+ public void destroyPartitionStorage(int partitionId) {
TxStatePartitionStorage storage = storages.remove(partitionId);
if (storage != null) {