This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c660f2a496 IGNITE-15931 Implement storage for tx states (#1056)
c660f2a496 is described below

commit c660f2a4960505298d676ce68b9b8dbbb834b7d0
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Sep 6 20:18:46 2022 +0300

    IGNITE-15931 Implement storage for tx states (#1056)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |  15 ++
 modules/transactions/pom.xml                       |   5 +
 .../java/org/apache/ignite/internal/tx/TxMeta.java |  64 ++++++
 .../internal/tx/storage/state/TxStateStorage.java  | 125 +++++++++++
 .../state/rocksdb/TxStateRocksDbStorage.java       | 238 +++++++++++++++++++++
 .../tx/storage/state/TxStateStorageTest.java       | 234 ++++++++++++++++++++
 6 files changed, 681 insertions(+)

diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index fcfb2c8e30..08d71125cc 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -169,4 +169,19 @@ public class ErrorGroups {
         /** Index not found. */
         public static final int INDEX_NOT_FOUND_ERR = 
INDEX_ERR_GROUP.registerErrorCode(2);
     }
+
+    /**
+     * Transactions error group.
+     *
+     * <p>The error codes are registered once, when this class is initialized, and are declared {@code final}
+     * like the codes of the other error groups so they cannot be reassigned afterwards.
+     */
+    public static class Transactions {
+        /** Transactions error group. */
+        public static final ErrorGroup TX_ERR_GROUP = ErrorGroup.newGroup("TX", 3);
+
+        /** Error on creation of tx state storage. */
+        public static final int TX_STATE_STORAGE_CREATE_ERR = TX_ERR_GROUP.registerErrorCode(1);
+
+        /** Error on destruction of tx state storage. */
+        public static final int TX_STATE_STORAGE_DESTROY_ERR = TX_ERR_GROUP.registerErrorCode(2);
+
+        /** Error of tx state storage. */
+        public static final int TX_STATE_STORAGE_ERR = TX_ERR_GROUP.registerErrorCode(3);
+    }
 }
diff --git a/modules/transactions/pom.xml b/modules/transactions/pom.xml
index 26155ec1ec..2ae6d8ffd5 100644
--- a/modules/transactions/pom.xml
+++ b/modules/transactions/pom.xml
@@ -58,6 +58,11 @@
       <artifactId>ignite-raft-client</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-rocksdb-common</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.ignite</groupId>
       <artifactId>ignite-core</artifactId>
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
new file mode 100644
index 0000000000..75e34ada37
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static java.util.Collections.unmodifiableList;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.ignite.hlc.HybridTimestamp;
+
+/**
+ * Transaction meta: the transaction state, the partitions enlisted into the transaction and the commit timestamp.
+ *
+ * <p>The class is {@link Serializable}. Instances do not change after construction: the list of enlisted partitions
+ * is copied on the way in and is exposed only as an unmodifiable list.
+ */
+public class TxMeta implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = -172513482743911860L;
+
+    /** Tx state. */
+    private final TxState txState;
+
+    /** The list of enlisted partitions (an immutable copy of the list passed to the constructor). */
+    private final List<String> enlistedPartitions;
+
+    /** Commit timestamp. */
+    private final HybridTimestamp commitTimestamp;
+
+    /**
+     * The constructor.
+     *
+     * @param txState Tx state.
+     * @param enlistedPartitions The list of enlisted partitions. The list is copied, so later changes made by the caller
+     *                           to the passed list are not visible through this object.
+     * @param commitTimestamp Commit timestamp.
+     * @throws NullPointerException If {@code enlistedPartitions} is {@code null} or contains a {@code null} element.
+     */
+    public TxMeta(TxState txState, List<String> enlistedPartitions, HybridTimestamp commitTimestamp) {
+        this.txState = txState;
+        // Defensive copy: without it the caller keeps a handle through which this meta could be modified.
+        this.enlistedPartitions = List.copyOf(enlistedPartitions);
+        this.commitTimestamp = commitTimestamp;
+    }
+
+    /**
+     * Returns the tx state.
+     *
+     * @return Tx state passed to the constructor.
+     */
+    public TxState txState() {
+        return txState;
+    }
+
+    /**
+     * Returns the enlisted partitions.
+     *
+     * @return Unmodifiable list of enlisted partitions.
+     */
+    public List<String> enlistedPartitions() {
+        return unmodifiableList(enlistedPartitions);
+    }
+
+    /**
+     * Returns the commit timestamp.
+     *
+     * @return Commit timestamp passed to the constructor.
+     */
+    public HybridTimestamp commitTimestamp() {
+        return commitTimestamp;
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
new file mode 100644
index 0000000000..ba3e26e9f6
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.nio.file.Path;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage for transaction meta, {@link TxMeta}, keyed by transaction id.
+ *
+ * <p>The storage has an explicit lifecycle: {@link #start()} must be called before the data operations are used and
+ * {@link #stop()} releases the storage. The interface also extends {@link AutoCloseable}, so an instance can be
+ * managed by a try-with-resources statement.
+ */
+public interface TxStateStorage extends AutoCloseable {
+    /**
+     * Start the storage.
+     *
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_CREATE_ERR} error code in case when
+     *                                 creation of the storage has failed.
+     */
+    void start();
+
+    /**
+     * Whether the storage is started.
+     *
+     * @return {@code true} if the storage is started, {@code false} otherwise.
+     */
+    boolean isStarted();
+
+    /**
+     * Stop the storage.
+     *
+     * @throws Exception If the storage could not be stopped.
+     */
+    void stop() throws Exception;
+
+    /**
+     * Get tx meta by tx id.
+     *
+     * @param txId Tx id.
+     * @return Tx meta, or {@code null} if there is no meta stored for the given tx id.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} error code in case when
+     *                                 the operation has failed.
+     */
+    TxMeta get(UUID txId);
+
+    /**
+     * Put the tx meta into the storage, replacing the meta previously stored for the same tx id, if any.
+     *
+     * @param txId Tx id.
+     * @param txMeta Tx meta.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} error code in case when
+     *                                 the operation has failed.
+     */
+    void put(UUID txId, TxMeta txMeta);
+
+    /**
+     * Atomically change the tx meta in the storage: the new meta is written only if the tx state of the meta that is
+     * currently stored equals the expected one.
+     *
+     * @param txId Tx id.
+     * @param txStateExpected Tx state that is expected to be in the storage.
+     * @param txMeta Tx meta to store if the expectation holds.
+     * @return {@code true} if the stored state matched the expected one and the meta was replaced, {@code false} if the
+     *         stored state was different and nothing was changed.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} error code in case when
+     *                                 the operation has failed.
+     */
+    boolean compareAndSet(UUID txId, @NotNull TxState txStateExpected, 
@NotNull TxMeta txMeta);
+
+    /**
+     * Remove the tx meta from the storage.
+     *
+     * @param txId Tx id.
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} error code in case when
+     *                                 the operation has failed.
+     */
+    void remove(UUID txId);
+
+    /**
+     * Create a cursor to scan all data in the storage.
+     *
+     * <p>The cursor is closeable; callers are expected to close it when they are done with it.
+     *
+     * @return Cursor over pairs of tx id and tx meta.
+     */
+    Cursor<IgniteBiTuple<UUID, TxMeta>> scan();
+
+    /**
+     * Removes all data from the storage and frees all resources.
+     *
+     * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_DESTROY_ERR} error code in case when
+     *                                 the operation has failed.
+     */
+    void destroy();
+
+    /**
+     * Create a snapshot of the storage's current state in the specified directory.
+     *
+     * @param snapshotPath Snapshot path.
+     * @return Future which completes when the snapshot operation is complete.
+     */
+    CompletableFuture<Void> snapshot(Path snapshotPath);
+
+    /**
+     * Restore a storage from the snapshot. Data written after the snapshot was taken is not expected to be present
+     * in the storage once the restore has finished.
+     *
+     * @param snapshotPath Snapshot path.
+     */
+    void restoreSnapshot(Path snapshotPath);
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
new file mode 100644
index 0000000000..8139f82090
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state.rocksdb;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_CREATE_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_DESTROY_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_ERR;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Transaction;
+import org.rocksdb.TransactionDB;
+import org.rocksdb.TransactionDBOptions;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Tx state storage implementation based on RocksDB.
+ */
+public class TxStateRocksDbStorage implements TxStateStorage, AutoCloseable {
+    /** Database path. */
+    private final Path dbPath;
+
+    /** RocksDB database. */
+    private volatile TransactionDB db;
+
+    /** RockDB options. */
+    @Nullable
+    private volatile Options options;
+
+    /** Database options. */
+    private volatile TransactionDBOptions txDbOptions;
+
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor;
+
+    /** Snapshot manager. */
+    private volatile RocksSnapshotManager snapshotManager;
+
+    /** Snapshot restore lock. */
+    private final Object snapshotRestoreLock = new Object();
+
+    /** Whether is started. */
+    private boolean isStarted;
+
+    /**
+     * The constructor.
+     *
+     * @param dbPath Database path.
+     * @param snapshotExecutor Snapshot thread pool.
+     */
+    public TxStateRocksDbStorage(Path dbPath, ExecutorService 
snapshotExecutor) {
+        this.dbPath = dbPath;
+        this.snapshotExecutor = snapshotExecutor;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        try {
+            this.options = new Options().setCreateIfMissing(true);
+
+            this.txDbOptions = new TransactionDBOptions();
+
+            this.db = TransactionDB.open(options, txDbOptions, 
dbPath.toString());
+
+            ColumnFamily defaultCf = ColumnFamily.wrap(db, 
db.getDefaultColumnFamily());
+
+            snapshotManager = new RocksSnapshotManager(db, 
List.of(fullRange(defaultCf)), snapshotExecutor);
+
+            isStarted = true;
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_CREATE_ERR, 
"Failed to start transaction state storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStarted() {
+        return isStarted;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws Exception {
+        IgniteUtils.closeAll(options, txDbOptions, db);
+
+        db = null;
+        options = null;
+        txDbOptions = null;
+
+        isStarted = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public TxMeta get(UUID txId) {
+        try {
+            byte[] txMetaBytes = db.get(toBytes(txId));
+
+            return txMetaBytes == null ? null : (TxMeta) 
fromBytes(txMetaBytes);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
get a value from the transaction state storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void put(UUID txId, TxMeta txMeta) {
+        try {
+            db.put(toBytes(txId), toBytes(txMeta));
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
put a value into the transaction state storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean compareAndSet(UUID txId, @NotNull TxState 
txStateExpected, @NotNull TxMeta txMeta) {
+        requireNonNull(txStateExpected);
+        requireNonNull(txMeta);
+
+        byte[] txIdBytes = toBytes(txId);
+
+        try (Transaction rocksTx = db.beginTransaction(new WriteOptions())) {
+            byte[] txMetaExistingBytes = rocksTx.get(new ReadOptions(), 
toBytes(txId));
+            TxMeta txMetaExisting = (TxMeta) fromBytes(txMetaExistingBytes);
+
+            if (txMetaExisting.txState() == txStateExpected) {
+                rocksTx.put(txIdBytes, toBytes(txMeta));
+
+                rocksTx.commit();
+
+                return true;
+            } else {
+                rocksTx.rollback();
+
+                return false;
+            }
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(
+                TX_STATE_STORAGE_ERR,
+                "Failed perform CAS operation over a value in transaction 
state storage",
+                e
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(UUID txId) {
+        try {
+            db.delete(toBytes(txId));
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_ERR, "Failed to 
remove a value from the transaction state storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
+        RocksIterator rocksIterator = db.newIterator();
+        rocksIterator.seekToFirst();
+
+        RocksIteratorAdapter<IgniteBiTuple<UUID, TxMeta>> iteratorAdapter = 
new RocksIteratorAdapter<>(rocksIterator) {
+            @Override protected IgniteBiTuple<UUID, TxMeta> decodeEntry(byte[] 
keyBytes, byte[] valueBytes) {
+                UUID key = (UUID) fromBytes(keyBytes);
+                TxMeta txMeta = (TxMeta) fromBytes(valueBytes);
+
+                return new IgniteBiTuple<>(key, txMeta);
+            }
+        };
+
+        return Cursor.fromIterator(iteratorAdapter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void destroy() {
+        try (Options options = new Options()) {
+            close();
+
+            RocksDB.destroyDB(dbPath.toString(), options);
+        } catch (Exception e) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_DESTROY_ERR, 
"Failed to destroy the transaction state storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+        return snapshotManager.createSnapshot(snapshotPath);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restoreSnapshot(Path snapshotPath) {
+        synchronized (snapshotRestoreLock) {
+            destroy();
+
+            start();
+
+            snapshotManager.restoreSnapshot(snapshotPath);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        stop();
+    }
+}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
new file mode 100644
index 0000000000..5506960e6f
--- /dev/null
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tx storage test.
+ *
+ * <p>Every test creates a RocksDB-based storage in the work directory, starts it and closes it through
+ * try-with-resources. The executor handed to the storages is owned by the test and is shut down after each test.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class TxStateStorageTest {
+    @WorkDirectory
+    protected Path workDir;
+
+    /** Executor passed to the created storages for snapshot operations. */
+    private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
+
+    /** Shuts the snapshot executor down so that its worker thread does not outlive the test. */
+    @AfterEach
+    public void shutdownSnapshotExecutor() {
+        snapshotExecutor.shutdownNow();
+    }
+
+    /** Checks that stored metas can be read back and that removed metas are no longer returned. */
+    @Test
+    public void testPutGetRemove() throws Exception {
+        try (TxStateStorage storage = createStorage()) {
+            storage.start();
+
+            List<UUID> txIds = new ArrayList<>();
+
+            for (int i = 0; i < 100; i++) {
+                UUID txId = UUID.randomUUID();
+
+                txIds.add(txId);
+
+                storage.put(txId, new TxMeta(TxState.PENDING, generateEnlistedPartitions(i), generateTimestamp(txId)));
+            }
+
+            for (int i = 0; i < 100; i++) {
+                TxMeta txMeta = storage.get(txIds.get(i));
+                TxMeta txMetaExpected = new TxMeta(TxState.PENDING, generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
+                assertTxMetaEquals(txMetaExpected, txMeta);
+            }
+
+            // Every second meta is removed; the rest must stay intact.
+            for (int i = 0; i < 100; i++) {
+                if (i % 2 == 0) {
+                    storage.remove(txIds.get(i));
+                }
+            }
+
+            for (int i = 0; i < 100; i++) {
+                TxMeta txMeta = storage.get(txIds.get(i));
+
+                if (i % 2 == 0) {
+                    assertNull(txMeta);
+                } else {
+                    TxMeta txMetaExpected = new TxMeta(TxState.PENDING, generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
+                    assertTxMetaEquals(txMetaExpected, txMeta);
+                }
+            }
+        }
+    }
+
+    /**
+     * Generates a list of {@code c} partition names: the decimal strings of the numbers from 0 (inclusive) to {@code c}
+     * (exclusive).
+     */
+    private List<String> generateEnlistedPartitions(int c) {
+        return IntStream.range(0, c).mapToObj(String::valueOf).collect(toList());
+    }
+
+    /**
+     * Derives a timestamp from the given id, so that the same id always produces the same timestamp.
+     *
+     * <p>The sign bit is cleared by masking rather than with {@code Math.abs}, because {@code Math.abs} returns a negative
+     * value for {@code MIN_VALUE}.
+     */
+    private HybridTimestamp generateTimestamp(UUID uuid) {
+        long physical = uuid.getMostSignificantBits() & Long.MAX_VALUE;
+
+        // A zero physical part is replaced with 1 (presumably zero is not accepted by HybridTimestamp - TODO confirm).
+        if (physical == 0) {
+            physical++;
+        }
+
+        int logical = (int) uuid.getLeastSignificantBits() & Integer.MAX_VALUE;
+
+        return new HybridTimestamp(physical, logical);
+    }
+
+    /** Checks that compare-and-set fails on an unexpected state and succeeds on the expected one. */
+    @Test
+    public void testCas() throws Exception {
+        try (TxStateStorage storage = createStorage()) {
+            storage.start();
+
+            UUID txId = UUID.randomUUID();
+
+            TxMeta txMeta0 = new TxMeta(TxState.PENDING, new ArrayList<>(), generateTimestamp(txId));
+            TxMeta txMeta1 = new TxMeta(TxState.COMMITED, new ArrayList<>(), generateTimestamp(txId));
+            TxMeta txMeta2 = new TxMeta(TxState.COMMITED, new ArrayList<>(), generateTimestamp(UUID.randomUUID()));
+
+            storage.put(txId, txMeta0);
+
+            assertTxMetaEquals(txMeta0, storage.get(txId));
+
+            // The stored state is PENDING, so only the CAS that expects PENDING may go through.
+            assertFalse(storage.compareAndSet(txId, txMeta1.txState(), txMeta2));
+            assertTrue(storage.compareAndSet(txId, txMeta0.txState(), txMeta2));
+
+            assertTxMetaEquals(txMeta2, storage.get(txId));
+        }
+    }
+
+    /** Checks that the scan cursor returns every stored meta exactly once. */
+    @Test
+    public void testScan() throws Exception {
+        try (TxStateStorage storage = createStorage()) {
+            storage.start();
+
+            Map<UUID, TxMeta> txs = new HashMap<>();
+
+            for (int i = 0; i < 100; i++) {
+                UUID txId = UUID.randomUUID();
+                TxMeta txMeta = new TxMeta(TxState.PENDING, generateEnlistedPartitions(i), generateTimestamp(txId));
+                txs.put(txId, txMeta);
+                storage.put(txId, txMeta);
+            }
+
+            try (Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursor = storage.scan()) {
+                assertTrue(scanCursor.hasNext());
+
+                while (scanCursor.hasNext()) {
+                    IgniteBiTuple<UUID, TxMeta> txData = scanCursor.next();
+
+                    // The tuple is checked before it is dereferenced.
+                    assertNotNull(txData);
+
+                    // Removing the entry makes a repeated key show up as a missing expected meta.
+                    TxMeta txMeta = txs.remove(txData.getKey());
+
+                    assertNotNull(txMeta);
+                    assertNotNull(txData.getValue());
+                    assertTxMetaEquals(txMeta, txData.getValue());
+                }
+
+                assertTrue(txs.isEmpty());
+            }
+        }
+    }
+
+    /** Checks that restoring a snapshot brings back the data it contains and drops the data written after it. */
+    @Test
+    public void testSnapshot() throws Exception {
+        try (TxStateStorage storage = createStorage()) {
+            storage.start();
+
+            List<UUID> inSnapshot = new ArrayList<>();
+
+            for (int i = 0; i < 100; i++) {
+                UUID txId = UUID.randomUUID();
+
+                storage.put(txId, new TxMeta(TxState.COMMITED, new ArrayList<>(), generateTimestamp(txId)));
+
+                inSnapshot.add(txId);
+            }
+
+            Path snapshotDirPath = workDir.resolve("snapshot");
+            Files.createDirectories(snapshotDirPath);
+
+            try {
+                storage.snapshot(snapshotDirPath).join();
+
+                List<UUID> notInSnapshot = new ArrayList<>();
+
+                for (int i = 0; i < 100; i++) {
+                    UUID txId = UUID.randomUUID();
+
+                    storage.put(txId, new TxMeta(TxState.COMMITED, new ArrayList<>(), generateTimestamp(txId)));
+
+                    notInSnapshot.add(txId);
+                }
+
+                for (UUID txId : notInSnapshot) {
+                    assertTxMetaEquals(new TxMeta(TxState.COMMITED, new ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
+                }
+
+                storage.restoreSnapshot(snapshotDirPath);
+
+                for (UUID txId : inSnapshot) {
+                    assertTxMetaEquals(new TxMeta(TxState.COMMITED, new ArrayList<>(), generateTimestamp(txId)), storage.get(txId));
+                }
+
+                for (UUID txId : notInSnapshot) {
+                    assertNull(storage.get(txId));
+                }
+            } finally {
+                // Files.walk keeps directory handles open until the stream is closed. Reverse order deletes
+                // the children before their parent directories.
+                try (Stream<Path> snapshotFiles = Files.walk(snapshotDirPath)) {
+                    snapshotFiles
+                            .sorted(Comparator.reverseOrder())
+                            .map(Path::toFile)
+                            .forEach(File::delete);
+                }
+            }
+        }
+    }
+
+    /** Asserts that the two metas have equal state, commit timestamp and enlisted partitions. */
+    private static void assertTxMetaEquals(TxMeta txMeta0, TxMeta txMeta1) {
+        assertEquals(txMeta0.txState(), txMeta1.txState());
+        assertEquals(txMeta0.commitTimestamp(), txMeta1.commitTimestamp());
+        assertEquals(txMeta0.enlistedPartitions(), txMeta1.enlistedPartitions());
+    }
+
+    /** Creates a storage located in the work directory; the storage is not started. */
+    private TxStateStorage createStorage() {
+        return new TxStateRocksDbStorage(workDir, snapshotExecutor);
+    }
+}

Reply via email to