This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d1c4bbe4ea IGNITE-23301 Improved concurrency for coarse locks.
d1c4bbe4ea is described below

commit d1c4bbe4ea69b0ba78bc6e37e92811ab7159cfd3
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Thu Oct 24 12:51:12 2024 +0300

    IGNITE-23301 Improved concurrency for coarse locks.
---
 .../internal/util/IgniteStripedReadWriteLock.java  | 222 ++++++
 .../util/IgniteStripedReadWriteLockSelfTest.java   | 326 +++++++++
 .../benchmark/AbstractMultiNodeBenchmark.java      |  11 +-
 .../internal/benchmark/UpsertKvBenchmark.java      |  59 +-
 .../apache/ignite/distributed/ItLockTableTest.java |   4 +-
 .../internal/table/ItTransactionRecoveryTest.java  |  35 +
 .../table/distributed/raft/PartitionListener.java  |   9 +-
 .../replicator/PartitionReplicaListener.java       | 170 +++--
 .../org/apache/ignite/internal/tx/LockKey.java     |   2 +-
 .../org/apache/ignite/internal/tx/TxPriority.java  |  13 +
 .../ignite/internal/tx/impl/HeapLockManager.java   | 572 ++++++++++++---
 .../internal/tx/impl/HeapUnboundedLockManager.java | 775 ---------------------
 .../internal/tx/AbstractLockManagerTest.java       |   4 +-
 .../internal/tx/CoarseGrainedLockManagerTest.java  | 362 ++++++++++
 .../internal/tx/HeapUnboundedLockManagerTest.java  |  36 -
 .../tx/NoWaitDeadlockPreventionUnboundedTest.java  |  30 -
 .../tx/NoneDeadlockPreventionUnboundedTest.java    |  30 -
 .../ReversedDeadlockPreventionUnboundedTest.java   |  30 -
 .../tx/TimeoutDeadlockPreventionUnboundedTest.java |  30 -
 19 files changed, 1567 insertions(+), 1153 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
new file mode 100644
index 0000000000..796b557e1d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ReadWriteLock with striping mechanics. Compared to {@link 
ReentrantReadWriteLock} it has slightly improved performance of
+ * {@link ReadWriteLock#readLock()} operations at the cost of {@link 
ReadWriteLock#writeLock()} operations and memory consumption. It also
+ * supports reentrancy semantics like {@link ReentrantReadWriteLock}.
+ */
+public class IgniteStripedReadWriteLock implements ReadWriteLock {
+    /** Index generator. */
+    private static final AtomicInteger IDX_GEN = new AtomicInteger();
+
+    /** Index. */
+    private static final ThreadLocal<Integer> IDX = ThreadLocal.withInitial(() 
-> IDX_GEN.incrementAndGet());
+
+    /** Locks. */
+    private final ReentrantReadWriteLock[] locks;
+
+    /** Composite write lock. */
+    private final WriteLock writeLock;
+
+    /**
+     * Creates a new instance with given concurrency level.
+     *
+     * @param concurrencyLvl Number of internal read locks.
+     */
+    public IgniteStripedReadWriteLock(int concurrencyLvl) {
+        locks = new ReentrantReadWriteLock[concurrencyLvl];
+
+        for (int i = 0; i < concurrencyLvl; i++) {
+            locks[i] = new ReentrantReadWriteLock();
+        }
+
+        writeLock = new WriteLock();
+    }
+
+    /**
+     * Gets current index.
+     *
+     * @return Index of current thread stripe.
+     */
+    private int curIdx() {
+        int idx = IDX.get();
+
+        return idx % locks.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock readLock() {
+        return locks[curIdx()].readLock();
+    }
+
+    /**
+     * Get a lock by stripe.
+     *
+     * @param idx Stripe index.
+     * @return The lock.
+     */
+    public Lock readLock(int idx) {
+        return locks[idx].readLock();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock writeLock() {
+        return writeLock;
+    }
+
+    /**
+     * Queries if the write lock is held by the current thread.
+     *
+     * @return {@code true} if the current thread holds the write lock and 
{@code false} otherwise
+     */
+    public boolean isWriteLockedByCurrentThread() {
+        return locks[locks.length - 1].isWriteLockedByCurrentThread();
+    }
+
+    /**
+     * Queries the number of reentrant read holds on this lock by the current 
thread.  A reader thread has a hold on a lock for each lock
+     * action that is not matched by an unlock action.
+     *
+     * @return the number of holds on the read lock by the current thread, or 
zero if the read lock is not held by the current thread
+     */
+    public int getReadHoldCount() {
+        return locks[curIdx()].getReadHoldCount();
+    }
+
+    /**
+     * Write lock.
+     */
+    private class WriteLock implements Lock {
+        /** {@inheritDoc} */
+        @Override
+        public void lock() {
+            try {
+                lock0(false);
+            } catch (InterruptedException ignore) {
+                assert false : "Should never happen";
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void lockInterruptibly() throws InterruptedException {
+            lock0(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void unlock() {
+            unlock0(locks.length - 1);
+        }
+
+        /**
+         * Internal lock routine.
+         *
+         * @param canInterrupt Whether to acquire the lock interruptibly.
+         * @throws InterruptedException If interrupted.
+         */
+        private void lock0(boolean canInterrupt) throws InterruptedException {
+            int i = 0;
+
+            try {
+                for (; i < locks.length; i++) {
+                    if (canInterrupt) {
+                        locks[i].writeLock().lockInterruptibly();
+                    } else {
+                        locks[i].writeLock().lock();
+                    }
+                }
+            } catch (InterruptedException e) {
+                unlock0(i - 1);
+
+                throw e;
+            }
+        }
+
+        /**
+         * Internal unlock routine.
+         *
+         * @param fromIdx Start index.
+         */
+        private void unlock0(int fromIdx) {
+            for (int i = fromIdx; i >= 0; i--) {
+                locks[i].writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean tryLock() {
+            int i = 0;
+
+            try {
+                for (; i < locks.length; i++) {
+                    if (!locks[i].writeLock().tryLock()) {
+                        break;
+                    }
+                }
+            } finally {
+                if (0 < i && i < locks.length) {
+                    unlock0(i - 1);
+                }
+            }
+
+            return i == locks.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean tryLock(long time, TimeUnit unit) throws 
InterruptedException {
+            int i = 0;
+
+            long end = unit.toNanos(time) + System.nanoTime();
+
+            try {
+                for (; i < locks.length && System.nanoTime() < end; i++) {
+                    if (!locks[i].writeLock().tryLock(time, unit)) {
+                        break;
+                    }
+                }
+            } finally {
+                if (0 < i && i < locks.length) {
+                    unlock0(i - 1);
+                }
+            }
+
+            return i == locks.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Condition newCondition() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
+
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLockSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLockSelfTest.java
new file mode 100644
index 0000000000..30287e5e82
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLockSelfTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test for {@link IgniteStripedReadWriteLockSelfTest}.
+ */
+@Timeout(value = 60, unit = TimeUnit.SECONDS)
+public class IgniteStripedReadWriteLockSelfTest {
+    private static final int CONCURRENCY = 16;
+
+    /** The lock under test. */
+    private final IgniteStripedReadWriteLock lock = new 
IgniteStripedReadWriteLock(CONCURRENCY);
+
+    /** Executor service used to run tasks in threads different from the main 
test thread. */
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+
+    /**
+     * Cleans up after a test.
+     */
+    @AfterEach
+    void cleanup() {
+        releaseReadLockHeldByCurrentThread();
+        releaseWriteLockHeldByCurrentThread();
+
+        IgniteUtils.shutdownAndAwaitTermination(executor, 3, TimeUnit.SECONDS);
+    }
+
+    private void releaseReadLockHeldByCurrentThread() {
+        while (true) {
+            try {
+                lock.readLock().unlock();
+            } catch (IllegalMonitorStateException e) {
+                // released our read lock completely
+                break;
+            }
+        }
+    }
+
+    private void releaseWriteLockHeldByCurrentThread() {
+        while (lock.isWriteLockedByCurrentThread()) {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Test
+    void readLockDoesNotAllowWriteLockToBeAcquired() {
+        lock.readLock().lock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.readLock().unlock();
+    }
+
+    private void assertThatWriteLockAcquireAttemptBlocksForever() {
+        Future<?> future = executor.submit(() -> lock.writeLock().lock());
+
+        assertThrows(TimeoutException.class, () -> future.get(100, 
TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeout() throws 
Exception {
+        lock.readLock().lock();
+
+        Boolean acquired = callWithTimeout(() -> lock.writeLock().tryLock(1, 
TimeUnit.MILLISECONDS));
+        assertThat(acquired, is(false));
+
+        lock.readLock().unlock();
+    }
+
+    @Test
+    void readLockAllowsReadLockToBeAcquired() throws Exception {
+        lock.readLock().lock();
+
+        assertThatReadLockCanBeAcquired();
+    }
+
+    private void assertThatReadLockCanBeAcquired() throws 
InterruptedException, ExecutionException, TimeoutException {
+        runWithTimeout(() -> lock.readLock().lock());
+    }
+
+    private <T> T callWithTimeout(Callable<T> call) throws ExecutionException, 
InterruptedException, TimeoutException {
+        Future<T> future = executor.submit(call);
+        return getWithTimeout(future);
+    }
+
+    private void runWithTimeout(Runnable runnable) throws ExecutionException, 
InterruptedException, TimeoutException {
+        Future<?> future = executor.submit(runnable);
+        getWithTimeout(future);
+    }
+
+    private static <T> T getWithTimeout(Future<? extends T> future) throws 
ExecutionException,
+            InterruptedException, TimeoutException {
+        return future.get(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void writeLockDoesNotAllowReadLockToBeAcquired() {
+        lock.writeLock().lock();
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    private void assertThatReadLockAcquireAttemptBlocksForever() {
+        Future<?> readLockAttemptFuture = executor.submit(() -> 
lock.readLock().lock());
+
+        assertThrows(TimeoutException.class, () -> 
readLockAttemptFuture.get(100, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    void writeLockDoesNotAllowWriteLockToBeAcquired() {
+        lock.writeLock().lock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void readUnlockReleasesTheLock() throws Exception {
+        lock.readLock().lock();
+        lock.readLock().unlock();
+
+        runWithTimeout(lock::writeLock);
+    }
+
+    @Test
+    void writeUnlockReleasesTheLock() throws Exception {
+        lock.writeLock().lock();
+        lock.writeLock().unlock();
+
+        assertThatReadLockCanBeAcquired();
+    }
+
+    @Test
+    void testWriteLockReentry() {
+        lock.writeLock();
+
+        lock.writeLock();
+
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void testWriteLockReentryWithTryWriteLock() {
+        lock.writeLock().tryLock();
+
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void testWriteLockReentryWithTimeout() throws Exception {
+        lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS);
+        lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS);
+
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void testReadLockReentry() {
+        lock.readLock();
+
+        lock.readLock();
+
+        assertTrue(lock.readLock().tryLock());
+    }
+
+    @Test
+    void shouldAllowAcquireAndReleaseReadLockWhileHoldingWriteLock() {
+        lock.writeLock().lock();
+
+        lock.readLock().lock();
+        lock.readLock().unlock();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void shouldAllowInterleavingHoldingReadAndWriteLocks() {
+        lock.writeLock().lock();
+
+        lock.readLock().lock();
+
+        lock.writeLock().unlock();
+
+        assertFalse(lock.writeLock().tryLock());
+
+        lock.readLock().unlock();
+
+        // Test that we can operate with write locks now.
+        lock.writeLock().lock();
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void readLockReleasedLessTimesThanAcquiredShouldStillBeTaken() {
+        lock.readLock().lock();
+        lock.readLock().lock();
+        lock.readLock().unlock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.readLock().lock();
+    }
+
+    @Test
+    void writeLockReleasedLessTimesThanAcquiredShouldStillBeTaken() {
+        lock.writeLock().lock();
+        lock.writeLock().lock();
+        lock.writeLock().unlock();
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void shouldThrowOnReadUnlockingWhenNotHoldingReadLock() {
+        assertThrows(IllegalMonitorStateException.class, () -> 
lock.readLock().unlock());
+    }
+
+    @Test
+    void shouldThrowOnWriteUnlockingWhenNotHoldingWriteLock() {
+        assertThrows(IllegalMonitorStateException.class,  () -> 
lock.writeLock().unlock());
+    }
+
+    @Test
+    void readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquired() {
+        lock.readLock().tryLock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.readLock().unlock();
+    }
+
+    @Test
+    void tryReadLockShouldReturnTrueWhenReadLockWasAcquiredSuccessfully() {
+        assertTrue(lock.readLock().tryLock());
+    }
+
+    @Test
+    void tryReadLockShouldReturnFalseWhenReadLockCouldNotBeAcquired() throws 
Exception {
+        lock.writeLock().lock();
+
+        Boolean acquired = callWithTimeout(() -> lock.readLock().tryLock());
+
+        assertThat(acquired, is(false));
+    }
+
+    @Test
+    void writeLockAcquiredWithTryWriteLockDoesNotAllowWriteLockToBeAcquired() {
+        lock.writeLock().tryLock();
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void tryWriteLockShouldReturnTrueWhenWriteLockWasAcquiredSuccessfully() {
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void tryWriteLockShouldReturnFalseWhenWriteLockCouldNotBeAcquired() throws 
Exception {
+        lock.writeLock().lock();
+
+        Boolean acquired = callWithTimeout(() -> lock.writeLock().tryLock());
+
+        assertThat(acquired, is(false));
+    }
+
+    @Test
+    void writeLockedByCurrentThreadShouldReturnTrueWhenLockedByCurrentThread() 
{
+        lock.writeLock().lock();
+
+        assertTrue(lock.isWriteLockedByCurrentThread());
+    }
+
+    @Test
+    void writeLockedByCurrentThreadShouldReturnFalseWhenNotLocked() {
+        assertFalse(lock.isWriteLockedByCurrentThread());
+    }
+
+    @Test
+    void 
writeLockedByCurrentThreadShouldReturnFalseWhenLockedByAnotherThread() throws 
Exception {
+        lock.writeLock();
+
+        Boolean lockedByCaller = 
callWithTimeout(lock::isWriteLockedByCurrentThread);
+        assertThat(lockedByCaller, is(false));
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index 23678e9954..e6cc47d0ac 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -197,7 +197,8 @@ public class AbstractMultiNodeBenchmark {
                 + "  },\n"
                 + "  clientConnector: { port:{} },\n"
                 + "  rest.port: {},\n"
-                + "  raft.fsync = " + fsync
+                + "  raft.fsync = " + fsync() + ",\n"
+                + "  system.partitionsLogPath = \"" + logPath() + "\""
                 + "}";
 
         for (int i = 0; i < nodes(); i++) {
@@ -237,6 +238,14 @@ public class AbstractMultiNodeBenchmark {
         return Files.createTempDirectory("tmpDirPrefix").toFile().toPath();
     }
 
+    protected String logPath() {
+        return "";
+    }
+
+    protected boolean fsync() {
+        return fsync;
+    }
+
     protected int nodes() {
         return 3;
     }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
index 8c7a5efb38..507f4d8387 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.benchmark;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.table.KeyValueView;
@@ -27,10 +31,10 @@ import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.Runner;
@@ -46,15 +50,22 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
 @Threads(1)
 @Warmup(iterations = 10, time = 2)
 @Measurement(iterations = 20, time = 2)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
 public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark {
     private final Tuple tuple = Tuple.create();
 
-    private int id = 0;
-
     private static KeyValueView<Tuple, Tuple> kvView;
 
+    @Param({"1"})
+    private int batch;
+
+    @Param({"false"})
+    private boolean fsync;
+
+    @Param({"8"})
+    private int partitionCount;
+
     @Override
     public void nodeSetUp() throws Exception {
         
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, 
"true");
@@ -67,23 +78,33 @@ public class UpsertKvBenchmark extends 
AbstractMultiNodeBenchmark {
      */
     @Setup
     public void setUp() {
-        kvView = publicIgnite.tables().table(TABLE_NAME).keyValueView();
+        kvView = igniteImpl.tables().table(TABLE_NAME).keyValueView();
         for (int i = 1; i < 11; i++) {
             tuple.set("field" + i, FIELD_VAL);
         }
     }
 
-    @TearDown
-    public void tearDown() {
-        System.out.println("Inserted " + id + " tuples");
-    }
-
     /**
      * Benchmark for KV upsert via embedded client.
      */
     @Benchmark
     public void upsert() {
-        kvView.put(null, Tuple.create().set("ycsb_key", id++), tuple);
+        List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+        for (int i = 0; i < batch - 1; i++) {
+            CompletableFuture<Void> fut = kvView.putAsync(null, 
Tuple.create().set("ycsb_key", nextId()), tuple);
+            futs.add(fut);
+        }
+
+        for (CompletableFuture<Void> fut : futs) {
+            fut.join();
+        }
+
+        kvView.put(null, Tuple.create().set("ycsb_key", nextId()), tuple);
+    }
+
+    private int nextId() {
+        return ThreadLocalRandom.current().nextInt();
     }
 
     /**
@@ -92,11 +113,18 @@ public class UpsertKvBenchmark extends 
AbstractMultiNodeBenchmark {
     public static void main(String[] args) throws RunnerException {
         Options opt = new OptionsBuilder()
                 .include(".*" + UpsertKvBenchmark.class.getSimpleName() + ".*")
+                // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
+                // .addProfiler(JavaFlightRecorderProfiler.class, 
"configName=profile.jfc")
                 .build();
 
         new Runner(opt).run();
     }
 
+    @Override
+    protected boolean fsync() {
+        return fsync;
+    }
+
     @Override
     protected int nodes() {
         return 1;
@@ -104,6 +132,11 @@ public class UpsertKvBenchmark extends 
AbstractMultiNodeBenchmark {
 
     @Override
     protected int partitionCount() {
-        return 8;
+        return partitionCount;
+    }
+
+    @Override
+    protected int replicaCount() {
+        return 1;
     }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index c2aa7702f8..ee48aa527b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager.LockState;
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
@@ -149,8 +148,7 @@ public class ItLockTableTest extends IgniteAbstractTest {
                         new HeapLockManager(
                                 DeadlockPreventionPolicy.NO_OP,
                                 HeapLockManager.SLOTS,
-                                CACHE_SIZE,
-                                new HeapUnboundedLockManager()),
+                                CACHE_SIZE),
                         clockService,
                         generator,
                         placementDriver,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index a99541dca8..a4b0d121cf 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -237,6 +237,41 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
         assertTrue(waitForCondition(() -> txStoredState(commitPartNode, 
orphanTxId) == TxState.ABORTED, 10_000));
     }
 
+    @Test
+    public void testAbandonedTxWithCoarseLockIsAborted() throws Exception {
+        TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = findNodeByName(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator is chosen [node={}].", 
txCrdNode.name());
+
+        RecordView<Tuple> view = 
txCrdNode.tables().table(TABLE_NAME).recordView();
+        view.upsert(null, Tuple.create().set("key", 42).set("val", "val1"));
+
+        // Start scan transaction.
+        InternalTransaction rwTx = (InternalTransaction) 
txCrdNode.transactions().begin();
+        scanSingleEntryAndLeaveCursorOpen(commitPartNode, 
unwrapTableImpl(txCrdNode.tables().table(TABLE_NAME)), rwTx);
+
+        txCrdNode.stop();
+
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().filter(n -> 
txCrdNode.id().equals(n.id())).count() == 0,
+                10_000));
+
+        InternalTransaction conflictTx = (InternalTransaction) 
node(0).transactions().begin();
+        runConflictingTransaction(node(0), conflictTx);
+
+        assertTrue(waitForCondition(() -> txStoredState(commitPartNode, 
rwTx.id()) == TxState.ABORTED, 10_000));
+    }
+
     @Test
     public void testWriteIntentRecoverNoCoordinator() throws Exception {
         TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 91cfe46c9e..3788f5475d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -271,10 +271,7 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                 storage.releasePartitionSnapshotsReadLock();
             }
 
-            // Completing the closure out of the partition snapshots lock to 
reduce possibility of deadlocks as it might
-            // trigger other actions taking same locks.
-            clo.result(result);
-
+            // Adjust safe time before completing update to reduce waiting.
             if (command instanceof SafeTimePropagatingCommand) {
                 SafeTimePropagatingCommand safeTimePropagatingCommand = 
(SafeTimePropagatingCommand) command;
 
@@ -283,6 +280,10 @@ public class PartitionListener implements 
RaftGroupListener, BeforeApplyHandler
                 updateTrackerIgnoringTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime());
             }
 
+            // Completing the closure out of the partition snapshots lock to 
reduce possibility of deadlocks as it might
+            // trigger other actions taking same locks.
+            clo.result(result);
+
             updateTrackerIgnoringTrackerClosedException(storageIndexTracker, 
commandIndex);
         });
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 56c78a78b8..83eaed3668 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -347,6 +347,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final IndexMetaStorage indexMetaStorage;
 
+    private static final boolean SKIP_UPDATES =
+            
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
+
     /**
      * The constructor.
      *
@@ -1207,7 +1210,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
 
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.S)
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), 
LockMode.S)
                 .thenCompose(tblLock -> 
retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), cursorId, 
batchCount));
     }
 
@@ -1266,25 +1269,16 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         BinaryTuple exactKey = request.exactKey().asBinaryTuple();
 
-        return lockManager.acquire(txId, new LockKey(indexId), 
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
-            return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.IS) // Table IS lock
-                    .thenCompose(tblLock -> {
-                        return lockManager.acquire(txId, new LockKey(indexId, 
exactKey.byteBuffer()), LockMode.S)
-                                .thenCompose(indRowLock -> { // Hash index 
bucket S lock
-
-                                    Cursor<RowId> cursor = 
remotelyTriggeredResourceRegistry.<CursorResource>register(
-                                            cursorId,
-                                            txCoordinatorId,
-                                            () -> new 
CursorResource(indexStorage.get(exactKey))
-                                    ).cursor();
+        return lockManager.acquire(txId, new LockKey(indexId, 
exactKey.byteBuffer()), LockMode.S)
+                .thenCompose(indRowLock -> { // Hash index bucket S lock
+                    Cursor<RowId> cursor = 
remotelyTriggeredResourceRegistry.<CursorResource>register(cursorId, 
txCoordinatorId,
+                            () -> new 
CursorResource(indexStorage.get(exactKey))).cursor();
 
-                                    var result = new 
ArrayList<BinaryRow>(batchCount);
+                    var result = new ArrayList<BinaryRow>(batchCount);
 
-                                    return continueIndexLookup(txId, cursor, 
batchCount, result)
-                                            .thenApply(ignore -> 
closeCursorIfBatchNotFull(result, batchCount, cursorId));
-                                });
-                    });
-        });
+                    return continueIndexLookup(txId, cursor, batchCount, 
result).thenApply(
+                            ignore -> closeCursorIfBatchNotFull(result, 
batchCount, cursorId));
+                });
     }
 
     /**
@@ -1314,61 +1308,56 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         int flags = request.flags();
 
-        return lockManager.acquire(txId, new LockKey(indexId), 
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
-            return lockManager.acquire(txId, new LockKey(tableId()), 
LockMode.IS) // Table IS lock
-                    .thenCompose(tblLock -> {
-                        var comparator = new 
BinaryTupleComparator(indexStorage.indexDescriptor().columns());
+        var comparator = new 
BinaryTupleComparator(indexStorage.indexDescriptor().columns());
 
-                        Predicate<IndexRow> isUpperBoundAchieved = indexRow -> 
{
-                            if (indexRow == null) {
-                                return true;
-                            }
+        Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
+            if (indexRow == null) {
+                return true;
+            }
 
-                            if (upperBound == null) {
-                                return false;
-                            }
+            if (upperBound == null) {
+                return false;
+            }
 
-                            ByteBuffer buffer = upperBound.byteBuffer();
+            ByteBuffer buffer = upperBound.byteBuffer();
 
-                            if ((flags & SortedIndexStorage.LESS_OR_EQUAL) != 
0) {
-                                byte boundFlags = buffer.get(0);
+            if ((flags & SortedIndexStorage.LESS_OR_EQUAL) != 0) {
+                byte boundFlags = buffer.get(0);
 
-                                buffer.put(0, (byte) (boundFlags | 
BinaryTupleCommon.EQUALITY_FLAG));
-                            }
+                buffer.put(0, (byte) (boundFlags | 
BinaryTupleCommon.EQUALITY_FLAG));
+            }
 
-                            return 
comparator.compare(indexRow.indexColumns().byteBuffer(), buffer) >= 0;
-                        };
+            return comparator.compare(indexRow.indexColumns().byteBuffer(), 
buffer) >= 0;
+        };
 
-                        Cursor<IndexRow> cursor = 
remotelyTriggeredResourceRegistry.<CursorResource>register(
-                                cursorId,
-                                request.coordinatorId(),
-                                () -> new CursorResource(indexStorage.scan(
-                                        lowerBound,
-                                        // We have to handle upperBound on a 
level of replication listener,
-                                        // for correctness of taking of a 
range lock.
-                                        null,
-                                        flags
-                                ))
-                        ).cursor();
+        Cursor<IndexRow> cursor = 
remotelyTriggeredResourceRegistry.<CursorResource>register(
+                cursorId,
+                request.coordinatorId(),
+                () -> new CursorResource(indexStorage.scan(
+                        lowerBound,
+                        // We have to handle upperBound on a level of 
replication listener,
+                        // for correctness of taking of a range lock.
+                        null,
+                        flags
+                ))
+        ).cursor();
 
-                        SortedIndexLocker indexLocker = (SortedIndexLocker) 
indexesLockers.get().get(indexId);
+        SortedIndexLocker indexLocker = (SortedIndexLocker) 
indexesLockers.get().get(indexId);
 
-                        int batchCount = request.batchSize();
+        int batchCount = request.batchSize();
 
-                        var result = new ArrayList<BinaryRow>(batchCount);
+        var result = new ArrayList<BinaryRow>(batchCount);
 
-                        return continueIndexScan(
-                                txId,
-                                schemaAwareIndexStorage,
-                                indexLocker,
-                                cursor,
-                                batchCount,
-                                result,
-                                isUpperBoundAchieved,
-                                tableVersionByTs(beginTimestamp(txId))
-                        ).thenApply(ignore -> 
closeCursorIfBatchNotFull(result, batchCount, cursorId));
-                    });
-        });
+        return continueIndexScan(
+                txId,
+                schemaAwareIndexStorage,
+                indexLocker,
+                cursor,
+                batchCount,
+                result,
+                isUpperBoundAchieved,
+                tableVersionByTs(beginTimestamp(txId))
+        ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount, 
cursorId));
     }
 
     /**
@@ -1479,7 +1468,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                     RowId rowId = currentRow.rowId();
 
-                    return lockManager.acquire(txId, new LockKey(tableId(), 
rowId), LockMode.S)
+                    return lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.S)
                             .thenComposeAsync(rowLock -> { // Table row S lock
                                 return resolvePlainReadResult(rowId, 
txId).thenCompose(resolvedReadResult -> {
                                     BinaryRow binaryRow = 
upgrade(binaryRow(resolvedReadResult), tableVersion);
@@ -1530,7 +1519,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         RowId rowId = indexCursor.next();
 
-        return lockManager.acquire(txId, new LockKey(tableId(), rowId), 
LockMode.S)
+        return lockManager.acquire(txId, new LockKey(replicationGroupId, 
rowId), LockMode.S)
                 .thenComposeAsync(rowLock -> { // Table row S lock
                     return resolvePlainReadResult(rowId, 
txId).thenCompose(resolvedReadResult -> {
                         if (resolvedReadResult != null && 
resolvedReadResult.binaryRow() != null) {
@@ -2738,18 +2727,20 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             );
 
             if (!cmd.full()) {
-                // We don't need to take the partition snapshots read lock, 
see #INTERNAL_DOC_PLACEHOLDER why.
-                storageUpdateHandler.handleUpdate(
-                        cmd.txId(),
-                        cmd.rowUuid(),
-                        cmd.tablePartitionId().asTablePartitionId(),
-                        cmd.rowToUpdate(),
-                        true,
-                        null,
-                        null,
-                        null,
-                        indexIdsAtRwTxBeginTs(txId)
-                );
+                if (!SKIP_UPDATES) {
+                    // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+                    storageUpdateHandler.handleUpdate(
+                            cmd.txId(),
+                            cmd.rowUuid(),
+                            cmd.tablePartitionId().asTablePartitionId(),
+                            cmd.rowToUpdate(),
+                            true,
+                            null,
+                            null,
+                            null,
+                            indexIdsAtRwTxBeginTs(txId)
+                    );
+                }
 
                 CompletableFuture<UUID> fut = 
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
                         .thenApply(res -> cmd.txId());
@@ -2770,7 +2761,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     if (updateCommandResult != null && 
updateCommandResult.isPrimaryInPeersAndLearners()) {
                         return safeTime.waitFor(((UpdateCommand) 
res.getCommand()).safeTime()).thenApply(ignored -> null);
                     } else {
-                        if 
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
 {
+                        if (!SKIP_UPDATES) {
                             // We don't need to take the partition snapshots 
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
                             storageUpdateHandler.handleUpdate(
                                     cmd.txId(),
@@ -3333,8 +3324,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with tuple {@link RowId} and collection of 
{@link Lock}.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X))
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), 
LockMode.IX)
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.X))
                 .thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId, 
txId))
                 .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, 
shortTermLocks));
     }
@@ -3347,7 +3338,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with tuple {@link RowId} and collection of 
{@link Lock}.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) 
// IX lock on table
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), 
LockMode.IX)
                 .thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId, 
txId))
                 .thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId, 
shortTermLocks));
     }
@@ -3405,11 +3396,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with {@link RowId} or {@code null} if there is 
no value for remove.
      */
     private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow 
expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) 
// IX lock on table
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), 
LockMode.IX)
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.S)) // S lock on RowId
                 .thenCompose(ignored -> {
                     if (equalValues(actualRow, expectedRow)) {
-                        return lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
+                        return lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.X) // X lock on RowId
                                 .thenCompose(ignored0 -> 
takeRemoveLockOnIndexes(actualRow, rowId, txId))
                                 .thenApply(exclusiveRowLock -> rowId);
                     }
@@ -3425,8 +3416,8 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with {@link RowId} or {@code null} if there is 
no value for the key.
      */
     private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, 
RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) 
// IX lock on table
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X)) // X lock on RowId
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), 
LockMode.IX)
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.X)) // X lock on RowId
                 .thenCompose(ignored -> takeRemoveLockOnIndexes(binaryRow, 
rowId, txId))
                 .thenApply(ignored -> rowId);
     }
@@ -3438,8 +3429,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * @return Future completes with {@link RowId} or {@code null} if there is 
no value for the key.
      */
     private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS) 
// IS lock on table
-                .thenCompose(tblLock -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
+        return lockManager.acquire(txId, new LockKey(replicationGroupId, 
rowId), LockMode.S) // S lock on RowId
                 .thenApply(ignored -> rowId);
     }
 
@@ -3510,11 +3500,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> 
takeLocksForReplace(BinaryRow expectedRow, @Nullable BinaryRow oldRow,
             BinaryRow newRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
-                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.S))
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), 
LockMode.IX)
+                .thenCompose(ignored -> lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.S))
                 .thenCompose(ignored -> {
                     if (oldRow != null && equalValues(oldRow, expectedRow)) {
-                        return lockManager.acquire(txId, new 
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
+                        return lockManager.acquire(txId, new 
LockKey(replicationGroupId, rowId), LockMode.X) // X lock on RowId
                                 .thenCompose(ignored1 -> 
takePutLockOnIndexes(newRow, rowId, txId))
                                 .thenApply(shortTermLocks -> new 
IgniteBiTuple<>(rowId, shortTermLocks));
                     }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
index 9569c04459..bae2005914 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
@@ -88,6 +88,6 @@ public class LockKey {
 
     @Override
     public String toString() {
-        return S.toString(this);
+        return S.toString(LockKey.class, this, "ctx", contextId, "key", key);
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
index 6f3459b4d4..8e9a08e092 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
@@ -26,4 +26,17 @@ import 
org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
 public enum TxPriority {
     LOW,
     NORMAL;
+
+    /** Enum values. */
+    private static final TxPriority[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value.
+     */
+    public static TxPriority fromOrdinal(int ord) {
+        return VALS[ord];
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
index 4d8f3d529e..60ddf4dad1 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static java.util.Collections.emptyList;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.tx.event.LockEvent.LOCK_CONFLICT;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR;
@@ -27,11 +30,12 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -41,7 +45,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.event.AbstractEventProducer;
-import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.tostring.IgniteToStringExclude;
 import org.apache.ignite.internal.tostring.S;
@@ -54,7 +57,7 @@ import org.apache.ignite.internal.tx.LockMode;
 import org.apache.ignite.internal.tx.Waiter;
 import org.apache.ignite.internal.tx.event.LockEvent;
 import org.apache.ignite.internal.tx.event.LockEventParameters;
-import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.IgniteStripedReadWriteLock;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -76,6 +79,11 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
      */
     public static final int SLOTS = 131072;
 
+    /**
+     * Striped lock concurrency.
+     */
+    private static final int CONCURRENCY = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
+
     /**
      * Empty slots.
      */
@@ -104,28 +112,25 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
     /**
      * Enlisted transactions.
      */
-    private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<LockState>> 
txMap = new ConcurrentHashMap<>(1024);
+    private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<Releasable>> 
txMap = new ConcurrentHashMap<>(1024);
 
     /**
-     * Parent lock manager.
-     * TODO asch Needs optimization 
https://issues.apache.org/jira/browse/IGNITE-20895
+     * Coarse locks.
      */
-    private final LockManager parentLockManager;
-
-    private final EventListener<LockEventParameters> 
parentLockConflictListener = this::parentLockConflictListener;
+    private final ConcurrentHashMap<Object, CoarseLockState> coarseMap = new 
ConcurrentHashMap<>();
 
     /**
      * Constructor.
      */
     public HeapLockManager() {
-        this(new WaitDieDeadlockPreventionPolicy(), SLOTS, SLOTS, new 
HeapUnboundedLockManager());
+        this(new WaitDieDeadlockPreventionPolicy(), SLOTS, SLOTS);
     }
 
     /**
      * Constructor.
      */
     public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy) {
-        this(deadlockPreventionPolicy, SLOTS, SLOTS, new 
HeapUnboundedLockManager());
+        this(deadlockPreventionPolicy, SLOTS, SLOTS);
     }
 
     /**
@@ -135,12 +140,11 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
      * @param maxSize Raw slots size.
      * @param mapSize Lock map size.
      */
-    public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy, 
int maxSize, int mapSize, LockManager parentLockManager) {
+    public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy, 
int maxSize, int mapSize) {
         if (mapSize > maxSize) {
             throw new IllegalArgumentException("maxSize=" + maxSize + " < 
mapSize=" + mapSize);
         }
 
-        this.parentLockManager = Objects.requireNonNull(parentLockManager);
         this.deadlockPreventionPolicy = deadlockPreventionPolicy;
         this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0
                 ? 
CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), 
TimeUnit.MILLISECONDS)
@@ -158,14 +162,14 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
         }
 
         slots = tmp; // Atomic init.
-
-        parentLockManager.listen(LockEvent.LOCK_CONFLICT, 
parentLockConflictListener);
     }
 
     @Override
     public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, 
LockMode lockMode) {
-        if (lockKey.contextId() == null) { // Treat this lock as a hierarchy 
lock.
-            return parentLockManager.acquire(txId, lockKey, lockMode);
+        if (lockKey.contextId() == null) { // Treat this lock as a 
hierarchy(coarse) lock.
+            CoarseLockState state = coarseMap.computeIfAbsent(lockKey, key -> 
new CoarseLockState(lockKey));
+
+            return state.acquire(txId, lockKey, lockMode);
         }
 
         while (true) {
@@ -186,6 +190,14 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
     @Override
     @TestOnly
     public void release(Lock lock) {
+        if (lock.lockKey().contextId() == null) {
+            CoarseLockState lockState2 = coarseMap.get(lock.lockKey());
+            if (lockState2 != null) {
+                lockState2.release(lock);
+            }
+            return;
+        }
+
         LockState state = lockState(lock.lockKey());
 
         if (state.tryRelease(lock.txId())) {
@@ -195,11 +207,8 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
 
     @Override
     public void release(UUID txId, LockKey lockKey, LockMode lockMode) {
-        // TODO: Delegation to parentLockManager might change after 
https://issues.apache.org/jira/browse/IGNITE-20895 
-        if (lockKey.contextId() == null) { // Treat this lock as a hierarchy 
lock.
-            parentLockManager.release(txId, lockKey, lockMode);
-
-            return;
+        if (lockKey.contextId() == null) {
+            throw new IllegalArgumentException("Coarse locks don't support 
downgrading");
         }
 
         LockState state = lockState(lockKey);
@@ -211,42 +220,48 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
 
     @Override
     public void releaseAll(UUID txId) {
-        ConcurrentLinkedQueue<LockState> states = this.txMap.remove(txId);
+        ConcurrentLinkedQueue<Releasable> states = this.txMap.remove(txId);
 
         if (states != null) {
-            for (LockState state : states) {
+            // Default size corresponds to average number of entities used by 
transaction. Estimate it to 5.
+            List<Releasable> delayed = new ArrayList<>(4);
+            for (Releasable state : states) {
+                if (state.coarse()) {
+                    delayed.add(state); // Delay release.
+                    continue;
+                }
+
                 if (state.tryRelease(txId)) {
-                    LockKey key = state.key; // State may be already 
invalidated.
+                    LockKey key = state.key(); // State may be already 
invalidated.
                     if (key != null) {
-                        locks.compute(key, (k, v) -> adjustLockState(state, 
v));
+                        locks.compute(key, (k, v) -> 
adjustLockState((LockState) state, v));
                     }
                 }
             }
-        }
 
-        parentLockManager.releaseAll(txId);
+            // Unlock coarse locks after all.
+            for (Releasable state : delayed) {
+                state.tryRelease(txId);
+            }
+        }
     }
 
     @Override
     public Iterator<Lock> locks(UUID txId) {
-        ConcurrentLinkedQueue<LockState> lockStates = txMap.get(txId);
-
-        // TODO: Delegation to parentLockManager might change after 
https://issues.apache.org/jira/browse/IGNITE-20895
-        if (lockStates == null) {
-            return parentLockManager.locks(txId);
-        }
+        ConcurrentLinkedQueue<Releasable> lockStates = txMap.get(txId);
 
         List<Lock> result = new ArrayList<>();
 
-        for (LockState lockState : lockStates) {
-            Waiter waiter = lockState.waiter(txId);
-
-            if (waiter != null) {
-                result.add(new Lock(lockState.key, waiter.lockMode(), txId));
+        if (lockStates != null) {
+            for (Releasable lockState : lockStates) {
+                Lock lock = lockState.lock(txId);
+                if (lock != null) {
+                    result.add(lock);
+                }
             }
         }
 
-        return CollectionUtils.concat(result.iterator(), 
parentLockManager.locks(txId));
+        return result.iterator();
     }
 
     /**
@@ -302,11 +317,17 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             }
         }
 
-        return parentLockManager.isEmpty();
-    }
+        for (CoarseLockState value : coarseMap.values()) {
+            if (!value.slockOwners.isEmpty()) {
+                return false;
+            }
 
-    private CompletableFuture<Boolean> 
parentLockConflictListener(LockEventParameters params) {
-        return fireEvent(LockEvent.LOCK_CONFLICT, params).thenApply(v -> 
false);
+            if (!value.ixlockOwners.isEmpty()) {
+                return false;
+            }
+        }
+
+        return true;
     }
 
     @Nullable
@@ -328,10 +349,375 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
         }
     }
 
+    private void track(UUID txId, Releasable val) {
+        txMap.compute(txId, (k, v) -> {
+            if (v == null) {
+                v = new ConcurrentLinkedQueue<>();
+            }
+
+            v.add(val);
+
+            return v;
+        });
+    }
+
+    /**
+     * Create lock exception with given parameters.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException lockException(UUID locker, UUID holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire a lock due to a possible deadlock [locker=" 
+ locker + ", holder=" + holder + ']');
+    }
+
+    /**
+     * Create lock exception when lock holder is believed to be missing.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @return Lock exception.
+     */
+    private static LockException abandonedLockException(UUID locker, UUID 
holder) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire an abandoned lock due to a possible 
deadlock [locker=" + locker + ", holder=" + holder + ']');
+    }
+
+    /**
+     * Create coarse lock exception.
+     *
+     * @param locker Locker.
+     * @param holder Lock holder.
+     * @param abandoned If locker is abandoned.
+     * @return Lock exception.
+     */
+    private static LockException coarseLockException(UUID locker, UUID holder, 
boolean abandoned) {
+        return new LockException(ACQUIRE_LOCK_ERR,
+                "Failed to acquire the intention table lock due to a conflict 
[locker=" + locker + ", holder=" + holder + ", abandoned="
+                        + abandoned + ']');
+    }
+
+    /**
+     * Common interface for releasing transaction locks.
+     */
+    interface Releasable {
+        /**
+         * Tries to release a lock.
+         *
+         * @param txId Tx id.
+         * @return {@code True} if lock state requires cleanup after release.
+         */
+        boolean tryRelease(UUID txId);
+
+        /**
+         * Gets associated lock key.
+         *
+         * @return Lock key.
+         */
+        LockKey key();
+
+        /**
+         * Returns the lock which is requested by given tx.
+         *
+         * @param txId Tx id.
+         * @return The lock or null if no lock exist.
+         */
+        @Nullable Lock lock(UUID txId);
+
+        /**
+         * Returns lock type.
+         *
+         * @return The type.
+         */
+        boolean coarse();
+    }
+
+    /**
+     * Coarse lock.
+     */
+    public class CoarseLockState implements Releasable {
+        private final IgniteStripedReadWriteLock stripedLock = new 
IgniteStripedReadWriteLock(CONCURRENCY);
+        private final ConcurrentHashMap<UUID, Lock> ixlockOwners = new 
ConcurrentHashMap<>();
+        private final Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> 
slockWaiters = new HashMap<>();
+        private final ConcurrentHashMap<UUID, Lock> slockOwners = new 
ConcurrentHashMap<>();
+        private final LockKey lockKey;
+        private final Comparator<UUID> txComparator;
+
+        CoarseLockState(LockKey lockKey) {
+            this.lockKey = lockKey;
+            txComparator =
+                    deadlockPreventionPolicy.txIdComparator() != null ? 
deadlockPreventionPolicy.txIdComparator() : UUID::compareTo;
+        }
+
+        @Override
+        public boolean tryRelease(UUID txId) {
+            Lock lock = lock(txId);
+
+            release(lock);
+
+            return false;
+        }
+
+        @Override
+        public LockKey key() {
+            return lockKey;
+        }
+
+        @Override
+        public Lock lock(UUID txId) {
+            Lock lock = ixlockOwners.get(txId);
+
+            if (lock != null) {
+                return lock;
+            }
+
+            int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+            stripedLock.readLock(idx).lock();
+
+            try {
+                lock = slockOwners.get(txId);
+
+                if (lock != null) {
+                    return lock;
+                }
+
+                IgniteBiTuple<Lock, CompletableFuture<Lock>> tuple = 
slockWaiters.get(txId);
+
+                if (tuple != null) {
+                    return tuple.get1();
+                }
+            } finally {
+                stripedLock.readLock(idx).unlock();
+            }
+
+            return null;
+        }
+
+        @Override
+        public boolean coarse() {
+            return true;
+        }
+
+        /**
+         * Acquires a lock.
+         *
+         * @param txId Tx id.
+         * @param lockKey Lock key.
+         * @param lockMode Lock mode.
+         * @return The future.
+         */
+        public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, 
LockMode lockMode) {
+            switch (lockMode) {
+                case S:
+                    stripedLock.writeLock().lock();
+
+                    try {
+                        // IX-locks can't be modified under the striped write 
lock.
+                        if (!ixlockOwners.isEmpty()) {
+                            if (ixlockOwners.containsKey(txId)) {
+                                if (ixlockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    slockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : ixlockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return notifyAndFail(txId, 
lock.txId());
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            // Validate reordering with IX locks if prevention 
is enabled.
+                            if (deadlockPreventionPolicy.usePriority()) {
+                                for (Lock lock : ixlockOwners.values()) {
+                                    // Allow only high priority transactions 
to wait.
+                                    if (txComparator.compare(lock.txId(), 
txId) < 0) {
+                                        return notifyAndFail(txId, 
lock.txId());
+                                    }
+                                }
+                            }
+
+                            track(txId, this);
+
+                            CompletableFuture<Lock> fut = new 
CompletableFuture<>();
+                            IgniteBiTuple<Lock, CompletableFuture<Lock>> prev 
= slockWaiters.putIfAbsent(txId,
+                                    new IgniteBiTuple<>(new Lock(lockKey, 
lockMode, txId), fut));
+                            return prev == null ? fut : prev.get2();
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = slockOwners.putIfAbsent(txId, lock);
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.writeLock().unlock();
+                    }
+
+                case IX:
+                    int idx = Math.floorMod(spread(txId.hashCode()), 
CONCURRENCY);
+
+                    stripedLock.readLock(idx).lock();
+
+                    try {
+                        // S-locks can't be modified under the striped read 
lock.
+                        if (!slockOwners.isEmpty()) {
+                            if (slockOwners.containsKey(txId)) {
+                                if (slockOwners.size() == 1) {
+                                    // Safe to upgrade.
+                                    track(txId, this); // Double track.
+                                    Lock lock = new Lock(lockKey, lockMode, 
txId);
+                                    ixlockOwners.putIfAbsent(txId, lock);
+                                    return completedFuture(lock);
+                                } else {
+                                    // Attempt to upgrade to SIX in the 
presence of concurrent transactions. Deny lock attempt.
+                                    for (Lock lock : slockOwners.values()) {
+                                        if (!lock.txId().equals(txId)) {
+                                            return notifyAndFail(txId, 
lock.txId());
+                                        }
+                                    }
+                                }
+
+                                assert false : "Should not reach here";
+                            }
+
+                            // IX locks never allowed to wait.
+                            UUID holderTx = 
slockOwners.keySet().iterator().next();
+                            return notifyAndFail(txId, holderTx);
+                        } else {
+                            Lock lock = new Lock(lockKey, lockMode, txId);
+                            Lock prev = ixlockOwners.putIfAbsent(txId, lock); 
// Avoid overwrite existing lock.
+
+                            if (prev == null) {
+                                track(txId, this); // Do not track on reenter.
+                            }
+
+                            return completedFuture(lock);
+                        }
+                    } finally {
+                        stripedLock.readLock(idx).unlock();
+                    }
+
+                default:
+                    assert false : "Unsupported coarse lock mode: " + lockMode;
+
+                    return null; // Should not be here.
+            }
+        }
+
+        /**
+         * Triggers event and fails.
+         *
+         * @param txId Tx id.
+         * @param holderId Holder tx id.
+         * @return Failed future.
+         */
+        CompletableFuture<Lock> notifyAndFail(UUID txId, UUID holderId) {
+            CompletableFuture<Void> res = fireEvent(LOCK_CONFLICT, new 
LockEventParameters(txId, holderId));
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
+            return failedFuture(coarseLockException(txId, holderId, 
res.isCompletedExceptionally()));
+        }
+
+        /**
+         * Releases the lock. Should be called from {@link #releaseAll(UUID)}.
+         *
+         * @param lock The lock.
+         */
+        public void release(@Nullable Lock lock) {
+            if (lock == null) {
+                return;
+            }
+
+            switch (lock.lockMode()) {
+                case S:
+                    IgniteBiTuple<Lock, CompletableFuture<Lock>> waiter = null;
+
+                    stripedLock.writeLock().lock();
+
+                    try {
+                        Lock removed = slockOwners.remove(lock.txId());
+
+                        if (removed == null) {
+                            waiter = slockWaiters.remove(lock.txId());
+
+                            if (waiter != null) {
+                                removed = waiter.get1();
+                            }
+                        }
+
+                        assert removed != null : "Attempt to release not 
requested lock: " + lock.txId();
+                    } finally {
+                        stripedLock.writeLock().unlock();
+                    }
+
+                    if (waiter != null) {
+                        waiter.get2().complete(waiter.get1());
+                    }
+
+                    break;
+                case IX:
+                    int idx = Math.floorMod(spread(lock.txId().hashCode()), 
CONCURRENCY);
+
+                    Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>> 
wakeups;
+
+                    stripedLock.readLock(idx).lock();
+
+                    try {
+                        var removed = ixlockOwners.remove(lock.txId());
+
+                        assert removed != null : "Attempt to release not 
acquired lock: " + lock.txId();
+
+                        if (slockWaiters.isEmpty()) {
+                            return; // Nothing to do.
+                        }
+
+                        if (!ixlockOwners.isEmpty()) {
+                            assert slockOwners.isEmpty() || 
slockOwners.containsKey(lock.txId());
+
+                            return; // Nothing to do.
+                        }
+
+                        // No race here because no new locks can be acquired 
after releaseAll due to 2-phase locking protocol.
+
+                        // Promote waiters to owners.
+                        wakeups = new HashMap<>(slockWaiters);
+
+                        slockWaiters.clear();
+
+                        for (IgniteBiTuple<Lock, CompletableFuture<Lock>> 
value : wakeups.values()) {
+                            slockOwners.put(value.getKey().txId(), 
value.getKey());
+                        }
+                    } finally {
+                        stripedLock.readLock(idx).unlock();
+                    }
+
+                    for (Entry<UUID, IgniteBiTuple<Lock, 
CompletableFuture<Lock>>> entry : wakeups.entrySet()) {
+                        
entry.getValue().get2().complete(entry.getValue().get1());
+                    }
+
+                    break;
+                default:
+                    assert false : "Unsupported coarse unlock mode: " + 
lock.lockMode();
+            }
+        }
+    }
+
     /**
-     * A lock state.
+     * Key lock.
      */
-    public class LockState {
+    public class LockState implements Releasable {
         /** Waiters. */
         private final TreeMap<UUID, WaiterImpl> waiters;
 
@@ -348,6 +734,27 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             this.waiters = new TreeMap<>(txComparator);
         }
 
+        @Override
+        public LockKey key() {
+            return key;
+        }
+
+        @Override
+        public Lock lock(UUID txId) {
+            Waiter waiter = waiters.get(txId);
+
+            if (waiter != null) {
+                return new Lock(key, waiter.lockMode(), txId);
+            }
+
+            return null;
+        }
+
+        @Override
+        public boolean coarse() {
+            return false;
+        }
+
         /**
          * Attempts to acquire a lock for the specified {@code key} in 
specified lock mode.
          *
@@ -393,7 +800,7 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
 
                     // Put to wait queue, track.
                     if (prev == null) {
-                        track(waiter.txId);
+                        track(waiter.txId, this);
                     }
 
                     return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
@@ -406,7 +813,7 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
                 } else {
                     // Lock granted, track.
                     if (prev == null) {
-                        track(waiter.txId);
+                        track(waiter.txId, this);
                     }
                 }
             }
@@ -437,15 +844,15 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
         private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean 
skipFail) {
             for (Map.Entry<UUID, WaiterImpl> entry : 
waiters.tailMap(waiter.txId(), false).entrySet()) {
                 WaiterImpl tmp = entry.getValue();
-                LockMode mode = lockedMode(tmp);
+                LockMode mode = tmp.lockMode;
 
                 if (mode != null && 
!mode.isCompatible(waiter.intendedLockMode())) {
                     if (conflictFound(waiter.txId(), tmp.txId())) {
-                        waiter.fail(abandonedLockException(waiter, tmp));
+                        waiter.fail(abandonedLockException(waiter.txId, 
tmp.txId));
 
                         return true;
                     } else if (!deadlockPreventionPolicy.usePriority() && 
deadlockPreventionPolicy.waitTimeout() == 0) {
-                        waiter.fail(lockException(waiter, tmp));
+                        waiter.fail(lockException(waiter.txId, tmp.txId));
 
                         return true;
                     }
@@ -456,17 +863,17 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
 
             for (Map.Entry<UUID, WaiterImpl> entry : 
waiters.headMap(waiter.txId()).entrySet()) {
                 WaiterImpl tmp = entry.getValue();
-                LockMode mode = lockedMode(tmp);
+                LockMode mode = tmp.lockMode;
 
                 if (mode != null && 
!mode.isCompatible(waiter.intendedLockMode())) {
                     if (skipFail) {
                         return false;
                     } else if (conflictFound(waiter.txId(), tmp.txId())) {
-                        waiter.fail(abandonedLockException(waiter, tmp));
+                        waiter.fail(abandonedLockException(waiter.txId, 
tmp.txId));
 
                         return true;
                     } else if (deadlockPreventionPolicy.waitTimeout() == 0) {
-                        waiter.fail(lockException(waiter, tmp));
+                        waiter.fail(lockException(waiter.txId, tmp.txId));
 
                         return true;
                     } else {
@@ -480,37 +887,14 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             return true;
         }
 
-        /**
-         * Create lock exception with given parameters.
-         *
-         * @param locker Locker.
-         * @param holder Lock holder.
-         * @return Lock exception.
-         */
-        private LockException lockException(WaiterImpl locker, WaiterImpl 
holder) {
-            return new LockException(ACQUIRE_LOCK_ERR,
-                    "Failed to acquire a lock due to a possible deadlock 
[locker=" + locker + ", holder=" + holder + ']');
-        }
-
-        /**
-         * Create lock exception when lock holder is believed to be missing.
-         *
-         * @param locker Locker.
-         * @param holder Lock holder.
-         * @return Lock exception.
-         */
-        private LockException abandonedLockException(WaiterImpl locker, 
WaiterImpl holder) {
-            return new LockException(ACQUIRE_LOCK_ERR,
-                    "Failed to acquire an abandoned lock due to a possible 
deadlock [locker=" + locker + ", holder=" + holder + ']');
-        }
-
         /**
          * Attempts to release a lock for the specified {@code key} in 
exclusive mode.
          *
          * @param txId Transaction id.
          * @return {@code True} if the queue is empty.
          */
-        boolean tryRelease(UUID txId) {
+        @Override
+        public boolean tryRelease(UUID txId) {
             Collection<WaiterImpl> toNotify;
 
             synchronized (waiters) {
@@ -639,22 +1023,6 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             });
         }
 
-        /**
-         * Gets a lock mode for this waiter.
-         *
-         * @param waiter Waiter.
-         * @return Lock mode, which is held by the waiter or {@code null}, if 
the waiter holds nothing.
-         */
-        private LockMode lockedMode(WaiterImpl waiter) {
-            LockMode mode = null;
-
-            if (waiter.locked()) {
-                mode = waiter.lockMode();
-            }
-
-            return mode;
-        }
-
         /**
          * Returns a collection of timestamps that is associated with the 
specified {@code key}.
          *
@@ -678,18 +1046,6 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
             }
         }
 
-        private void track(UUID txId) {
-            txMap.compute(txId, (k, v) -> {
-                if (v == null) {
-                    v = new ConcurrentLinkedQueue<>();
-                }
-
-                v.add(this);
-
-                return v;
-            });
-        }
-
         /**
          * Notifies about the lock conflict found between transactions.
          *
@@ -697,7 +1053,7 @@ public class HeapLockManager extends 
AbstractEventProducer<LockEvent, LockEventP
          * @param holderTx Transaction which holds the lock.
          */
         private boolean conflictFound(UUID acquirerTx, UUID holderTx) {
-            CompletableFuture<Void> eventResult = 
fireEvent(LockEvent.LOCK_CONFLICT, new LockEventParameters(acquirerTx, 
holderTx));
+            CompletableFuture<Void> eventResult = fireEvent(LOCK_CONFLICT, new 
LockEventParameters(acquirerTx, holderTx));
             // No async handling is expected.
             // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
             assert eventResult.isDone() : "Async lock conflict handling is not 
supported";
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java
deleted file mode 100644
index 0c05ecb27f..0000000000
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java
+++ /dev/null
@@ -1,775 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx.impl;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
-import static 
org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.event.AbstractEventProducer;
-import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.tostring.IgniteToStringExclude;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
-import org.apache.ignite.internal.tx.Lock;
-import org.apache.ignite.internal.tx.LockException;
-import org.apache.ignite.internal.tx.LockKey;
-import org.apache.ignite.internal.tx.LockManager;
-import org.apache.ignite.internal.tx.LockMode;
-import org.apache.ignite.internal.tx.Waiter;
-import org.apache.ignite.internal.tx.event.LockEvent;
-import org.apache.ignite.internal.tx.event.LockEventParameters;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * A {@link LockManager} which uses unbounded hashtable implementation. 
Suitable for holding coarse-grained locks.
- */
-public class HeapUnboundedLockManager extends AbstractEventProducer<LockEvent, 
LockEventParameters> implements LockManager {
-    /** Locks. */
-    private final ConcurrentHashMap<LockKey, LockState> locks = new 
ConcurrentHashMap<>();
-
-    /** Prevention policy. */
-    private final DeadlockPreventionPolicy deadlockPreventionPolicy;
-
-    /** Executor that is used to fail waiters after timeout. */
-    private final Executor delayedExecutor;
-
-    /**
-     * Constructor.
-     */
-    public HeapUnboundedLockManager() {
-        this(new WaitDieDeadlockPreventionPolicy());
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param deadlockPreventionPolicy Deadlock prevention policy.
-     */
-    public HeapUnboundedLockManager(DeadlockPreventionPolicy 
deadlockPreventionPolicy) {
-        this.deadlockPreventionPolicy = deadlockPreventionPolicy;
-        this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0
-                ? 
CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), 
TimeUnit.MILLISECONDS)
-                : null;
-    }
-
-    @Override
-    public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey, 
LockMode lockMode) {
-        while (true) {
-            LockState state = lockState(lockKey);
-
-            IgniteBiTuple<CompletableFuture<Void>, LockMode> futureTuple = 
state.tryAcquire(txId, lockMode);
-
-            if (futureTuple.get1() == null) {
-                continue; // Obsolete state.
-            }
-
-            LockMode newLockMode = futureTuple.get2();
-
-            return futureTuple.get1().thenApply(res -> new Lock(lockKey, 
newLockMode, txId));
-        }
-    }
-
-    @Override
-    public void release(Lock lock) {
-        LockState state = lockState(lock.lockKey());
-
-        if (state.tryRelease(lock.txId())) {
-            locks.remove(lock.lockKey(), state);
-        }
-    }
-
-    @Override
-    public void release(UUID txId, LockKey lockKey, LockMode lockMode) {
-        LockState state = lockState(lockKey);
-
-        if (state.tryRelease(txId, lockMode)) {
-            locks.remove(lockKey, state);
-        }
-    }
-
-    @Override
-    public void releaseAll(UUID txId) {
-        Iterator<Lock> locks = locks(txId);
-
-        while (locks.hasNext()) {
-            Lock lock = locks.next();
-            release(lock);
-        }
-    }
-
-    @Override
-    public Iterator<Lock> locks(UUID txId) {
-        // Decently ok to do full scan here because coarse-grained locks are a 
few in quantity.
-        List<Lock> result = new ArrayList<>();
-
-        for (Map.Entry<LockKey, LockState> entry : locks.entrySet()) {
-            Waiter waiter = entry.getValue().waiter(txId);
-
-            if (waiter != null) {
-                result.add(
-                        new Lock(
-                                entry.getKey(),
-                                waiter.lockMode(),
-                                txId
-                        )
-                );
-            }
-        }
-
-        return result.iterator();
-    }
-
-    /**
-     * Returns the lock state for the key.
-     *
-     * @param key The key.
-     */
-    private LockState lockState(LockKey key) {
-        return locks.computeIfAbsent(key, k -> new 
LockState(deadlockPreventionPolicy, delayedExecutor));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Collection<UUID> queue(LockKey key) {
-        return lockState(key).queue();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Waiter waiter(LockKey key, UUID txId) {
-        return lockState(key).waiter(txId);
-    }
-
-    /**
-     * A lock state.
-     */
-    private class LockState {
-        /** Waiters. */
-        private final TreeMap<UUID, WaiterImpl> waiters;
-
-        private final DeadlockPreventionPolicy deadlockPreventionPolicy;
-
-        /** Delayed executor for waiters timeout callback. */
-        private final Executor delayedExecutor;
-
-        /** Marked for removal flag. */
-        private volatile boolean markedForRemove = false;
-
-        public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, 
Executor delayedExecutor) {
-            Comparator<UUID> txComparator =
-                    deadlockPreventionPolicy.txIdComparator() != null ? 
deadlockPreventionPolicy.txIdComparator() : UUID::compareTo;
-
-            this.waiters = new TreeMap<>(txComparator);
-            this.deadlockPreventionPolicy = deadlockPreventionPolicy;
-            this.delayedExecutor = delayedExecutor;
-        }
-
-        /**
-         * Attempts to acquire a lock for the specified {@code key} in 
specified lock mode.
-         *
-         * @param txId Transaction id.
-         * @param lockMode Lock mode.
-         * @return The future or null if state is marked for removal and 
acquired lock mode.
-         */
-        public @Nullable IgniteBiTuple<CompletableFuture<Void>, LockMode> 
tryAcquire(UUID txId, LockMode lockMode) {
-            WaiterImpl waiter = new WaiterImpl(txId, lockMode);
-
-            synchronized (waiters) {
-                if (markedForRemove) {
-                    return new IgniteBiTuple(null, lockMode);
-                }
-
-                // We always replace the previous waiter with the new one. If 
the previous waiter has lock intention then incomplete
-                // lock future is copied to the new waiter. This guarantees 
that, if the previous waiter was locked concurrently, then
-                // it doesn't have any lock intentions, and the future is not 
copied to the new waiter. Otherwise, if there is lock
-                // intention, this means that the lock future contained in 
previous waiter, is not going to be completed and can be
-                // copied safely.
-                WaiterImpl prev = waiters.put(txId, waiter);
-
-                // Reenter
-                if (prev != null) {
-                    if (prev.locked() && 
prev.lockMode().allowReenter(lockMode)) {
-                        waiter.lock();
-
-                        waiter.upgrade(prev);
-
-                        return new IgniteBiTuple(completedFuture(null), 
prev.lockMode());
-                    } else {
-                        waiter.upgrade(prev);
-
-                        assert prev.lockMode() == waiter.lockMode() :
-                                "Lock modes are incorrect [prev=" + 
prev.lockMode() + ", new=" + waiter.lockMode() + ']';
-                    }
-                }
-
-                if (!isWaiterReadyToNotify(waiter, false)) {
-                    if (deadlockPreventionPolicy.waitTimeout() > 0) {
-                        setWaiterTimeout(waiter);
-                    }
-
-                    return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
-                }
-
-                if (!waiter.locked()) {
-                    waiters.remove(waiter.txId());
-                } else if (waiter.hasLockIntent()) {
-                    waiter.refuseIntent(); // Restore old lock.
-                }
-            }
-
-            // Notify outside the monitor.
-            waiter.notifyLocked();
-
-            return new IgniteBiTuple(waiter.fut, waiter.lockMode());
-        }
-
-        /**
-         * Checks current waiter. It can change the internal state of the 
waiter.
-         *
-         * @param waiter Checked waiter.
-         * @return True if current waiter ready to notify, false otherwise.
-         */
-        private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean 
skipFail) {
-            for (Map.Entry<UUID, WaiterImpl> entry : 
waiters.tailMap(waiter.txId(), false).entrySet()) {
-                WaiterImpl tmp = entry.getValue();
-                LockMode mode = lockedMode(tmp);
-
-                if (mode != null && 
!mode.isCompatible(waiter.intendedLockMode())) {
-                    if (conflictFound(waiter.txId(), tmp.txId())) {
-                        waiter.fail(abandonedLockException(waiter, tmp));
-
-                        return true;
-                    } else if (!deadlockPreventionPolicy.usePriority() && 
deadlockPreventionPolicy.waitTimeout() == 0) {
-                        waiter.fail(lockException(waiter, tmp));
-
-                        return true;
-                    }
-
-                    return false;
-                }
-            }
-
-            for (Map.Entry<UUID, WaiterImpl> entry : 
waiters.headMap(waiter.txId()).entrySet()) {
-                WaiterImpl tmp = entry.getValue();
-                LockMode mode = lockedMode(tmp);
-
-                if (mode != null && 
!mode.isCompatible(waiter.intendedLockMode())) {
-                    if (skipFail) {
-                        return false;
-                    } else if (conflictFound(waiter.txId(), tmp.txId())) {
-                        waiter.fail(abandonedLockException(waiter, tmp));
-
-                        return true;
-                    }  else if (deadlockPreventionPolicy.waitTimeout() == 0) {
-                        waiter.fail(lockException(waiter, tmp));
-
-                        return true;
-                    } else {
-                        return false;
-                    }
-                }
-            }
-
-            waiter.lock();
-
-            return true;
-        }
-
-        /**
-         * Create lock exception with given parameters.
-         *
-         * @param locker Locker.
-         * @param holder Lock holder.
-         * @return Lock exception.
-         */
-        private LockException lockException(Waiter locker, Waiter holder) {
-            return new LockException(ACQUIRE_LOCK_ERR,
-                    "Failed to acquire a lock due to a possible deadlock 
[locker=" + locker + ", holder=" + holder + ']');
-        }
-
-        /**
-         * Create lock exception when lock holder is believed to be missing.
-         *
-         * @param locker Locker.
-         * @param holder Lock holder.
-         * @return Lock exception.
-         */
-        private LockException abandonedLockException(WaiterImpl locker, 
WaiterImpl holder) {
-            return new LockException(ACQUIRE_LOCK_ERR,
-                    "Failed to acquire an abandoned lock due to a possible 
deadlock [locker=" + locker + ", holder=" + holder + ']');
-        }
-
-        /**
-         * Attempts to release a lock for the specified {@code key} in 
exclusive mode.
-         *
-         * @param txId Transaction id.
-         * @return {@code True} if the queue is empty.
-         */
-        public boolean tryRelease(UUID txId) {
-            Collection<WaiterImpl> toNotify;
-
-            synchronized (waiters) {
-                toNotify = release(txId);
-            }
-
-            // Notify outside the monitor.
-            for (WaiterImpl waiter : toNotify) {
-                waiter.notifyLocked();
-            }
-
-            return markedForRemove;
-        }
-
-        /**
-         * Releases a specific lock of the key.
-         *
-         * @param txId Transaction id.
-         * @param lockMode Lock mode.
-         * @return If the value is true, no one waits of any lock of the key, 
false otherwise.
-         */
-        public boolean tryRelease(UUID txId, LockMode lockMode) {
-            List<WaiterImpl> toNotify = Collections.emptyList();
-            synchronized (waiters) {
-                WaiterImpl waiter = waiters.get(txId);
-
-                if (waiter != null) {
-                    assert lockMode.supremum(lockMode, waiter.lockMode()) == 
waiter.lockMode() :
-                            "The lock mode is not locked [mode=" + lockMode + 
", locked=" + waiter.lockMode() + ']';
-
-                    LockMode modeFromDowngrade = 
waiter.recalculateMode(lockMode);
-
-                    if (!waiter.locked() && !waiter.hasLockIntent()) {
-                        toNotify = release(txId);
-                    } else if (modeFromDowngrade != waiter.lockMode()) {
-                        toNotify = unlockCompatibleWaiters();
-                    }
-                }
-            }
-
-            // Notify outside the monitor.
-            for (WaiterImpl waiter : toNotify) {
-                waiter.notifyLocked();
-            }
-
-            return markedForRemove;
-        }
-
-        /**
-         * Releases all locks are held by a specific transaction.
-         * This method should be invoked synchronously.
-         *
-         * @param txId Transaction id.
-         * @return List of waiters to notify.
-         */
-        private List<WaiterImpl> release(UUID txId) {
-            waiters.remove(txId);
-
-            if (waiters.isEmpty()) {
-                markedForRemove = true;
-
-                return Collections.emptyList();
-            }
-
-            List<WaiterImpl> toNotify = unlockCompatibleWaiters();
-
-            return toNotify;
-        }
-
-        /**
-         * Unlock compatible waiters.
-         *
-         * @return List of waiters to notify.
-         */
-        private List<WaiterImpl> unlockCompatibleWaiters() {
-            if (!deadlockPreventionPolicy.usePriority() && 
deadlockPreventionPolicy.waitTimeout() == 0) {
-                return Collections.emptyList();
-            }
-
-            ArrayList<WaiterImpl> toNotify = new ArrayList<>();
-            Set<UUID> toFail = new HashSet<>();
-
-            for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
-                WaiterImpl tmp = entry.getValue();
-
-                if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) {
-                    assert !tmp.hasLockIntent() : "This waiter in not locked 
for notification [waiter=" + tmp + ']';
-
-                    toNotify.add(tmp);
-                }
-            }
-
-            if (deadlockPreventionPolicy.usePriority() && 
deadlockPreventionPolicy.waitTimeout() >= 0) {
-                for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
-                    WaiterImpl tmp = entry.getValue();
-
-                    if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, 
false)) {
-                        assert tmp.hasLockIntent() : "Only failed waiter can 
be notified here [waiter=" + tmp + ']';
-
-                        toNotify.add(tmp);
-                        toFail.add(tmp.txId());
-                    }
-                }
-
-                for (UUID failTx : toFail) {
-                    var w = waiters.get(failTx);
-
-                    if (w.locked()) {
-                        w.refuseIntent();
-                    } else {
-                        waiters.remove(failTx);
-                    }
-                }
-            }
-
-            return toNotify;
-        }
-
-        /**
-         * Makes the waiter fail after specified timeout (in milliseconds), if 
intended lock was not acquired within this timeout.
-         *
-         * @param waiter Waiter.
-         */
-        private void setWaiterTimeout(WaiterImpl waiter) {
-            delayedExecutor.execute(() -> {
-                if (!waiter.fut.isDone()) {
-                    waiter.fut.completeExceptionally(new 
LockException(ACQUIRE_LOCK_TIMEOUT_ERR, "Failed to acquire a lock due to "
-                            + "timeout [txId=" + waiter.txId() + ", waiter=" + 
waiter
-                            + ", timeout=" + 
deadlockPreventionPolicy.waitTimeout() + ']'));
-                }
-            });
-        }
-
-        /**
-         * Gets a lock mode for this waiter.
-         *
-         * @param waiter Waiter.
-         * @return Lock mode, which is held by the waiter or {@code null}, if 
the waiter holds nothing.
-         */
-        private LockMode lockedMode(WaiterImpl waiter) {
-            LockMode mode = null;
-
-            if (waiter.locked()) {
-                mode = waiter.lockMode();
-            }
-
-            return mode;
-        }
-
-        /**
-         * Returns a collection of timestamps that is associated with the 
specified {@code key}.
-         *
-         * @return The waiters queue.
-         */
-        public Collection<UUID> queue() {
-            synchronized (waiters) {
-                return new ArrayList<>(waiters.keySet());
-            }
-        }
-
-        /**
-         * Returns a waiter for the specified {@code key}.
-         *
-         * @param txId Transaction id.
-         * @return The waiter.
-         */
-        public Waiter waiter(UUID txId) {
-            synchronized (waiters) {
-                return waiters.get(txId);
-            }
-        }
-
-        /**
-         * Notifies about the lock conflict found between transactions.
-         *
-         * @param acquirerTx Transaction which tries to acquire the lock.
-         * @param holderTx Transaction which holds the lock.
-         */
-        private boolean conflictFound(UUID acquirerTx, UUID holderTx) {
-            CompletableFuture<Void> eventResult = 
fireEvent(LockEvent.LOCK_CONFLICT, new LockEventParameters(acquirerTx, 
holderTx));
-            // No async handling is expected.
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
-            assert eventResult.isDone() : "Async lock conflict handling is not 
supported";
-
-            return eventResult.isCompletedExceptionally();
-        }
-    }
-
-    /**
-     * A waiter implementation.
-     */
-    private static class WaiterImpl implements Comparable<WaiterImpl>, Waiter {
-        /**
-         * Holding locks by type.
-         * TODO: IGNITE-18350 Abandon the collection in favor of BitSet.
-         */
-        private final Map<LockMode, Integer> locks = new HashMap<>();
-
-        /**
-         * Lock modes are marked as intended, but have not taken yet. This is 
NOT specific to intention lock modes, such as IS and IX.
-         * TODO: IGNITE-18350 Abandon the collection in favor of BitSet.
-         */
-        private final Set<LockMode> intendedLocks = new HashSet<>();
-
-        /** Locked future. */
-        @IgniteToStringExclude
-        private CompletableFuture<Void> fut;
-
-        /** Waiter transaction id. */
-        private final UUID txId;
-
-        /** The lock mode to intend to hold. This is NOT specific to intention 
lock modes, such as IS and IX. */
-        private LockMode intendedLockMode;
-
-        /** The lock mode. */
-        private LockMode lockMode;
-
-        /**
-         * The filed has a value when the waiter couldn't lock a key.
-         */
-        private LockException ex;
-
-        /**
-         * The constructor.
-         *
-         * @param txId Transaction id.
-         * @param lockMode Lock mode.
-         */
-        WaiterImpl(UUID txId, LockMode lockMode) {
-            this.fut = new CompletableFuture<>();
-            this.txId = txId;
-            this.intendedLockMode = lockMode;
-
-            locks.put(lockMode, 1);
-            intendedLocks.add(lockMode);
-        }
-
-        /**
-         * Adds a lock mode.
-         *
-         * @param lockMode Lock mode.
-         * @param increment Value to increment amount.
-         */
-        void addLock(LockMode lockMode, int increment) {
-            locks.merge(lockMode, increment, Integer::sum);
-        }
-
-        /**
-         * Removes a lock mode.
-         *
-         * @param lockMode Lock mode.
-         * @return True if the lock mode was removed, false otherwise.
-         */
-        private boolean removeLock(LockMode lockMode) {
-            Integer counter = locks.get(lockMode);
-
-            if (counter == null || counter < 2) {
-                locks.remove(lockMode);
-
-                return true;
-            } else {
-                locks.put(lockMode, counter - 1);
-
-                return false;
-            }
-        }
-
-        /**
-         * Recalculates lock mode based of all locks which the waiter has 
taken.
-         *
-         * @param modeToRemove Mode without which, the recalculation will 
happen.
-         * @return Previous lock mode.
-         */
-        LockMode recalculateMode(LockMode modeToRemove) {
-            if (!removeLock(modeToRemove)) {
-                return lockMode;
-            }
-
-            return recalculate();
-        }
-
-        /**
-         * Recalculates lock supremums.
-         *
-         * @return Previous lock mode.
-         */
-        private LockMode recalculate() {
-            LockMode newIntendedLockMode = null;
-            LockMode newLockMode = null;
-
-            for (LockMode mode : locks.keySet()) {
-                assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" + 
txId + ", mode=" + mode + "]";
-
-                if (intendedLocks.contains(mode)) {
-                    newIntendedLockMode = newIntendedLockMode == null ? mode : 
LockMode.supremum(newIntendedLockMode, mode);
-                } else {
-                    newLockMode = newLockMode == null ? mode : 
LockMode.supremum(newLockMode, mode);
-                }
-            }
-
-            LockMode mode = lockMode;
-
-            lockMode = newLockMode;
-            intendedLockMode = newLockMode != null && newIntendedLockMode != 
null ? LockMode.supremum(newLockMode, newIntendedLockMode)
-                    : newIntendedLockMode;
-
-            return mode;
-        }
-
-        /**
-         * Merge all locks that were held by another waiter to the current one.
-         *
-         * @param other Other waiter.
-         */
-        void upgrade(WaiterImpl other) {
-            intendedLocks.addAll(other.intendedLocks);
-
-            other.locks.entrySet().forEach(entry -> addLock(entry.getKey(), 
entry.getValue()));
-
-            recalculate();
-
-            if (other.hasLockIntent()) {
-                fut = other.fut;
-            }
-        }
-
-        /**
-         * Removes all locks that were intended to hold.
-         */
-        void refuseIntent() {
-            for (LockMode mode : intendedLocks) {
-                locks.remove(mode);
-            }
-
-            intendedLocks.clear();
-            intendedLockMode = null;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public int compareTo(WaiterImpl o) {
-            return txId.compareTo(o.txId);
-        }
-
-        /** Notifies a future listeners. */
-        private void notifyLocked() {
-            if (ex != null) {
-                fut.completeExceptionally(ex);
-            } else {
-                assert lockMode != null;
-
-                fut.complete(null);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public boolean locked() {
-            return this.lockMode != null;
-        }
-
-        /**
-         * Checks is the waiter has any intended to lock a key.
-         *
-         * @return True if the waiter has an intended lock, false otherwise.
-         */
-        public boolean hasLockIntent() {
-            return this.intendedLockMode != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public LockMode lockMode() {
-            return lockMode;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public LockMode intendedLockMode() {
-            return intendedLockMode;
-        }
-
-        /** Grant a lock. */
-        private void lock() {
-            lockMode = intendedLockMode;
-
-            intendedLockMode = null;
-
-            intendedLocks.clear();
-        }
-
-        /**
-         * Fails the lock waiter.
-         *
-         * @param e Lock exception.
-         */
-        private void fail(LockException e) {
-            ex = e;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public UUID txId() {
-            return txId;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof WaiterImpl)) {
-                return false;
-            }
-
-            return compareTo((WaiterImpl) o) == 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public int hashCode() {
-            return txId.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public String toString() {
-            return S.toString(WaiterImpl.class, this, "isDone", fut.isDone());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean isEmpty() {
-        return locks.isEmpty();
-    }
-}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index f790eb9044..17d56f8535 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -1070,7 +1070,7 @@ public abstract class AbstractLockManagerTest extends 
IgniteAbstractTest {
 
     @Test
     public void testLockIsReleased() {
-        LockKey key = new LockKey(0);
+        LockKey key = lockKey();
 
         UUID txId1 = TestTransactionIds.newTransactionId();
 
@@ -1097,7 +1097,7 @@ public abstract class AbstractLockManagerTest extends 
IgniteAbstractTest {
         UUID tx2 = TestTransactionIds.newTransactionId();
         UUID tx3 = TestTransactionIds.newTransactionId();
 
-        var key = new LockKey(0);
+        var key = lockKey();
 
         assertThat(lockManager.acquire(tx1, key, S), 
willCompleteSuccessfully());
         assertThat(lockManager.acquire(tx2, key, IS), 
willCompleteSuccessfully());
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
new file mode 100644
index 0000000000..d3348e7e06
--- /dev/null
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests coarse lock modes. It allows IX, S locks and upgrade from S to SIX 
(S, then IX).
+ */
+public class CoarseGrainedLockManagerTest {
+    private final HeapLockManager lockManager = new HeapLockManager(new 
WaitDieDeadlockPreventionPolicy());
+
+    @AfterEach
+    void after() {
+        assertTrue(lockManager.isEmpty());
+    }
+
+    @Test
+    public void testSimple() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(), 
LockMode.S);
+        assertFalse(fut2.isDone());
+
+        lockManager.releaseAll(newer);
+        fut2.join();
+
+        lockManager.releaseAll(older);
+    }
+
+    @Test
+    public void testSimpleInverse() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(), 
LockMode.IX);
+
+        assertThrowsWithCause(fut2::join, LockException.class);
+
+        lockManager.releaseAll(txId1);
+
+        fut2 = lockManager.acquire(txId2, lockKey(), LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        lockManager.releaseAll(txId2);
+    }
+
+    @Test
+    public void testComplex() {
+        // Older.
+        UUID txId4 = TestTransactionIds.newTransactionId();
+        UUID txId5 = TestTransactionIds.newTransactionId();
+        // Newer.
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        UUID txId3 = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, lockKey(), 
LockMode.IX);
+        assertTrue(fut3.isDone());
+
+        CompletableFuture<Lock> fut4 = lockManager.acquire(txId4, lockKey(), 
LockMode.S);
+        assertFalse(fut4.isDone());
+
+        CompletableFuture<Lock> fut5 = lockManager.acquire(txId5, lockKey(), 
LockMode.S);
+        assertFalse(fut5.isDone());
+
+        lockManager.releaseAll(txId1);
+        assertFalse(fut4.isDone());
+        assertFalse(fut5.isDone());
+
+        lockManager.releaseAll(txId2);
+        assertFalse(fut4.isDone());
+        assertFalse(fut5.isDone());
+
+        lockManager.releaseAll(txId3);
+        fut4.join();
+        fut5.join();
+
+        lockManager.releaseAll(txId4);
+        lockManager.releaseAll(txId5);
+    }
+
+    @Test
+    public void testComplexInverse() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(), 
LockMode.S);
+        assertTrue(fut2.isDone());
+
+        UUID txId3 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, lockKey(), 
LockMode.S);
+        assertTrue(fut3.isDone());
+
+        UUID txId4 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut4 = lockManager.acquire(txId4, lockKey(), 
LockMode.IX);
+        assertThrowsWithCause(fut4::join, LockException.class);
+
+        UUID txId5 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut5 = lockManager.acquire(txId5, lockKey(), 
LockMode.IX);
+        assertThrowsWithCause(fut5::join, LockException.class);
+
+        lockManager.releaseAll(txId1);
+        lockManager.releaseAll(txId2);
+        lockManager.releaseAll(txId3);
+
+        fut4 = lockManager.acquire(txId4, lockKey(), LockMode.IX);
+        fut4.join();
+
+        fut5 = lockManager.acquire(txId5, lockKey(), LockMode.IX);
+        fut5.join();
+
+        lockManager.releaseAll(txId4);
+        lockManager.releaseAll(txId5);
+    }
+
+    @Test
+    public void testUpgrade() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut2.isDone());
+
+        lockManager.releaseAll(txId1);
+    }
+
+    @Test
+    public void testUpgradeReverse() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        lockManager.releaseAll(txId1);
+    }
+
+    @Test
+    public void testUpgradeMulti() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertThrowsWithCause(fut3::join, LockException.class);
+
+        lockManager.releaseAll(txId1);
+        lockManager.releaseAll(txId2);
+    }
+
+    @Test
+    public void testUpgradeReverseMulti() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(), 
LockMode.S);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertThrowsWithCause(fut3::join, LockException.class);
+
+        lockManager.releaseAll(txId1);
+        lockManager.releaseAll(txId2);
+    }
+
+    @Test
+    public void testReenter() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        lockManager.releaseAll(txId1);
+    }
+
+    @Test
+    public void testReenter2() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut2.isDone());
+
+        lockManager.releaseAll(txId1);
+    }
+
+    @Test
+    public void testUpgradeAndLockRequest() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(newer, lockKey(), 
LockMode.S);
+        assertTrue(fut2.isDone());
+
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(older, lockKey(), 
LockMode.S);
+        assertFalse(fut3.isDone());
+
+        lockManager.releaseAll(newer);
+
+        fut3.join();
+
+        lockManager.releaseAll(older);
+    }
+
+    @Test
+    public void testUpgradeAndLockRequestReverse() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(newer, lockKey(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(older, lockKey(), 
LockMode.S);
+        assertFalse(fut3.isDone());
+
+        lockManager.releaseAll(newer);
+
+        fut3.join();
+
+        lockManager.releaseAll(older);
+    }
+
+    @Test
+    public void testUpgradeAndLockRequest2() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut2.isDone());
+
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId2, lockKey(), 
LockMode.IX);
+        assertThrowsWithCause(fut3::join, LockException.class);
+
+        lockManager.releaseAll(txId1);
+    }
+
+    @Test
+    public void testUpgradeAndLockRequestReverse2() {
+        UUID txId1 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(), 
LockMode.S);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        UUID txId2 = TestTransactionIds.newTransactionId();
+        CompletableFuture<Lock> fut3 = lockManager.acquire(txId2, lockKey(), 
LockMode.IX);
+        assertThrowsWithCause(fut3::join, LockException.class);
+
+        lockManager.releaseAll(txId1);
+    }
+
+    @Test
+    public void testDeadlockAvoidance() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey2(), 
LockMode.IX);
+        assertTrue(fut2.isDone());
+
+        CompletableFuture<Lock> fut3 = lockManager.acquire(newer, lockKey2(), 
LockMode.S);
+        assertThrowsWithCause(fut3::join, LockException.class);
+
+        CompletableFuture<Lock> fut4 = lockManager.acquire(older, lockKey(), 
LockMode.S);
+        assertFalse(fut4.isDone());
+
+        lockManager.releaseAll(newer);
+
+        fut4.join();
+
+        lockManager.releaseAll(older);
+    }
+
+    @Test
+    public void testReleaseWaitingTx() {
+        UUID older = TestTransactionIds.newTransactionId();
+        UUID newer = TestTransactionIds.newTransactionId();
+
+        CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(), 
LockMode.IX);
+        assertTrue(fut1.isDone());
+
+        CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(), 
LockMode.S);
+        assertFalse(fut2.isDone());
+
+        lockManager.releaseAll(older);
+
+        fut2.join();
+
+        lockManager.releaseAll(newer);
+    }
+
+    private static LockKey lockKey() {
+        return new LockKey("test");
+    }
+
+    private static LockKey lockKey2() {
+        return new LockKey("test2");
+    }
+}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java
deleted file mode 100644
index 24744f7504..0000000000
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
-
-/**
- * Test class for {@link HeapUnboundedLockManager}.
- */
-public class HeapUnboundedLockManagerTest extends AbstractLockManagerTest {
-    @Override
-    protected LockManager newInstance() {
-        return new HeapUnboundedLockManager(new 
WaitDieDeadlockPreventionPolicy());
-    }
-
-    @Override
-    protected LockKey lockKey() {
-        return new LockKey("test");
-    }
-}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index c335950e91..0000000000
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * NoWaitDeadlockPreventionUnboundedTest.
- */
-public class NoWaitDeadlockPreventionUnboundedTest extends 
NoWaitDeadlockPreventionTest {
-    @Override
-    protected LockManager lockManager() {
-        return new HeapUnboundedLockManager(deadlockPreventionPolicy());
-    }
-}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index 4dcdfadd00..0000000000
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * NoneDeadlockPreventionUnboundedTest.
- */
-public class NoneDeadlockPreventionUnboundedTest extends 
NoneDeadlockPreventionTest {
-    @Override
-    protected LockManager lockManager() {
-        return new HeapUnboundedLockManager(deadlockPreventionPolicy());
-    }
-}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index f0f66bb8ea..0000000000
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * ReversedDeadlockPreventionUnboundedTest.
- */
-public class ReversedDeadlockPreventionUnboundedTest extends 
ReversedDeadlockPreventionTest {
-    @Override
-    protected LockManager lockManager() {
-        return new HeapUnboundedLockManager(deadlockPreventionPolicy());
-    }
-}
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index 3bd121abd5..0000000000
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * TimeoutDeadlockPreventionUnboundedTest.
- */
-public class TimeoutDeadlockPreventionUnboundedTest extends 
TimeoutDeadlockPreventionTest {
-    @Override
-    protected LockManager lockManager() {
-        return new HeapUnboundedLockManager(deadlockPreventionPolicy());
-    }
-}

Reply via email to