This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c660f2a496 IGNITE-15931 Implement storage for tx states (#1056)
c660f2a496 is described below
commit c660f2a4960505298d676ce68b9b8dbbb834b7d0
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Sep 6 20:18:46 2022 +0300
IGNITE-15931 Implement storage for tx states (#1056)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 15 ++
modules/transactions/pom.xml | 5 +
.../java/org/apache/ignite/internal/tx/TxMeta.java | 64 ++++++
.../internal/tx/storage/state/TxStateStorage.java | 125 +++++++++++
.../state/rocksdb/TxStateRocksDbStorage.java | 238 +++++++++++++++++++++
.../tx/storage/state/TxStateStorageTest.java | 234 ++++++++++++++++++++
6 files changed, 681 insertions(+)
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index fcfb2c8e30..08d71125cc 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -169,4 +169,19 @@ public class ErrorGroups {
/** Index not found. */
public static final int INDEX_NOT_FOUND_ERR =
INDEX_ERR_GROUP.registerErrorCode(2);
}
+
+ /** Transactions error group. */
+ public static class Transactions {
+ /** Transactions error group. */
+ public static final ErrorGroup TX_ERR_GROUP =
ErrorGroup.newGroup("TX", 3);
+
+ /** Error on creation of tx state storage. */
+ public static int TX_STATE_STORAGE_CREATE_ERR =
TX_ERR_GROUP.registerErrorCode(1);
+
+ /** Error on destruction of tx state storage. */
+ public static int TX_STATE_STORAGE_DESTROY_ERR =
TX_ERR_GROUP.registerErrorCode(2);
+
+ /** Error of tx state storage. */
+ public static int TX_STATE_STORAGE_ERR =
TX_ERR_GROUP.registerErrorCode(3);
+ }
}
diff --git a/modules/transactions/pom.xml b/modules/transactions/pom.xml
index 26155ec1ec..2ae6d8ffd5 100644
--- a/modules/transactions/pom.xml
+++ b/modules/transactions/pom.xml
@@ -58,6 +58,11 @@
<artifactId>ignite-raft-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-rocksdb-common</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
new file mode 100644
index 0000000000..75e34ada37
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.hlc.HybridTimestamp;
+
+/** Transaction meta. */
+public class TxMeta implements Serializable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = -172513482743911860L;
+
+ /** Tx state. */
+ private final TxState txState;
+
+ /** The list of enlisted partitions. */
+ private final List<String> enlistedPartitions;
+
+ /** Commit timestamp. */
+ private final HybridTimestamp commitTimestamp;
+
+ /**
+ * The constructor.
+ *
+ * @param txState Tx state.
+ * @param enlistedPartitions The list of enlisted partitions.
+ * @param commitTimestamp Commit timestamp.
+ */
+ public TxMeta(TxState txState, List<String> enlistedPartitions,
HybridTimestamp commitTimestamp) {
+ this.txState = txState;
+ this.enlistedPartitions = enlistedPartitions;
+ this.commitTimestamp = commitTimestamp;
+ }
+
+ public TxState txState() {
+ return txState;
+ }
+
+ public List<String> enlistedPartitions() {
+ return unmodifiableList(enlistedPartitions);
+ }
+
+ public HybridTimestamp commitTimestamp() {
+ return commitTimestamp;
+ }
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
new file mode 100644
index 0000000000..ba3e26e9f6
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.nio.file.Path;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage for transaction meta, {@link TxMeta}.
+ */
+public interface TxStateStorage extends AutoCloseable {
+ /**
+ * Start the storage.
+ *
+ * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_CREATE_ERR} error code in case when
+ * creation of the storage has failed.
+ */
+ void start();
+
+ /**
+ * Whether the storage is started.
+ *
+ * @return {@code true} if the storage is started, {@code false} otherwise.
+ */
+ boolean isStarted();
+
+ /**
+ * Stop the storage.
+ */
+ void stop() throws Exception;
+
+ /**
+ * Get tx meta by tx id.
+ *
+ * @param txId Tx id.
+ * @return Tx meta.
+ * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_ERR} error code in case when
+ * the operation has failed.
+ */
+ TxMeta get(UUID txId);
+
+ /**
+ * Put the tx meta into the storage.
+ *
+ * @param txId Tx id.
+ * @param txMeta Tx meta.
+ * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_ERR} error code in case when
+ * the operation has failed.
+ */
+ void put(UUID txId, TxMeta txMeta);
+
+ /**
+ * Atomically change the tx meta in the storage.
+ *
+ * @param txId Tx id.
+ * @param txStateExpected Tx state that is expected to be in the storage.
+ * @param txMeta Tx meta.
+ * @return Whether the CAS operation is successful.
+ * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_ERR} error code in case when
+ * the operation has failed.
+ */
+ boolean compareAndSet(UUID txId, @NotNull TxState txStateExpected,
@NotNull TxMeta txMeta);
+
+ /**
+ * Remove the tx meta from the storage.
+ *
+ * @param txId Tx id.
+ * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_ERR} error code in case when
+ * the operation has failed.
+ */
+ void remove(UUID txId);
+
+ /**
+ * Create a cursor to scan all data in the storage.
+ *
+ * @return Cursor.
+ */
+ Cursor<IgniteBiTuple<UUID, TxMeta>> scan();
+
+ /**
+ * Removes all data from the storage and frees all resources.
+ *
+ * @throws IgniteInternalException with {@link
Transactions#TX_STATE_STORAGE_DESTROY_ERR} error code in case when
+ * the operation has failed.
+ */
+ void destroy();
+
+ /**
+ * Create a snapshot of the storage's current state in the specified
directory.
+ *
+ * @param snapshotPath Snapshot path.
+ * @return Future which completes when the snapshot operation is complete.
+ */
+ CompletableFuture<Void> snapshot(Path snapshotPath);
+
+ /**
+ * Restore a storage from the snapshot.
+ *
+ * @param snapshotPath Snapshot path.
+ */
+ void restoreSnapshot(Path snapshotPath);
+}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
new file mode 100644
index 0000000000..8139f82090
--- /dev/null
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state.rocksdb;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_CREATE_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_DESTROY_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_ERR;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Transaction;
+import org.rocksdb.TransactionDB;
+import org.rocksdb.TransactionDBOptions;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Tx state storage implementation based on RocksDB.
+ */
+public class TxStateRocksDbStorage implements TxStateStorage, AutoCloseable {
+ /** Database path. */
+ private final Path dbPath;
+
+ /** RocksDB database. */
+ private volatile TransactionDB db;
+
+ /** RockDB options. */
+ @Nullable
+ private volatile Options options;
+
+ /** Database options. */
+ private volatile TransactionDBOptions txDbOptions;
+
+ /** Thread-pool for snapshot operations execution. */
+ private final ExecutorService snapshotExecutor;
+
+ /** Snapshot manager. */
+ private volatile RocksSnapshotManager snapshotManager;
+
+ /** Snapshot restore lock. */
+ private final Object snapshotRestoreLock = new Object();
+
+ /** Whether is started. */
+ private boolean isStarted;
+
+ /**
+ * The constructor.
+ *
+ * @param dbPath Database path.
+ * @param snapshotExecutor Snapshot thread pool.
+ */
+ public TxStateRocksDbStorage(Path dbPath, ExecutorService
snapshotExecutor) {
+ this.dbPath = dbPath;
+ this.snapshotExecutor = snapshotExecutor;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ try {
+ this.options = new Options().setCreateIfMissing(true);
+
+ this.txDbOptions = new TransactionDBOptions();
+
+ this.db = TransactionDB.open(options, txDbOptions,
dbPath.toString());
+
+ ColumnFamily defaultCf = ColumnFamily.wrap(db,
db.getDefaultColumnFamily());
+
+ snapshotManager = new RocksSnapshotManager(db,
List.of(fullRange(defaultCf)), snapshotExecutor);
+
+ isStarted = true;
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_CREATE_ERR,
"Failed to start transaction state storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isStarted() {
+ return isStarted;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws Exception {
+ IgniteUtils.closeAll(options, txDbOptions, db);
+
+ db = null;
+ options = null;
+ txDbOptions = null;
+
+ isStarted = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public TxMeta get(UUID txId) {
+ try {
+ byte[] txMetaBytes = db.get(toBytes(txId));
+
+ return txMetaBytes == null ? null : (TxMeta)
fromBytes(txMetaBytes);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
get a value from the transaction state storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(UUID txId, TxMeta txMeta) {
+ try {
+ db.put(toBytes(txId), toBytes(txMeta));
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
put a value into the transaction state storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean compareAndSet(UUID txId, @NotNull TxState
txStateExpected, @NotNull TxMeta txMeta) {
+ requireNonNull(txStateExpected);
+ requireNonNull(txMeta);
+
+ byte[] txIdBytes = toBytes(txId);
+
+ try (Transaction rocksTx = db.beginTransaction(new WriteOptions())) {
+ byte[] txMetaExistingBytes = rocksTx.get(new ReadOptions(),
toBytes(txId));
+ TxMeta txMetaExisting = (TxMeta) fromBytes(txMetaExistingBytes);
+
+ if (txMetaExisting.txState() == txStateExpected) {
+ rocksTx.put(txIdBytes, toBytes(txMeta));
+
+ rocksTx.commit();
+
+ return true;
+ } else {
+ rocksTx.rollback();
+
+ return false;
+ }
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_ERR,
+ "Failed perform CAS operation over a value in transaction
state storage",
+ e
+ );
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(UUID txId) {
+ try {
+ db.delete(toBytes(txId));
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to
remove a value from the transaction state storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
+ RocksIterator rocksIterator = db.newIterator();
+ rocksIterator.seekToFirst();
+
+ RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter =
new RocksIteratorAdapter<>(rocksIterator) {
+ @Override protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[]
keyBytes, byte[] valueBytes) {
+ UUID key = (UUID) fromBytes(keyBytes);
+ TxMeta txMeta = (TxMeta) fromBytes(valueBytes);
+
+ return new IgniteBiTuple<>(key, txMeta);
+ }
+ };
+
+ return Cursor.fromIterator(iteratorAdapter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void destroy() {
+ try (Options options = new Options()) {
+ close();
+
+ RocksDB.destroyDB(dbPath.toString(), options);
+ } catch (Exception e) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_DESTROY_ERR,
"Failed to destroy the transaction state storage", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+ return snapshotManager.createSnapshot(snapshotPath);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restoreSnapshot(Path snapshotPath) {
+ synchronized (snapshotRestoreLock) {
+ destroy();
+
+ start();
+
+ snapshotManager.restoreSnapshot(snapshotPath);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ stop();
+ }
+}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
new file mode 100644
index 0000000000..5506960e6f
--- /dev/null
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tx storage test.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class TxStateStorageTest {
+ @WorkDirectory
+ protected Path workDir;
+
+ @Test
+ public void testPutGetRemove() throws Exception {
+ try (TxStateStorage storage = createStorage()) {
+ storage.start();
+
+ List<UUID> txIds = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = UUID.randomUUID();
+
+ txIds.add(txId);
+
+ storage.put(txId, new TxMeta(TxState.PENDING,
generateEnlistedPartitions(i), generateTimestamp(txId)));
+ }
+
+ for (int i = 0; i < 100; i++) {
+ TxMeta txMeta = storage.get(txIds.get(i));
+ TxMeta txMetaExpected = new TxMeta(TxState.PENDING,
generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
+ assertTxMetaEquals(txMetaExpected, txMeta);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ if (i % 2 == 0) {
+ storage.remove(txIds.get(i));
+ }
+ }
+
+ for (int i = 0; i < 100; i++) {
+ if (i % 2 == 0) {
+ TxMeta txMeta = storage.get(txIds.get(i));
+ assertNull(txMeta);
+ } else {
+ TxMeta txMeta = storage.get(txIds.get(i));
+ TxMeta txMetaExpected = new TxMeta(TxState.PENDING,
generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
+ assertTxMetaEquals(txMetaExpected, txMeta);
+ }
+ }
+ }
+ }
+
+ private List<String> generateEnlistedPartitions(int c) {
+ return IntStream.range(0,
c).mapToObj(String::valueOf).collect(toList());
+ }
+
+ private HybridTimestamp generateTimestamp(UUID uuid) {
+ long physical = Math.abs(uuid.getMostSignificantBits());
+ if (physical == 0) {
+ physical++;
+ }
+
+ int logical =
Math.abs(Long.valueOf(uuid.getLeastSignificantBits()).intValue());
+
+ return new HybridTimestamp(physical, logical);
+ }
+
+ @Test
+ public void testCas() throws Exception {
+ try (TxStateStorage storage = createStorage()) {
+ storage.start();
+
+ UUID txId = UUID.randomUUID();
+
+ TxMeta txMeta0 = new TxMeta(TxState.PENDING, new ArrayList<>(),
generateTimestamp(txId));
+ TxMeta txMeta1 = new TxMeta(TxState.COMMITED, new ArrayList<>(),
generateTimestamp(txId));
+ TxMeta txMeta2 = new TxMeta(TxState.COMMITED, new ArrayList<>(),
generateTimestamp(UUID.randomUUID()));
+
+ storage.put(txId, txMeta0);
+
+ assertTxMetaEquals(storage.get(txId), txMeta0);
+
+ assertFalse(storage.compareAndSet(txId, txMeta1.txState(),
txMeta2));
+ assertTrue(storage.compareAndSet(txId, txMeta0.txState(),
txMeta2));
+
+ assertTxMetaEquals(storage.get(txId), txMeta2);
+ }
+ }
+
+ @Test
+ public void testScan() throws Exception {
+ try (TxStateStorage storage = createStorage()) {
+ storage.start();
+
+ Map<UUID, TxMeta> txs = new HashMap<>();
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = UUID.randomUUID();
+ TxMeta txMeta = new TxMeta(TxState.PENDING,
generateEnlistedPartitions(i), generateTimestamp(txId));
+ txs.put(txId, txMeta);
+ storage.put(txId, txMeta);
+ }
+
+ try (Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursor =
storage.scan()) {
+ assertTrue(scanCursor.hasNext());
+
+ while (scanCursor.hasNext()) {
+ IgniteBiTuple<UUID, TxMeta> txData = scanCursor.next();
+ TxMeta txMeta = txs.remove(txData.getKey());
+
+ assertNotNull(txMeta);
+ assertNotNull(txData);
+ assertNotNull(txData.getValue());
+ assertTxMetaEquals(txMeta, txData.getValue());
+ }
+
+ assertTrue(txs.isEmpty());
+ }
+ }
+ }
+
+ @Test
+ public void testSnapshot() throws Exception {
+ try (TxStateStorage storage = createStorage()) {
+ storage.start();
+
+ List<UUID> inSnapshot = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = UUID.randomUUID();
+
+ storage.put(txId, new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)));
+
+ inSnapshot.add(txId);
+ }
+
+ Path snapshotDirPath = workDir.resolve("snapshot");
+ Files.createDirectories(snapshotDirPath);
+
+ try {
+ storage.snapshot(snapshotDirPath).join();
+
+ List<UUID> notInSnapshot = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = UUID.randomUUID();
+
+ storage.put(txId, new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)));
+
+ notInSnapshot.add(txId);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = notInSnapshot.get(i);
+
+ assertTxMetaEquals(new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
+ }
+
+ storage.restoreSnapshot(snapshotDirPath);
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = inSnapshot.get(i);
+
+ assertTxMetaEquals(new TxMeta(TxState.COMMITED, new
ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
+ }
+
+ for (int i = 0; i < 100; i++) {
+ UUID txId = notInSnapshot.get(i);
+
+ assertNull(storage.get(txId));
+ }
+ } finally {
+ Files.walk(snapshotDirPath)
+ .sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+ }
+ }
+
+ private static void assertTxMetaEquals(TxMeta txMeta0, TxMeta txMeta1) {
+ assertEquals(txMeta0.txState(), txMeta1.txState());
+ assertEquals(txMeta0.commitTimestamp(), txMeta1.commitTimestamp());
+ assertEquals(txMeta0.enlistedPartitions(),
txMeta1.enlistedPartitions());
+ }
+
+ private TxStateStorage createStorage() {
+ return new TxStateRocksDbStorage(workDir,
Executors.newSingleThreadExecutor());
+ }
+}