This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d1c4bbe4ea IGNITE-23301 Improved concurrency for coarse locks.
d1c4bbe4ea is described below
commit d1c4bbe4ea69b0ba78bc6e37e92811ab7159cfd3
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Thu Oct 24 12:51:12 2024 +0300
IGNITE-23301 Improved concurrency for coarse locks.
---
.../internal/util/IgniteStripedReadWriteLock.java | 222 ++++++
.../util/IgniteStripedReadWriteLockSelfTest.java | 326 +++++++++
.../benchmark/AbstractMultiNodeBenchmark.java | 11 +-
.../internal/benchmark/UpsertKvBenchmark.java | 59 +-
.../apache/ignite/distributed/ItLockTableTest.java | 4 +-
.../internal/table/ItTransactionRecoveryTest.java | 35 +
.../table/distributed/raft/PartitionListener.java | 9 +-
.../replicator/PartitionReplicaListener.java | 170 +++--
.../org/apache/ignite/internal/tx/LockKey.java | 2 +-
.../org/apache/ignite/internal/tx/TxPriority.java | 13 +
.../ignite/internal/tx/impl/HeapLockManager.java | 572 ++++++++++++---
.../internal/tx/impl/HeapUnboundedLockManager.java | 775 ---------------------
.../internal/tx/AbstractLockManagerTest.java | 4 +-
.../internal/tx/CoarseGrainedLockManagerTest.java | 362 ++++++++++
.../internal/tx/HeapUnboundedLockManagerTest.java | 36 -
.../tx/NoWaitDeadlockPreventionUnboundedTest.java | 30 -
.../tx/NoneDeadlockPreventionUnboundedTest.java | 30 -
.../ReversedDeadlockPreventionUnboundedTest.java | 30 -
.../tx/TimeoutDeadlockPreventionUnboundedTest.java | 30 -
19 files changed, 1567 insertions(+), 1153 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
new file mode 100644
index 0000000000..796b557e1d
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLock.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ReadWriteLock with striping mechanics. Compared to {@link ReentrantReadWriteLock} it has slightly improved performance of
+ * {@link ReadWriteLock#readLock()} operations at the cost of {@link ReadWriteLock#writeLock()} operations and memory consumption. It also
+ * supports reentrancy semantics like {@link ReentrantReadWriteLock}.
+ *
+ * <p>The lock is backed by an array of {@link ReentrantReadWriteLock} stripes. Each thread is bound to a single stripe and takes read
+ * locks only on that stripe, while the write lock is acquired by write-locking every stripe in ascending index order.
+ */
+public class IgniteStripedReadWriteLock implements ReadWriteLock {
+    /** Index generator. */
+    private static final AtomicInteger IDX_GEN = new AtomicInteger();
+
+    /** Per-thread index, assigned once on first use and mapped to a stripe by {@link #curIdx()}. */
+    private static final ThreadLocal<Integer> IDX = ThreadLocal.withInitial(IDX_GEN::incrementAndGet);
+
+    /** Locks. */
+    private final ReentrantReadWriteLock[] locks;
+
+    /** Composite write lock. */
+    private final WriteLock writeLock;
+
+    /**
+     * Creates a new instance with given concurrency level.
+     *
+     * @param concurrencyLvl Number of internal read locks, must be positive.
+     * @throws IllegalArgumentException If {@code concurrencyLvl} is not positive.
+     */
+    public IgniteStripedReadWriteLock(int concurrencyLvl) {
+        if (concurrencyLvl <= 0) {
+            throw new IllegalArgumentException("Concurrency level must be positive: " + concurrencyLvl);
+        }
+
+        locks = new ReentrantReadWriteLock[concurrencyLvl];
+
+        for (int i = 0; i < concurrencyLvl; i++) {
+            locks[i] = new ReentrantReadWriteLock();
+        }
+
+        writeLock = new WriteLock();
+    }
+
+    /**
+     * Gets current index.
+     *
+     * @return Index of current thread stripe.
+     */
+    private int curIdx() {
+        // floorMod keeps the result within [0, locks.length) even if the index generator has wrapped around to negative values.
+        return Math.floorMod(IDX.get(), locks.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock readLock() {
+        return locks[curIdx()].readLock();
+    }
+
+    /**
+     * Get a lock by stripe.
+     *
+     * @param idx Stripe index.
+     * @return The lock.
+     */
+    public Lock readLock(int idx) {
+        return locks[idx].readLock();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock writeLock() {
+        return writeLock;
+    }
+
+    /**
+     * Queries if the write lock is held by the current thread.
+     *
+     * @return {@code true} if the current thread holds the write lock and {@code false} otherwise
+     */
+    public boolean isWriteLockedByCurrentThread() {
+        // Stripes are write-locked in ascending order, so the last stripe is held only when the whole write lock has been acquired.
+        return locks[locks.length - 1].isWriteLockedByCurrentThread();
+    }
+
+    /**
+     * Queries the number of reentrant read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock
+     * action that is not matched by an unlock action.
+     *
+     * @return the number of holds on the read lock by the current thread, or zero if the read lock is not held by the current thread
+     */
+    public int getReadHoldCount() {
+        return locks[curIdx()].getReadHoldCount();
+    }
+
+    /**
+     * Write lock. Holding it means holding the write lock of every stripe.
+     */
+    private class WriteLock implements Lock {
+        /** {@inheritDoc} */
+        @Override
+        public void lock() {
+            try {
+                lock0(false);
+            } catch (InterruptedException e) {
+                // Unreachable: the non-interruptible path never throws. Fail loudly instead of returning without the lock.
+                throw new AssertionError("Should never happen", e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void lockInterruptibly() throws InterruptedException {
+            lock0(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void unlock() {
+            unlock0(locks.length - 1);
+        }
+
+        /**
+         * Internal lock routine. Acquires the stripes in ascending order and rolls back already acquired stripes if interrupted.
+         *
+         * @param canInterrupt Whether to acquire the lock interruptibly.
+         * @throws InterruptedException If interrupted.
+         */
+        private void lock0(boolean canInterrupt) throws InterruptedException {
+            int i = 0;
+
+            try {
+                for (; i < locks.length; i++) {
+                    if (canInterrupt) {
+                        locks[i].writeLock().lockInterruptibly();
+                    } else {
+                        locks[i].writeLock().lock();
+                    }
+                }
+            } catch (InterruptedException e) {
+                // Stripe i was not acquired, release everything below it.
+                unlock0(i - 1);
+
+                throw e;
+            }
+        }
+
+        /**
+         * Internal unlock routine. Releases the stripes in descending order starting from the given one.
+         *
+         * @param fromIdx Start index.
+         */
+        private void unlock0(int fromIdx) {
+            for (int i = fromIdx; i >= 0; i--) {
+                locks[i].writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean tryLock() {
+            int i = 0;
+
+            try {
+                for (; i < locks.length; i++) {
+                    if (!locks[i].writeLock().tryLock()) {
+                        break;
+                    }
+                }
+            } finally {
+                // Partial acquisition: roll back the stripes taken so far.
+                if (0 < i && i < locks.length) {
+                    unlock0(i - 1);
+                }
+            }
+
+            return i == locks.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+            int i = 0;
+
+            // The timeout bounds the acquisition of all stripes together, so a single deadline is shared between them.
+            long deadline = System.nanoTime() + unit.toNanos(time);
+
+            try {
+                for (; i < locks.length; i++) {
+                    // A non-positive remaining time still makes one non-waiting attempt, as required by the Lock contract.
+                    long remaining = deadline - System.nanoTime();
+
+                    if (!locks[i].writeLock().tryLock(remaining, TimeUnit.NANOSECONDS)) {
+                        break;
+                    }
+                }
+            } finally {
+                // Partial acquisition (timeout or interruption): roll back the stripes taken so far.
+                if (0 < i && i < locks.length) {
+                    unlock0(i - 1);
+                }
+            }
+
+            return i == locks.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public Condition newCondition() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
+
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLockSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLockSelfTest.java
new file mode 100644
index 0000000000..30287e5e82
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteStripedReadWriteLockSelfTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test for {@link IgniteStripedReadWriteLock}.
+ */
+@Timeout(value = 60, unit = TimeUnit.SECONDS)
+public class IgniteStripedReadWriteLockSelfTest {
+    private static final int CONCURRENCY = 16;
+
+    /** The lock under test. */
+    private final IgniteStripedReadWriteLock lock = new IgniteStripedReadWriteLock(CONCURRENCY);
+
+    /** Executor service used to run tasks in threads different from the main test thread. */
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+
+    /**
+     * Cleans up after a test.
+     */
+    @AfterEach
+    void cleanup() {
+        releaseReadLockHeldByCurrentThread();
+        releaseWriteLockHeldByCurrentThread();
+
+        IgniteUtils.shutdownAndAwaitTermination(executor, 3, TimeUnit.SECONDS);
+    }
+
+    /** Releases every read hold the test thread still has, so a test may leave its read lock taken. */
+    private void releaseReadLockHeldByCurrentThread() {
+        while (lock.getReadHoldCount() > 0) {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** Releases every write hold the test thread still has, so a test may leave its write lock taken. */
+    private void releaseWriteLockHeldByCurrentThread() {
+        while (lock.isWriteLockedByCurrentThread()) {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Test
+    void readLockDoesNotAllowWriteLockToBeAcquired() {
+        lock.readLock().lock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.readLock().unlock();
+    }
+
+    /** Tries to take the write lock in another thread and checks that the attempt does not complete within 100 ms. */
+    private void assertThatWriteLockAcquireAttemptBlocksForever() {
+        Future<?> future = executor.submit(() -> lock.writeLock().lock());
+
+        assertThrows(TimeoutException.class, () -> future.get(100, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    void readLockDoesNotAllowWriteLockToBeAcquiredWithTimeout() throws Exception {
+        lock.readLock().lock();
+
+        Boolean acquired = callWithTimeout(() -> lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS));
+        assertThat(acquired, is(false));
+
+        lock.readLock().unlock();
+    }
+
+    @Test
+    void readLockAllowsReadLockToBeAcquired() throws Exception {
+        lock.readLock().lock();
+
+        assertThatReadLockCanBeAcquired();
+    }
+
+    /** Takes the read lock in another thread and checks that the attempt completes. */
+    private void assertThatReadLockCanBeAcquired() throws InterruptedException, ExecutionException, TimeoutException {
+        runWithTimeout(() -> lock.readLock().lock());
+    }
+
+    private <T> T callWithTimeout(Callable<T> call) throws ExecutionException, InterruptedException, TimeoutException {
+        Future<T> future = executor.submit(call);
+        return getWithTimeout(future);
+    }
+
+    private void runWithTimeout(Runnable runnable) throws ExecutionException, InterruptedException, TimeoutException {
+        Future<?> future = executor.submit(runnable);
+        getWithTimeout(future);
+    }
+
+    private static <T> T getWithTimeout(Future<? extends T> future) throws ExecutionException,
+            InterruptedException, TimeoutException {
+        return future.get(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    void writeLockDoesNotAllowReadLockToBeAcquired() {
+        lock.writeLock().lock();
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    /** Tries to take the read lock in another thread and checks that the attempt does not complete within 100 ms. */
+    private void assertThatReadLockAcquireAttemptBlocksForever() {
+        Future<?> readLockAttemptFuture = executor.submit(() -> lock.readLock().lock());
+
+        assertThrows(TimeoutException.class, () -> readLockAttemptFuture.get(100, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    void writeLockDoesNotAllowWriteLockToBeAcquired() {
+        lock.writeLock().lock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void readUnlockReleasesTheLock() throws Exception {
+        lock.readLock().lock();
+        lock.readLock().unlock();
+
+        // Actually acquire the write lock in another thread; merely obtaining the Lock object would prove nothing.
+        runWithTimeout(() -> {
+            lock.writeLock().lock();
+            lock.writeLock().unlock();
+        });
+    }
+
+    @Test
+    void writeUnlockReleasesTheLock() throws Exception {
+        lock.writeLock().lock();
+        lock.writeLock().unlock();
+
+        assertThatReadLockCanBeAcquired();
+    }
+
+    @Test
+    void testWriteLockReentry() {
+        lock.writeLock().lock();
+
+        lock.writeLock().lock();
+
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void testWriteLockReentryWithTryWriteLock() {
+        assertTrue(lock.writeLock().tryLock());
+
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void testWriteLockReentryWithTimeout() throws Exception {
+        assertTrue(lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS));
+        assertTrue(lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS));
+
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void testReadLockReentry() {
+        lock.readLock().lock();
+
+        lock.readLock().lock();
+
+        assertTrue(lock.readLock().tryLock());
+    }
+
+    @Test
+    void shouldAllowAcquireAndReleaseReadLockWhileHoldingWriteLock() {
+        lock.writeLock().lock();
+
+        lock.readLock().lock();
+        lock.readLock().unlock();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void shouldAllowInterleavingHoldingReadAndWriteLocks() {
+        lock.writeLock().lock();
+
+        lock.readLock().lock();
+
+        lock.writeLock().unlock();
+
+        assertFalse(lock.writeLock().tryLock());
+
+        lock.readLock().unlock();
+
+        // Test that we can operate with write locks now.
+        lock.writeLock().lock();
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void readLockReleasedLessTimesThanAcquiredShouldStillBeTaken() {
+        lock.readLock().lock();
+        lock.readLock().lock();
+        lock.readLock().unlock();
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.readLock().unlock();
+    }
+
+    @Test
+    void writeLockReleasedLessTimesThanAcquiredShouldStillBeTaken() {
+        lock.writeLock().lock();
+        lock.writeLock().lock();
+        lock.writeLock().unlock();
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void shouldThrowOnReadUnlockingWhenNotHoldingReadLock() {
+        assertThrows(IllegalMonitorStateException.class, () -> lock.readLock().unlock());
+    }
+
+    @Test
+    void shouldThrowOnWriteUnlockingWhenNotHoldingWriteLock() {
+        assertThrows(IllegalMonitorStateException.class, () -> lock.writeLock().unlock());
+    }
+
+    @Test
+    void readLockAcquiredWithTryReadLockDoesNotAllowWriteLockToBeAcquired() {
+        assertTrue(lock.readLock().tryLock());
+
+        assertThatWriteLockAcquireAttemptBlocksForever();
+
+        lock.readLock().unlock();
+    }
+
+    @Test
+    void tryReadLockShouldReturnTrueWhenReadLockWasAcquiredSuccessfully() {
+        assertTrue(lock.readLock().tryLock());
+    }
+
+    @Test
+    void tryReadLockShouldReturnFalseWhenReadLockCouldNotBeAcquired() throws Exception {
+        lock.writeLock().lock();
+
+        Boolean acquired = callWithTimeout(() -> lock.readLock().tryLock());
+
+        assertThat(acquired, is(false));
+    }
+
+    // Despite the name, this test checks that a write lock taken via tryLock() blocks readers.
+    @Test
+    void writeLockAcquiredWithTryWriteLockDoesNotAllowWriteLockToBeAcquired() {
+        assertTrue(lock.writeLock().tryLock());
+
+        assertThatReadLockAcquireAttemptBlocksForever();
+
+        lock.writeLock().unlock();
+    }
+
+    @Test
+    void tryWriteLockShouldReturnTrueWhenWriteLockWasAcquiredSuccessfully() {
+        assertTrue(lock.writeLock().tryLock());
+    }
+
+    @Test
+    void tryWriteLockShouldReturnFalseWhenWriteLockCouldNotBeAcquired() throws Exception {
+        lock.writeLock().lock();
+
+        Boolean acquired = callWithTimeout(() -> lock.writeLock().tryLock());
+
+        assertThat(acquired, is(false));
+    }
+
+    @Test
+    void writeLockedByCurrentThreadShouldReturnTrueWhenLockedByCurrentThread() {
+        lock.writeLock().lock();
+
+        assertTrue(lock.isWriteLockedByCurrentThread());
+    }
+
+    @Test
+    void writeLockedByCurrentThreadShouldReturnFalseWhenNotLocked() {
+        assertFalse(lock.isWriteLockedByCurrentThread());
+    }
+
+    @Test
+    void writeLockedByCurrentThreadShouldReturnFalseWhenLockedByAnotherThread() throws Exception {
+        // The test thread must really hold the write lock, otherwise this duplicates the "not locked" test above.
+        lock.writeLock().lock();
+
+        Boolean lockedByCaller = callWithTimeout(lock::isWriteLockedByCurrentThread);
+        assertThat(lockedByCaller, is(false));
+    }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index 23678e9954..e6cc47d0ac 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -197,7 +197,8 @@ public class AbstractMultiNodeBenchmark {
+ " },\n"
+ " clientConnector: { port:{} },\n"
+ " rest.port: {},\n"
- + " raft.fsync = " + fsync
+ + " raft.fsync = " + fsync() + ",\n"
+ + " system.partitionsLogPath = \"" + logPath() + "\""
+ "}";
for (int i = 0; i < nodes(); i++) {
@@ -237,6 +238,14 @@ public class AbstractMultiNodeBenchmark {
return Files.createTempDirectory("tmpDirPrefix").toFile().toPath();
}
+ /**
+ * Returns the value substituted into the {@code system.partitionsLogPath} entry of the node configuration.
+ *
+ * @return Path string; empty in this base implementation.
+ */
+ protected String logPath() {
+ return "";
+ }
+
+ /**
+ * Returns the value substituted into the {@code raft.fsync} entry of the node configuration.
+ *
+ * @return Value of the {@code fsync} field in this base implementation.
+ */
+ protected boolean fsync() {
+ return fsync;
+ }
+
protected int nodes() {
return 3;
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
index 8c7a5efb38..507f4d8387 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.benchmark;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.table.KeyValueView;
@@ -27,10 +31,10 @@ import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
@@ -46,15 +50,22 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Threads(1)
@Warmup(iterations = 10, time = 2)
@Measurement(iterations = 20, time = 2)
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.SECONDS)
public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark {
private final Tuple tuple = Tuple.create();
- private int id = 0;
-
private static KeyValueView<Tuple, Tuple> kvView;
+ @Param({"1"})
+ private int batch;
+
+ @Param({"false"})
+ private boolean fsync;
+
+ @Param({"8"})
+ private int partitionCount;
+
@Override
public void nodeSetUp() throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK,
"true");
@@ -67,23 +78,33 @@ public class UpsertKvBenchmark extends
AbstractMultiNodeBenchmark {
*/
@Setup
public void setUp() {
- kvView = publicIgnite.tables().table(TABLE_NAME).keyValueView();
+ kvView = igniteImpl.tables().table(TABLE_NAME).keyValueView();
for (int i = 1; i < 11; i++) {
tuple.set("field" + i, FIELD_VAL);
}
}
- @TearDown
- public void tearDown() {
- System.out.println("Inserted " + id + " tuples");
- }
-
/**
* Benchmark for KV upsert via embedded client.
*/
@Benchmark
public void upsert() {
- kvView.put(null, Tuple.create().set("ycsb_key", id++), tuple);
+ // Futures of the asynchronous puts issued by this invocation.
+ List<CompletableFuture<Void>> futs = new ArrayList<>();
+
+ // Issue (batch - 1) asynchronous puts; with the default batch of 1 this loop does not run.
+ for (int i = 0; i < batch - 1; i++) {
+ CompletableFuture<Void> fut = kvView.putAsync(null,
Tuple.create().set("ycsb_key", nextId()), tuple);
+ futs.add(fut);
+ }
+
+ // Wait for every asynchronous put to complete before the final one.
+ for (CompletableFuture<Void> fut : futs) {
+ fut.join();
+ }
+
+ // The last put is synchronous, so each invocation always performs at least one put.
+ kvView.put(null, Tuple.create().set("ycsb_key", nextId()), tuple);
+ }
+
+ /**
+ * Generates a key for the next put.
+ *
+ * @return Random {@code int} drawn from the current thread's {@link ThreadLocalRandom}; values are not guaranteed to be unique.
+ */
+ private int nextId() {
+ return ThreadLocalRandom.current().nextInt();
}
/**
@@ -92,11 +113,18 @@ public class UpsertKvBenchmark extends
AbstractMultiNodeBenchmark {
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + UpsertKvBenchmark.class.getSimpleName() + ".*")
+ // .jvmArgsAppend("-Djmh.executor=VIRTUAL")
+ // .addProfiler(JavaFlightRecorderProfiler.class,
"configName=profile.jfc")
.build();
new Runner(opt).run();
}
+ /** Returns the {@code fsync} benchmark parameter declared in this class. */
+ @Override
+ protected boolean fsync() {
+ return fsync;
+ }
+
@Override
protected int nodes() {
return 1;
@@ -104,6 +132,11 @@ public class UpsertKvBenchmark extends
AbstractMultiNodeBenchmark {
@Override
protected int partitionCount() {
- return 8;
+ return partitionCount;
+ }
+
+ @Override
+ protected int replicaCount() {
+ return 1;
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
index c2aa7702f8..ee48aa527b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager.LockState;
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
@@ -149,8 +148,7 @@ public class ItLockTableTest extends IgniteAbstractTest {
new HeapLockManager(
DeadlockPreventionPolicy.NO_OP,
HeapLockManager.SLOTS,
- CACHE_SIZE,
- new HeapUnboundedLockManager()),
+ CACHE_SIZE),
clockService,
generator,
placementDriver,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index a99541dca8..a4b0d121cf 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -237,6 +237,41 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
assertTrue(waitForCondition(() -> txStoredState(commitPartNode,
orphanTxId) == TxState.ABORTED, 10_000));
}
+    /**
+     * Checks that a read-write transaction which left a scan cursor open and whose coordinator node has stopped ends up
+     * in the {@code ABORTED} state on the commit partition node after a conflicting transaction is run.
+     */
+    @Test
+    public void testAbandonedTxWithCoarseLockIsAborted() throws Exception {
+        TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = findNodeByName(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
+
+        RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView();
+        view.upsert(null, Tuple.create().set("key", 42).set("val", "val1"));
+
+        // Start scan transaction.
+        InternalTransaction rwTx = (InternalTransaction) txCrdNode.transactions().begin();
+        scanSingleEntryAndLeaveCursorOpen(commitPartNode, unwrapTableImpl(txCrdNode.tables().table(TABLE_NAME)), rwTx);
+
+        // Stop the coordinator while the scan transaction is still open, leaving the transaction abandoned.
+        txCrdNode.stop();
+
+        // Wait until node 0 no longer sees the stopped coordinator among the cluster nodes.
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().noneMatch(n -> txCrdNode.id().equals(n.id())),
+                10_000));
+
+        InternalTransaction conflictTx = (InternalTransaction) node(0).transactions().begin();
+        runConflictingTransaction(node(0), conflictTx);
+
+        assertTrue(waitForCondition(() -> txStoredState(commitPartNode, rwTx.id()) == TxState.ABORTED, 10_000));
+    }
+
@Test
public void testWriteIntentRecoverNoCoordinator() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 91cfe46c9e..3788f5475d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -271,10 +271,7 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
storage.releasePartitionSnapshotsReadLock();
}
- // Completing the closure out of the partition snapshots lock to
reduce possibility of deadlocks as it might
- // trigger other actions taking same locks.
- clo.result(result);
-
+ // Adjust safe time before completing update to reduce waiting.
if (command instanceof SafeTimePropagatingCommand) {
SafeTimePropagatingCommand safeTimePropagatingCommand =
(SafeTimePropagatingCommand) command;
@@ -283,6 +280,10 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime());
}
+ // Completing the closure out of the partition snapshots lock to
reduce possibility of deadlocks as it might
+ // trigger other actions taking same locks.
+ clo.result(result);
+
updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
commandIndex);
});
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 56c78a78b8..83eaed3668 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -347,6 +347,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final IndexMetaStorage indexMetaStorage;
+ private static final boolean SKIP_UPDATES =
+
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
+
/**
* The constructor.
*
@@ -1207,7 +1210,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
FullyQualifiedResourceId cursorId = cursorId(txId, request.scanId());
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.S)
+ return lockManager.acquire(txId, new LockKey(replicationGroupId),
LockMode.S)
.thenCompose(tblLock ->
retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), cursorId,
batchCount));
}
@@ -1266,25 +1269,16 @@ public class PartitionReplicaListener implements
ReplicaListener {
BinaryTuple exactKey = request.exactKey().asBinaryTuple();
- return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
- return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.IS) // Table IS lock
- .thenCompose(tblLock -> {
- return lockManager.acquire(txId, new LockKey(indexId,
exactKey.byteBuffer()), LockMode.S)
- .thenCompose(indRowLock -> { // Hash index
bucket S lock
-
- Cursor<RowId> cursor =
remotelyTriggeredResourceRegistry.<CursorResource>register(
- cursorId,
- txCoordinatorId,
- () -> new
CursorResource(indexStorage.get(exactKey))
- ).cursor();
+ return lockManager.acquire(txId, new LockKey(indexId,
exactKey.byteBuffer()), LockMode.S)
+ .thenCompose(indRowLock -> { // Hash index bucket S lock
+ Cursor<RowId> cursor =
remotelyTriggeredResourceRegistry.<CursorResource>register(cursorId,
txCoordinatorId,
+ () -> new
CursorResource(indexStorage.get(exactKey))).cursor();
- var result = new
ArrayList<BinaryRow>(batchCount);
+ var result = new ArrayList<BinaryRow>(batchCount);
- return continueIndexLookup(txId, cursor,
batchCount, result)
- .thenApply(ignore ->
closeCursorIfBatchNotFull(result, batchCount, cursorId));
- });
- });
- });
+ return continueIndexLookup(txId, cursor, batchCount,
result).thenApply(
+ ignore -> closeCursorIfBatchNotFull(result,
batchCount, cursorId));
+ });
}
/**
@@ -1314,61 +1308,56 @@ public class PartitionReplicaListener implements
ReplicaListener {
int flags = request.flags();
- return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
- return lockManager.acquire(txId, new LockKey(tableId()),
LockMode.IS) // Table IS lock
- .thenCompose(tblLock -> {
- var comparator = new
BinaryTupleComparator(indexStorage.indexDescriptor().columns());
+ var comparator = new
BinaryTupleComparator(indexStorage.indexDescriptor().columns());
- Predicate<IndexRow> isUpperBoundAchieved = indexRow ->
{
- if (indexRow == null) {
- return true;
- }
+ Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
+ if (indexRow == null) {
+ return true;
+ }
- if (upperBound == null) {
- return false;
- }
+ if (upperBound == null) {
+ return false;
+ }
- ByteBuffer buffer = upperBound.byteBuffer();
+ ByteBuffer buffer = upperBound.byteBuffer();
- if ((flags & SortedIndexStorage.LESS_OR_EQUAL) !=
0) {
- byte boundFlags = buffer.get(0);
+ if ((flags & SortedIndexStorage.LESS_OR_EQUAL) != 0) {
+ byte boundFlags = buffer.get(0);
- buffer.put(0, (byte) (boundFlags |
BinaryTupleCommon.EQUALITY_FLAG));
- }
+ buffer.put(0, (byte) (boundFlags |
BinaryTupleCommon.EQUALITY_FLAG));
+ }
- return
comparator.compare(indexRow.indexColumns().byteBuffer(), buffer) >= 0;
- };
+ return comparator.compare(indexRow.indexColumns().byteBuffer(),
buffer) >= 0;
+ };
- Cursor<IndexRow> cursor =
remotelyTriggeredResourceRegistry.<CursorResource>register(
- cursorId,
- request.coordinatorId(),
- () -> new CursorResource(indexStorage.scan(
- lowerBound,
- // We have to handle upperBound on a
level of replication listener,
- // for correctness of taking of a
range lock.
- null,
- flags
- ))
- ).cursor();
+ Cursor<IndexRow> cursor =
remotelyTriggeredResourceRegistry.<CursorResource>register(
+ cursorId,
+ request.coordinatorId(),
+ () -> new CursorResource(indexStorage.scan(
+ lowerBound,
+ // We have to handle upperBound on a level of
replication listener,
+ // for correctness of taking of a range lock.
+ null,
+ flags
+ ))
+ ).cursor();
- SortedIndexLocker indexLocker = (SortedIndexLocker)
indexesLockers.get().get(indexId);
+ SortedIndexLocker indexLocker = (SortedIndexLocker)
indexesLockers.get().get(indexId);
- int batchCount = request.batchSize();
+ int batchCount = request.batchSize();
- var result = new ArrayList<BinaryRow>(batchCount);
+ var result = new ArrayList<BinaryRow>(batchCount);
- return continueIndexScan(
- txId,
- schemaAwareIndexStorage,
- indexLocker,
- cursor,
- batchCount,
- result,
- isUpperBoundAchieved,
- tableVersionByTs(beginTimestamp(txId))
- ).thenApply(ignore ->
closeCursorIfBatchNotFull(result, batchCount, cursorId));
- });
- });
+ return continueIndexScan(
+ txId,
+ schemaAwareIndexStorage,
+ indexLocker,
+ cursor,
+ batchCount,
+ result,
+ isUpperBoundAchieved,
+ tableVersionByTs(beginTimestamp(txId))
+ ).thenApply(ignore -> closeCursorIfBatchNotFull(result, batchCount,
cursorId));
}
/**
@@ -1479,7 +1468,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId rowId = currentRow.rowId();
- return lockManager.acquire(txId, new LockKey(tableId(),
rowId), LockMode.S)
+ return lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
return resolvePlainReadResult(rowId,
txId).thenCompose(resolvedReadResult -> {
BinaryRow binaryRow =
upgrade(binaryRow(resolvedReadResult), tableVersion);
@@ -1530,7 +1519,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
RowId rowId = indexCursor.next();
- return lockManager.acquire(txId, new LockKey(tableId(), rowId),
LockMode.S)
+ return lockManager.acquire(txId, new LockKey(replicationGroupId,
rowId), LockMode.S)
.thenComposeAsync(rowLock -> { // Table row S lock
return resolvePlainReadResult(rowId,
txId).thenCompose(resolvedReadResult -> {
if (resolvedReadResult != null &&
resolvedReadResult.binaryRow() != null) {
@@ -2738,18 +2727,20 @@ public class PartitionReplicaListener implements
ReplicaListener {
);
if (!cmd.full()) {
- // We don't need to take the partition snapshots read lock,
see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowToUpdate(),
- true,
- null,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
+ if (!SKIP_UPDATES) {
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.rowToUpdate(),
+ true,
+ null,
+ null,
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
+ }
CompletableFuture<UUID> fut =
applyCmdWithExceptionHandling(cmd, new CompletableFuture<>())
.thenApply(res -> cmd.txId());
@@ -2770,7 +2761,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (updateCommandResult != null &&
updateCommandResult.isPrimaryInPeersAndLearners()) {
return safeTime.waitFor(((UpdateCommand)
res.getCommand()).safeTime()).thenApply(ignored -> null);
} else {
- if
(!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK))
{
+ if (!SKIP_UPDATES) {
// We don't need to take the partition snapshots
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdate(
cmd.txId(),
@@ -3333,8 +3324,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with tuple {@link RowId} and collection of
{@link Lock}.
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X))
+ return lockManager.acquire(txId, new LockKey(replicationGroupId),
LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.X))
.thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId,
txId))
.thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId,
shortTermLocks));
}
@@ -3347,7 +3338,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with tuple {@link RowId} and collection of
{@link Lock}.
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
// IX lock on table
+ return lockManager.acquire(txId, new LockKey(replicationGroupId),
LockMode.IX)
.thenCompose(ignored -> takePutLockOnIndexes(binaryRow, rowId,
txId))
.thenApply(shortTermLocks -> new IgniteBiTuple<>(rowId,
shortTermLocks));
}
@@ -3405,11 +3396,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with {@link RowId} or {@code null} if there is
no value for remove.
*/
private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow
expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
// IX lock on table
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
+ return lockManager.acquire(txId, new LockKey(replicationGroupId),
LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.S)) // S lock on RowId
.thenCompose(ignored -> {
if (equalValues(actualRow, expectedRow)) {
- return lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
+ return lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.X) // X lock on RowId
.thenCompose(ignored0 ->
takeRemoveLockOnIndexes(actualRow, rowId, txId))
.thenApply(exclusiveRowLock -> rowId);
}
@@ -3425,8 +3416,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with {@link RowId} or {@code null} if there is
no value for the key.
*/
private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow,
RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
// IX lock on table
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X)) // X lock on RowId
+ return lockManager.acquire(txId, new LockKey(replicationGroupId),
LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.X)) // X lock on RowId
.thenCompose(ignored -> takeRemoveLockOnIndexes(binaryRow,
rowId, txId))
.thenApply(ignored -> rowId);
}
@@ -3438,8 +3429,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Future completes with {@link RowId} or {@code null} if there is
no value for the key.
*/
private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS)
// IS lock on table
- .thenCompose(tblLock -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.S)) // S lock on RowId
+ return lockManager.acquire(txId, new LockKey(replicationGroupId,
rowId), LockMode.S) // S lock on RowId
.thenApply(ignored -> rowId);
}
@@ -3510,11 +3500,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>
takeLocksForReplace(BinaryRow expectedRow, @Nullable BinaryRow oldRow,
BinaryRow newRow, RowId rowId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX)
- .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.S))
+ return lockManager.acquire(txId, new LockKey(replicationGroupId),
LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.S))
.thenCompose(ignored -> {
if (oldRow != null && equalValues(oldRow, expectedRow)) {
- return lockManager.acquire(txId, new
LockKey(tableId(), rowId), LockMode.X) // X lock on RowId
+ return lockManager.acquire(txId, new
LockKey(replicationGroupId, rowId), LockMode.X) // X lock on RowId
.thenCompose(ignored1 ->
takePutLockOnIndexes(newRow, rowId, txId))
.thenApply(shortTermLocks -> new
IgniteBiTuple<>(rowId, shortTermLocks));
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
index 9569c04459..bae2005914 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
@@ -88,6 +88,6 @@ public class LockKey {
@Override
public String toString() {
- return S.toString(this);
+ return S.toString(LockKey.class, this, "ctx", contextId, "key", key);
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
index 6f3459b4d4..8e9a08e092 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxPriority.java
@@ -26,4 +26,17 @@ import
org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
public enum TxPriority {
LOW,
NORMAL;
+
+ /** Enum values. */
+ private static final TxPriority[] VALS = values();
+
+ /**
+ * Efficiently gets enumerated value from its ordinal.
+ *
+ * @param ord Ordinal value.
+ * @return Enumerated value.
+ */
+ public static TxPriority fromOrdinal(int ord) {
+ return VALS[ord];
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
index 4d8f3d529e..60ddf4dad1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.Collections.emptyList;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.tx.event.LockEvent.LOCK_CONFLICT;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR;
@@ -27,11 +30,12 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
@@ -41,7 +45,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.event.AbstractEventProducer;
-import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
@@ -54,7 +57,7 @@ import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.Waiter;
import org.apache.ignite.internal.tx.event.LockEvent;
import org.apache.ignite.internal.tx.event.LockEventParameters;
-import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.IgniteStripedReadWriteLock;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -76,6 +79,11 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
*/
public static final int SLOTS = 131072;
+ /**
+ * Striped lock concurrency.
+ */
+ private static final int CONCURRENCY = Math.max(1,
Runtime.getRuntime().availableProcessors() / 2);
+
/**
* Empty slots.
*/
@@ -104,28 +112,25 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
/**
* Enlisted transactions.
*/
- private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<LockState>>
txMap = new ConcurrentHashMap<>(1024);
+ private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<Releasable>>
txMap = new ConcurrentHashMap<>(1024);
/**
- * Parent lock manager.
- * TODO asch Needs optimization
https://issues.apache.org/jira/browse/IGNITE-20895
+ * Coarse locks.
*/
- private final LockManager parentLockManager;
-
- private final EventListener<LockEventParameters>
parentLockConflictListener = this::parentLockConflictListener;
+ private final ConcurrentHashMap<Object, CoarseLockState> coarseMap = new
ConcurrentHashMap<>();
/**
* Constructor.
*/
public HeapLockManager() {
- this(new WaitDieDeadlockPreventionPolicy(), SLOTS, SLOTS, new
HeapUnboundedLockManager());
+ this(new WaitDieDeadlockPreventionPolicy(), SLOTS, SLOTS);
}
/**
* Constructor.
*/
public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy) {
- this(deadlockPreventionPolicy, SLOTS, SLOTS, new
HeapUnboundedLockManager());
+ this(deadlockPreventionPolicy, SLOTS, SLOTS);
}
/**
@@ -135,12 +140,11 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
* @param maxSize Raw slots size.
* @param mapSize Lock map size.
*/
- public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy,
int maxSize, int mapSize, LockManager parentLockManager) {
+ public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy,
int maxSize, int mapSize) {
if (mapSize > maxSize) {
throw new IllegalArgumentException("maxSize=" + maxSize + " <
mapSize=" + mapSize);
}
- this.parentLockManager = Objects.requireNonNull(parentLockManager);
this.deadlockPreventionPolicy = deadlockPreventionPolicy;
this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0
?
CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(),
TimeUnit.MILLISECONDS)
@@ -158,14 +162,14 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
}
slots = tmp; // Atomic init.
-
- parentLockManager.listen(LockEvent.LOCK_CONFLICT,
parentLockConflictListener);
}
@Override
public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey,
LockMode lockMode) {
- if (lockKey.contextId() == null) { // Treat this lock as a hierarchy
lock.
- return parentLockManager.acquire(txId, lockKey, lockMode);
+ if (lockKey.contextId() == null) { // Treat this lock as a
hierarchy (coarse) lock.
+ CoarseLockState state = coarseMap.computeIfAbsent(lockKey, key ->
new CoarseLockState(lockKey));
+
+ return state.acquire(txId, lockKey, lockMode);
}
while (true) {
@@ -186,6 +190,14 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
@Override
@TestOnly
public void release(Lock lock) {
+ if (lock.lockKey().contextId() == null) {
+ CoarseLockState lockState2 = coarseMap.get(lock.lockKey());
+ if (lockState2 != null) {
+ lockState2.release(lock);
+ }
+ return;
+ }
+
LockState state = lockState(lock.lockKey());
if (state.tryRelease(lock.txId())) {
@@ -195,11 +207,8 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
@Override
public void release(UUID txId, LockKey lockKey, LockMode lockMode) {
- // TODO: Delegation to parentLockManager might change after
https://issues.apache.org/jira/browse/IGNITE-20895
- if (lockKey.contextId() == null) { // Treat this lock as a hierarchy
lock.
- parentLockManager.release(txId, lockKey, lockMode);
-
- return;
+ if (lockKey.contextId() == null) {
+ throw new IllegalArgumentException("Coarse locks don't support
downgrading");
}
LockState state = lockState(lockKey);
@@ -211,42 +220,48 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
@Override
public void releaseAll(UUID txId) {
- ConcurrentLinkedQueue<LockState> states = this.txMap.remove(txId);
+ ConcurrentLinkedQueue<Releasable> states = this.txMap.remove(txId);
if (states != null) {
- for (LockState state : states) {
+ // Initial capacity corresponds to the average number of entities used by a
transaction, estimated as 4.
+ List<Releasable> delayed = new ArrayList<>(4);
+ for (Releasable state : states) {
+ if (state.coarse()) {
+ delayed.add(state); // Delay release.
+ continue;
+ }
+
if (state.tryRelease(txId)) {
- LockKey key = state.key; // State may be already
invalidated.
+ LockKey key = state.key(); // State may be already
invalidated.
if (key != null) {
- locks.compute(key, (k, v) -> adjustLockState(state,
v));
+ locks.compute(key, (k, v) ->
adjustLockState((LockState) state, v));
}
}
}
- }
- parentLockManager.releaseAll(txId);
+ // Unlock coarse locks after all.
+ for (Releasable state : delayed) {
+ state.tryRelease(txId);
+ }
+ }
}
@Override
public Iterator<Lock> locks(UUID txId) {
- ConcurrentLinkedQueue<LockState> lockStates = txMap.get(txId);
-
- // TODO: Delegation to parentLockManager might change after
https://issues.apache.org/jira/browse/IGNITE-20895
- if (lockStates == null) {
- return parentLockManager.locks(txId);
- }
+ ConcurrentLinkedQueue<Releasable> lockStates = txMap.get(txId);
List<Lock> result = new ArrayList<>();
- for (LockState lockState : lockStates) {
- Waiter waiter = lockState.waiter(txId);
-
- if (waiter != null) {
- result.add(new Lock(lockState.key, waiter.lockMode(), txId));
+ if (lockStates != null) {
+ for (Releasable lockState : lockStates) {
+ Lock lock = lockState.lock(txId);
+ if (lock != null) {
+ result.add(lock);
+ }
}
}
- return CollectionUtils.concat(result.iterator(),
parentLockManager.locks(txId));
+ return result.iterator();
}
/**
@@ -302,11 +317,17 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
}
}
- return parentLockManager.isEmpty();
- }
+ for (CoarseLockState value : coarseMap.values()) {
+ if (!value.slockOwners.isEmpty()) {
+ return false;
+ }
- private CompletableFuture<Boolean>
parentLockConflictListener(LockEventParameters params) {
- return fireEvent(LockEvent.LOCK_CONFLICT, params).thenApply(v ->
false);
+ if (!value.ixlockOwners.isEmpty()) {
+ return false;
+ }
+ }
+
+ return true;
}
@Nullable
@@ -328,10 +349,375 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
}
}
+ private void track(UUID txId, Releasable val) {
+ txMap.compute(txId, (k, v) -> {
+ if (v == null) {
+ v = new ConcurrentLinkedQueue<>();
+ }
+
+ v.add(val);
+
+ return v;
+ });
+ }
+
+ /**
+ * Create lock exception with given parameters.
+ *
+ * @param locker Locker.
+ * @param holder Lock holder.
+ * @return Lock exception.
+ */
+ private static LockException lockException(UUID locker, UUID holder) {
+ return new LockException(ACQUIRE_LOCK_ERR,
+ "Failed to acquire a lock due to a possible deadlock [locker="
+ locker + ", holder=" + holder + ']');
+ }
+
+ /**
+ * Create lock exception when lock holder is believed to be missing.
+ *
+ * @param locker Locker.
+ * @param holder Lock holder.
+ * @return Lock exception.
+ */
+ private static LockException abandonedLockException(UUID locker, UUID
holder) {
+ return new LockException(ACQUIRE_LOCK_ERR,
+ "Failed to acquire an abandoned lock due to a possible
deadlock [locker=" + locker + ", holder=" + holder + ']');
+ }
+
+ /**
+ * Create coarse lock exception.
+ *
+ * @param locker Locker.
+ * @param holder Lock holder.
+ * @param abandoned {@code True} if the lock holder is believed to be abandoned.
+ * @return Lock exception.
+ */
+ private static LockException coarseLockException(UUID locker, UUID holder,
boolean abandoned) {
+ return new LockException(ACQUIRE_LOCK_ERR,
+ "Failed to acquire the intention table lock due to a conflict
[locker=" + locker + ", holder=" + holder + ", abandoned="
+ + abandoned + ']');
+ }
+
+ /**
+ * Common interface for releasing transaction locks.
+ */
+ interface Releasable {
+ /**
+ * Tries to release a lock.
+ *
+ * @param txId Tx id.
+ * @return {@code True} if lock state requires cleanup after release.
+ */
+ boolean tryRelease(UUID txId);
+
+ /**
+ * Gets associated lock key.
+ *
+ * @return Lock key.
+ */
+ LockKey key();
+
+ /**
+ * Returns the lock which is requested by given tx.
+ *
+ * @param txId Tx id.
+ * @return The lock or {@code null} if no lock exists.
+ */
+ @Nullable Lock lock(UUID txId);
+
+ /**
+ * Returns whether this is a coarse lock.
+ *
+ * @return {@code True} if the lock is coarse, {@code false} otherwise.
+ */
+ boolean coarse();
+ }
+
+ /**
+ * Coarse lock.
+ */
+ public class CoarseLockState implements Releasable {
+ private final IgniteStripedReadWriteLock stripedLock = new
IgniteStripedReadWriteLock(CONCURRENCY);
+ private final ConcurrentHashMap<UUID, Lock> ixlockOwners = new
ConcurrentHashMap<>();
+ private final Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>>
slockWaiters = new HashMap<>();
+ private final ConcurrentHashMap<UUID, Lock> slockOwners = new
ConcurrentHashMap<>();
+ private final LockKey lockKey;
+ private final Comparator<UUID> txComparator;
+
+ CoarseLockState(LockKey lockKey) {
+ this.lockKey = lockKey;
+ txComparator =
+ deadlockPreventionPolicy.txIdComparator() != null ?
deadlockPreventionPolicy.txIdComparator() : UUID::compareTo;
+ }
+
+ @Override
+ public boolean tryRelease(UUID txId) {
+ Lock lock = lock(txId);
+
+ release(lock);
+
+ return false;
+ }
+
+ @Override
+ public LockKey key() {
+ return lockKey;
+ }
+
+ @Override
+ public Lock lock(UUID txId) {
+ Lock lock = ixlockOwners.get(txId);
+
+ if (lock != null) {
+ return lock;
+ }
+
+ int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY);
+
+ stripedLock.readLock(idx).lock();
+
+ try {
+ lock = slockOwners.get(txId);
+
+ if (lock != null) {
+ return lock;
+ }
+
+ IgniteBiTuple<Lock, CompletableFuture<Lock>> tuple =
slockWaiters.get(txId);
+
+ if (tuple != null) {
+ return tuple.get1();
+ }
+ } finally {
+ stripedLock.readLock(idx).unlock();
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean coarse() {
+ return true;
+ }
+
+ /**
+ * Acquires a lock.
+ *
+ * @param txId Tx id.
+ * @param lockKey Lock key.
+ * @param lockMode Lock mode.
+ * @return The future.
+ */
+ public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey,
LockMode lockMode) {
+ switch (lockMode) {
+ case S:
+ stripedLock.writeLock().lock();
+
+ try {
+ // IX-locks can't be modified under the striped write
lock.
+ if (!ixlockOwners.isEmpty()) {
+ if (ixlockOwners.containsKey(txId)) {
+ if (ixlockOwners.size() == 1) {
+ // Safe to upgrade.
+ track(txId, this); // Double track.
+ Lock lock = new Lock(lockKey, lockMode,
txId);
+ slockOwners.putIfAbsent(txId, lock);
+ return completedFuture(lock);
+ } else {
+ // Attempt to upgrade to SIX in the
presence of concurrent transactions. Deny lock attempt.
+ for (Lock lock : ixlockOwners.values()) {
+ if (!lock.txId().equals(txId)) {
+ return notifyAndFail(txId,
lock.txId());
+ }
+ }
+ }
+
+ assert false : "Should not reach here";
+ }
+
+ // Validate reordering with IX locks if prevention
is enabled.
+ if (deadlockPreventionPolicy.usePriority()) {
+ for (Lock lock : ixlockOwners.values()) {
+ // Allow only high priority transactions
to wait.
+ if (txComparator.compare(lock.txId(),
txId) < 0) {
+ return notifyAndFail(txId,
lock.txId());
+ }
+ }
+ }
+
+ track(txId, this);
+
+ CompletableFuture<Lock> fut = new
CompletableFuture<>();
+ IgniteBiTuple<Lock, CompletableFuture<Lock>> prev
= slockWaiters.putIfAbsent(txId,
+ new IgniteBiTuple<>(new Lock(lockKey,
lockMode, txId), fut));
+ return prev == null ? fut : prev.get2();
+ } else {
+ Lock lock = new Lock(lockKey, lockMode, txId);
+ Lock prev = slockOwners.putIfAbsent(txId, lock);
+
+ if (prev == null) {
+ track(txId, this); // Do not track on reenter.
+ }
+
+ return completedFuture(lock);
+ }
+ } finally {
+ stripedLock.writeLock().unlock();
+ }
+
+ case IX:
+ int idx = Math.floorMod(spread(txId.hashCode()),
CONCURRENCY);
+
+ stripedLock.readLock(idx).lock();
+
+ try {
+ // S-locks can't be modified under the striped read
lock.
+ if (!slockOwners.isEmpty()) {
+ if (slockOwners.containsKey(txId)) {
+ if (slockOwners.size() == 1) {
+ // Safe to upgrade.
+ track(txId, this); // Double track.
+ Lock lock = new Lock(lockKey, lockMode,
txId);
+ ixlockOwners.putIfAbsent(txId, lock);
+ return completedFuture(lock);
+ } else {
+ // Attempt to upgrade to SIX in the
presence of concurrent transactions. Deny lock attempt.
+ for (Lock lock : slockOwners.values()) {
+ if (!lock.txId().equals(txId)) {
+ return notifyAndFail(txId,
lock.txId());
+ }
+ }
+ }
+
+ assert false : "Should not reach here";
+ }
+
+ // IX locks are never allowed to wait.
+ UUID holderTx =
slockOwners.keySet().iterator().next();
+ return notifyAndFail(txId, holderTx);
+ } else {
+ Lock lock = new Lock(lockKey, lockMode, txId);
+ Lock prev = ixlockOwners.putIfAbsent(txId, lock);
// Avoid overwrite existing lock.
+
+ if (prev == null) {
+ track(txId, this); // Do not track on reenter.
+ }
+
+ return completedFuture(lock);
+ }
+ } finally {
+ stripedLock.readLock(idx).unlock();
+ }
+
+ default:
+ assert false : "Unsupported coarse lock mode: " + lockMode;
+
+ return null; // Should not be here.
+ }
+ }
+
+ /**
+ * Triggers event and fails.
+ *
+ * @param txId Tx id.
+ * @param holderId Holder tx id.
+ * @return Failed future.
+ */
+ CompletableFuture<Lock> notifyAndFail(UUID txId, UUID holderId) {
+ CompletableFuture<Void> res = fireEvent(LOCK_CONFLICT, new
LockEventParameters(txId, holderId));
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
+ return failedFuture(coarseLockException(txId, holderId,
res.isCompletedExceptionally()));
+ }
+
+ /**
+ * Releases the lock. Should be called from {@link #releaseAll(UUID)}.
+ *
+ * @param lock The lock.
+ */
+ public void release(@Nullable Lock lock) {
+ if (lock == null) {
+ return;
+ }
+
+ switch (lock.lockMode()) {
+ case S:
+ IgniteBiTuple<Lock, CompletableFuture<Lock>> waiter = null;
+
+ stripedLock.writeLock().lock();
+
+ try {
+ Lock removed = slockOwners.remove(lock.txId());
+
+ if (removed == null) {
+ waiter = slockWaiters.remove(lock.txId());
+
+ if (waiter != null) {
+ removed = waiter.get1();
+ }
+ }
+
+ assert removed != null : "Attempt to release not
requested lock: " + lock.txId();
+ } finally {
+ stripedLock.writeLock().unlock();
+ }
+
+ if (waiter != null) {
+ waiter.get2().complete(waiter.get1());
+ }
+
+ break;
+ case IX:
+ int idx = Math.floorMod(spread(lock.txId().hashCode()),
CONCURRENCY);
+
+ Map<UUID, IgniteBiTuple<Lock, CompletableFuture<Lock>>>
wakeups;
+
+ stripedLock.readLock(idx).lock();
+
+ try {
+ var removed = ixlockOwners.remove(lock.txId());
+
+ assert removed != null : "Attempt to release not
acquired lock: " + lock.txId();
+
+ if (slockWaiters.isEmpty()) {
+ return; // Nothing to do.
+ }
+
+ if (!ixlockOwners.isEmpty()) {
+ assert slockOwners.isEmpty() ||
slockOwners.containsKey(lock.txId());
+
+ return; // Nothing to do.
+ }
+
+ // No race here because no new locks can be acquired
after releaseAll due to 2-phase locking protocol.
+
+ // Promote waiters to owners.
+ wakeups = new HashMap<>(slockWaiters);
+
+ slockWaiters.clear();
+
+ for (IgniteBiTuple<Lock, CompletableFuture<Lock>>
value : wakeups.values()) {
+ slockOwners.put(value.getKey().txId(),
value.getKey());
+ }
+ } finally {
+ stripedLock.readLock(idx).unlock();
+ }
+
+ for (Entry<UUID, IgniteBiTuple<Lock,
CompletableFuture<Lock>>> entry : wakeups.entrySet()) {
+
entry.getValue().get2().complete(entry.getValue().get1());
+ }
+
+ break;
+ default:
+ assert false : "Unsupported coarse unlock mode: " +
lock.lockMode();
+ }
+ }
+ }
+
/**
- * A lock state.
+ * Key lock.
*/
- public class LockState {
+ public class LockState implements Releasable {
/** Waiters. */
private final TreeMap<UUID, WaiterImpl> waiters;
@@ -348,6 +734,27 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
this.waiters = new TreeMap<>(txComparator);
}
+ @Override
+ public LockKey key() {
+ return key;
+ }
+
+ @Override
+ public Lock lock(UUID txId) {
+ Waiter waiter = waiters.get(txId);
+
+ if (waiter != null) {
+ return new Lock(key, waiter.lockMode(), txId);
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean coarse() {
+ return false;
+ }
+
/**
* Attempts to acquire a lock for the specified {@code key} in
specified lock mode.
*
@@ -393,7 +800,7 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
// Put to wait queue, track.
if (prev == null) {
- track(waiter.txId);
+ track(waiter.txId, this);
}
return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
@@ -406,7 +813,7 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
} else {
// Lock granted, track.
if (prev == null) {
- track(waiter.txId);
+ track(waiter.txId, this);
}
}
}
@@ -437,15 +844,15 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean
skipFail) {
for (Map.Entry<UUID, WaiterImpl> entry :
waiters.tailMap(waiter.txId(), false).entrySet()) {
WaiterImpl tmp = entry.getValue();
- LockMode mode = lockedMode(tmp);
+ LockMode mode = tmp.lockMode;
if (mode != null &&
!mode.isCompatible(waiter.intendedLockMode())) {
if (conflictFound(waiter.txId(), tmp.txId())) {
- waiter.fail(abandonedLockException(waiter, tmp));
+ waiter.fail(abandonedLockException(waiter.txId,
tmp.txId));
return true;
} else if (!deadlockPreventionPolicy.usePriority() &&
deadlockPreventionPolicy.waitTimeout() == 0) {
- waiter.fail(lockException(waiter, tmp));
+ waiter.fail(lockException(waiter.txId, tmp.txId));
return true;
}
@@ -456,17 +863,17 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
for (Map.Entry<UUID, WaiterImpl> entry :
waiters.headMap(waiter.txId()).entrySet()) {
WaiterImpl tmp = entry.getValue();
- LockMode mode = lockedMode(tmp);
+ LockMode mode = tmp.lockMode;
if (mode != null &&
!mode.isCompatible(waiter.intendedLockMode())) {
if (skipFail) {
return false;
} else if (conflictFound(waiter.txId(), tmp.txId())) {
- waiter.fail(abandonedLockException(waiter, tmp));
+ waiter.fail(abandonedLockException(waiter.txId,
tmp.txId));
return true;
} else if (deadlockPreventionPolicy.waitTimeout() == 0) {
- waiter.fail(lockException(waiter, tmp));
+ waiter.fail(lockException(waiter.txId, tmp.txId));
return true;
} else {
@@ -480,37 +887,14 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
return true;
}
- /**
- * Create lock exception with given parameters.
- *
- * @param locker Locker.
- * @param holder Lock holder.
- * @return Lock exception.
- */
- private LockException lockException(WaiterImpl locker, WaiterImpl
holder) {
- return new LockException(ACQUIRE_LOCK_ERR,
- "Failed to acquire a lock due to a possible deadlock
[locker=" + locker + ", holder=" + holder + ']');
- }
-
- /**
- * Create lock exception when lock holder is believed to be missing.
- *
- * @param locker Locker.
- * @param holder Lock holder.
- * @return Lock exception.
- */
- private LockException abandonedLockException(WaiterImpl locker,
WaiterImpl holder) {
- return new LockException(ACQUIRE_LOCK_ERR,
- "Failed to acquire an abandoned lock due to a possible
deadlock [locker=" + locker + ", holder=" + holder + ']');
- }
-
/**
* Attempts to release a lock for the specified {@code key} in
exclusive mode.
*
* @param txId Transaction id.
* @return {@code True} if the queue is empty.
*/
- boolean tryRelease(UUID txId) {
+ @Override
+ public boolean tryRelease(UUID txId) {
Collection<WaiterImpl> toNotify;
synchronized (waiters) {
@@ -639,22 +1023,6 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
});
}
- /**
- * Gets a lock mode for this waiter.
- *
- * @param waiter Waiter.
- * @return Lock mode, which is held by the waiter or {@code null}, if
the waiter holds nothing.
- */
- private LockMode lockedMode(WaiterImpl waiter) {
- LockMode mode = null;
-
- if (waiter.locked()) {
- mode = waiter.lockMode();
- }
-
- return mode;
- }
-
/**
* Returns a collection of timestamps that is associated with the
specified {@code key}.
*
@@ -678,18 +1046,6 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
}
}
- private void track(UUID txId) {
- txMap.compute(txId, (k, v) -> {
- if (v == null) {
- v = new ConcurrentLinkedQueue<>();
- }
-
- v.add(this);
-
- return v;
- });
- }
-
/**
* Notifies about the lock conflict found between transactions.
*
@@ -697,7 +1053,7 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
* @param holderTx Transaction which holds the lock.
*/
private boolean conflictFound(UUID acquirerTx, UUID holderTx) {
- CompletableFuture<Void> eventResult = fireEvent(LockEvent.LOCK_CONFLICT, new LockEventParameters(acquirerTx, holderTx));
+ CompletableFuture<Void> eventResult = fireEvent(LOCK_CONFLICT, new LockEventParameters(acquirerTx, holderTx));
// No async handling is expected.
// TODO: https://issues.apache.org/jira/browse/IGNITE-21153
assert eventResult.isDone() : "Async lock conflict handling is not supported";
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java
deleted file mode 100644
index 0c05ecb27f..0000000000
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java
+++ /dev/null
@@ -1,775 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx.impl;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.internal.event.AbstractEventProducer;
-import org.apache.ignite.internal.lang.IgniteBiTuple;
-import org.apache.ignite.internal.tostring.IgniteToStringExclude;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
-import org.apache.ignite.internal.tx.Lock;
-import org.apache.ignite.internal.tx.LockException;
-import org.apache.ignite.internal.tx.LockKey;
-import org.apache.ignite.internal.tx.LockManager;
-import org.apache.ignite.internal.tx.LockMode;
-import org.apache.ignite.internal.tx.Waiter;
-import org.apache.ignite.internal.tx.event.LockEvent;
-import org.apache.ignite.internal.tx.event.LockEventParameters;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * A {@link LockManager} which uses unbounded hashtable implementation.
Suitable for holding coarse-grained locks.
- */
-public class HeapUnboundedLockManager extends AbstractEventProducer<LockEvent,
LockEventParameters> implements LockManager {
- /** Locks. */
- private final ConcurrentHashMap<LockKey, LockState> locks = new
ConcurrentHashMap<>();
-
- /** Prevention policy. */
- private final DeadlockPreventionPolicy deadlockPreventionPolicy;
-
- /** Executor that is used to fail waiters after timeout. */
- private final Executor delayedExecutor;
-
- /**
- * Constructor.
- */
- public HeapUnboundedLockManager() {
- this(new WaitDieDeadlockPreventionPolicy());
- }
-
- /**
- * Constructor.
- *
- * @param deadlockPreventionPolicy Deadlock prevention policy.
- */
- public HeapUnboundedLockManager(DeadlockPreventionPolicy
deadlockPreventionPolicy) {
- this.deadlockPreventionPolicy = deadlockPreventionPolicy;
- this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0
- ?
CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(),
TimeUnit.MILLISECONDS)
- : null;
- }
-
- @Override
- public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey,
LockMode lockMode) {
- while (true) {
- LockState state = lockState(lockKey);
-
- IgniteBiTuple<CompletableFuture<Void>, LockMode> futureTuple =
state.tryAcquire(txId, lockMode);
-
- if (futureTuple.get1() == null) {
- continue; // Obsolete state.
- }
-
- LockMode newLockMode = futureTuple.get2();
-
- return futureTuple.get1().thenApply(res -> new Lock(lockKey,
newLockMode, txId));
- }
- }
-
- @Override
- public void release(Lock lock) {
- LockState state = lockState(lock.lockKey());
-
- if (state.tryRelease(lock.txId())) {
- locks.remove(lock.lockKey(), state);
- }
- }
-
- @Override
- public void release(UUID txId, LockKey lockKey, LockMode lockMode) {
- LockState state = lockState(lockKey);
-
- if (state.tryRelease(txId, lockMode)) {
- locks.remove(lockKey, state);
- }
- }
-
- @Override
- public void releaseAll(UUID txId) {
- Iterator<Lock> locks = locks(txId);
-
- while (locks.hasNext()) {
- Lock lock = locks.next();
- release(lock);
- }
- }
-
- @Override
- public Iterator<Lock> locks(UUID txId) {
- // Decently ok to do full scan here because coarse-grained locks are a
few in quantity.
- List<Lock> result = new ArrayList<>();
-
- for (Map.Entry<LockKey, LockState> entry : locks.entrySet()) {
- Waiter waiter = entry.getValue().waiter(txId);
-
- if (waiter != null) {
- result.add(
- new Lock(
- entry.getKey(),
- waiter.lockMode(),
- txId
- )
- );
- }
- }
-
- return result.iterator();
- }
-
- /**
- * Returns the lock state for the key.
- *
- * @param key The key.
- */
- private LockState lockState(LockKey key) {
- return locks.computeIfAbsent(key, k -> new
LockState(deadlockPreventionPolicy, delayedExecutor));
- }
-
- /** {@inheritDoc} */
- @Override
- public Collection<UUID> queue(LockKey key) {
- return lockState(key).queue();
- }
-
- /** {@inheritDoc} */
- @Override
- public Waiter waiter(LockKey key, UUID txId) {
- return lockState(key).waiter(txId);
- }
-
- /**
- * A lock state.
- */
- private class LockState {
- /** Waiters. */
- private final TreeMap<UUID, WaiterImpl> waiters;
-
- private final DeadlockPreventionPolicy deadlockPreventionPolicy;
-
- /** Delayed executor for waiters timeout callback. */
- private final Executor delayedExecutor;
-
- /** Marked for removal flag. */
- private volatile boolean markedForRemove = false;
-
- public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy,
Executor delayedExecutor) {
- Comparator<UUID> txComparator =
- deadlockPreventionPolicy.txIdComparator() != null ?
deadlockPreventionPolicy.txIdComparator() : UUID::compareTo;
-
- this.waiters = new TreeMap<>(txComparator);
- this.deadlockPreventionPolicy = deadlockPreventionPolicy;
- this.delayedExecutor = delayedExecutor;
- }
-
- /**
- * Attempts to acquire a lock for the specified {@code key} in
specified lock mode.
- *
- * @param txId Transaction id.
- * @param lockMode Lock mode.
- * @return The future or null if state is marked for removal and
acquired lock mode.
- */
- public @Nullable IgniteBiTuple<CompletableFuture<Void>, LockMode>
tryAcquire(UUID txId, LockMode lockMode) {
- WaiterImpl waiter = new WaiterImpl(txId, lockMode);
-
- synchronized (waiters) {
- if (markedForRemove) {
- return new IgniteBiTuple(null, lockMode);
- }
-
- // We always replace the previous waiter with the new one. If
the previous waiter has lock intention then incomplete
- // lock future is copied to the new waiter. This guarantees
that, if the previous waiter was locked concurrently, then
- // it doesn't have any lock intentions, and the future is not
copied to the new waiter. Otherwise, if there is lock
- // intention, this means that the lock future contained in
previous waiter, is not going to be completed and can be
- // copied safely.
- WaiterImpl prev = waiters.put(txId, waiter);
-
- // Reenter
- if (prev != null) {
- if (prev.locked() &&
prev.lockMode().allowReenter(lockMode)) {
- waiter.lock();
-
- waiter.upgrade(prev);
-
- return new IgniteBiTuple(completedFuture(null),
prev.lockMode());
- } else {
- waiter.upgrade(prev);
-
- assert prev.lockMode() == waiter.lockMode() :
- "Lock modes are incorrect [prev=" +
prev.lockMode() + ", new=" + waiter.lockMode() + ']';
- }
- }
-
- if (!isWaiterReadyToNotify(waiter, false)) {
- if (deadlockPreventionPolicy.waitTimeout() > 0) {
- setWaiterTimeout(waiter);
- }
-
- return new IgniteBiTuple<>(waiter.fut, waiter.lockMode());
- }
-
- if (!waiter.locked()) {
- waiters.remove(waiter.txId());
- } else if (waiter.hasLockIntent()) {
- waiter.refuseIntent(); // Restore old lock.
- }
- }
-
- // Notify outside the monitor.
- waiter.notifyLocked();
-
- return new IgniteBiTuple(waiter.fut, waiter.lockMode());
- }
-
- /**
- * Checks current waiter. It can change the internal state of the
waiter.
- *
- * @param waiter Checked waiter.
- * @return True if current waiter ready to notify, false otherwise.
- */
- private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean
skipFail) {
- for (Map.Entry<UUID, WaiterImpl> entry :
waiters.tailMap(waiter.txId(), false).entrySet()) {
- WaiterImpl tmp = entry.getValue();
- LockMode mode = lockedMode(tmp);
-
- if (mode != null &&
!mode.isCompatible(waiter.intendedLockMode())) {
- if (conflictFound(waiter.txId(), tmp.txId())) {
- waiter.fail(abandonedLockException(waiter, tmp));
-
- return true;
- } else if (!deadlockPreventionPolicy.usePriority() &&
deadlockPreventionPolicy.waitTimeout() == 0) {
- waiter.fail(lockException(waiter, tmp));
-
- return true;
- }
-
- return false;
- }
- }
-
- for (Map.Entry<UUID, WaiterImpl> entry :
waiters.headMap(waiter.txId()).entrySet()) {
- WaiterImpl tmp = entry.getValue();
- LockMode mode = lockedMode(tmp);
-
- if (mode != null &&
!mode.isCompatible(waiter.intendedLockMode())) {
- if (skipFail) {
- return false;
- } else if (conflictFound(waiter.txId(), tmp.txId())) {
- waiter.fail(abandonedLockException(waiter, tmp));
-
- return true;
- } else if (deadlockPreventionPolicy.waitTimeout() == 0) {
- waiter.fail(lockException(waiter, tmp));
-
- return true;
- } else {
- return false;
- }
- }
- }
-
- waiter.lock();
-
- return true;
- }
-
- /**
- * Create lock exception with given parameters.
- *
- * @param locker Locker.
- * @param holder Lock holder.
- * @return Lock exception.
- */
- private LockException lockException(Waiter locker, Waiter holder) {
- return new LockException(ACQUIRE_LOCK_ERR,
- "Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']');
- }
-
- /**
- * Create lock exception when lock holder is believed to be missing.
- *
- * @param locker Locker.
- * @param holder Lock holder.
- * @return Lock exception.
- */
- private LockException abandonedLockException(WaiterImpl locker, WaiterImpl holder) {
- return new LockException(ACQUIRE_LOCK_ERR,
- "Failed to acquire an abandoned lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']');
- }
-
- /**
- * Attempts to release a lock for the specified {@code key} in
exclusive mode.
- *
- * @param txId Transaction id.
- * @return {@code True} if the queue is empty.
- */
- public boolean tryRelease(UUID txId) {
- Collection<WaiterImpl> toNotify;
-
- synchronized (waiters) {
- toNotify = release(txId);
- }
-
- // Notify outside the monitor.
- for (WaiterImpl waiter : toNotify) {
- waiter.notifyLocked();
- }
-
- return markedForRemove;
- }
-
- /**
- * Releases a specific lock of the key.
- *
- * @param txId Transaction id.
- * @param lockMode Lock mode.
- * @return If the value is true, no one waits of any lock of the key,
false otherwise.
- */
- public boolean tryRelease(UUID txId, LockMode lockMode) {
- List<WaiterImpl> toNotify = Collections.emptyList();
- synchronized (waiters) {
- WaiterImpl waiter = waiters.get(txId);
-
- if (waiter != null) {
- assert lockMode.supremum(lockMode, waiter.lockMode()) ==
waiter.lockMode() :
- "The lock mode is not locked [mode=" + lockMode +
", locked=" + waiter.lockMode() + ']';
-
- LockMode modeFromDowngrade =
waiter.recalculateMode(lockMode);
-
- if (!waiter.locked() && !waiter.hasLockIntent()) {
- toNotify = release(txId);
- } else if (modeFromDowngrade != waiter.lockMode()) {
- toNotify = unlockCompatibleWaiters();
- }
- }
- }
-
- // Notify outside the monitor.
- for (WaiterImpl waiter : toNotify) {
- waiter.notifyLocked();
- }
-
- return markedForRemove;
- }
-
- /**
- * Releases all locks are held by a specific transaction.
- * This method should be invoked synchronously.
- *
- * @param txId Transaction id.
- * @return List of waiters to notify.
- */
- private List<WaiterImpl> release(UUID txId) {
- waiters.remove(txId);
-
- if (waiters.isEmpty()) {
- markedForRemove = true;
-
- return Collections.emptyList();
- }
-
- List<WaiterImpl> toNotify = unlockCompatibleWaiters();
-
- return toNotify;
- }
-
- /**
- * Unlock compatible waiters.
- *
- * @return List of waiters to notify.
- */
- private List<WaiterImpl> unlockCompatibleWaiters() {
- if (!deadlockPreventionPolicy.usePriority() &&
deadlockPreventionPolicy.waitTimeout() == 0) {
- return Collections.emptyList();
- }
-
- ArrayList<WaiterImpl> toNotify = new ArrayList<>();
- Set<UUID> toFail = new HashSet<>();
-
- for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
- WaiterImpl tmp = entry.getValue();
-
- if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) {
- assert !tmp.hasLockIntent() : "This waiter in not locked for notification [waiter=" + tmp + ']';
-
- toNotify.add(tmp);
- }
- }
-
- if (deadlockPreventionPolicy.usePriority() &&
deadlockPreventionPolicy.waitTimeout() >= 0) {
- for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
- WaiterImpl tmp = entry.getValue();
-
- if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) {
- assert tmp.hasLockIntent() : "Only failed waiter can be notified here [waiter=" + tmp + ']';
-
- toNotify.add(tmp);
- toFail.add(tmp.txId());
- }
- }
-
- for (UUID failTx : toFail) {
- var w = waiters.get(failTx);
-
- if (w.locked()) {
- w.refuseIntent();
- } else {
- waiters.remove(failTx);
- }
- }
- }
-
- return toNotify;
- }
-
- /**
- * Makes the waiter fail after specified timeout (in milliseconds), if
intended lock was not acquired within this timeout.
- *
- * @param waiter Waiter.
- */
- private void setWaiterTimeout(WaiterImpl waiter) {
- delayedExecutor.execute(() -> {
- if (!waiter.fut.isDone()) {
- waiter.fut.completeExceptionally(new LockException(ACQUIRE_LOCK_TIMEOUT_ERR, "Failed to acquire a lock due to "
- + "timeout [txId=" + waiter.txId() + ", waiter=" + waiter
- + ", timeout=" + deadlockPreventionPolicy.waitTimeout() + ']'));
- }
- });
- }
-
- /**
- * Gets a lock mode for this waiter.
- *
- * @param waiter Waiter.
- * @return Lock mode, which is held by the waiter or {@code null}, if
the waiter holds nothing.
- */
- private LockMode lockedMode(WaiterImpl waiter) {
- LockMode mode = null;
-
- if (waiter.locked()) {
- mode = waiter.lockMode();
- }
-
- return mode;
- }
-
- /**
- * Returns a collection of timestamps that is associated with the
specified {@code key}.
- *
- * @return The waiters queue.
- */
- public Collection<UUID> queue() {
- synchronized (waiters) {
- return new ArrayList<>(waiters.keySet());
- }
- }
-
- /**
- * Returns a waiter for the specified {@code key}.
- *
- * @param txId Transaction id.
- * @return The waiter.
- */
- public Waiter waiter(UUID txId) {
- synchronized (waiters) {
- return waiters.get(txId);
- }
- }
-
- /**
- * Notifies about the lock conflict found between transactions.
- *
- * @param acquirerTx Transaction which tries to acquire the lock.
- * @param holderTx Transaction which holds the lock.
- */
- private boolean conflictFound(UUID acquirerTx, UUID holderTx) {
- CompletableFuture<Void> eventResult = fireEvent(LockEvent.LOCK_CONFLICT, new LockEventParameters(acquirerTx, holderTx));
- // No async handling is expected.
- // TODO: https://issues.apache.org/jira/browse/IGNITE-21153
- assert eventResult.isDone() : "Async lock conflict handling is not supported";
-
- return eventResult.isCompletedExceptionally();
- }
- }
-
- /**
- * A waiter implementation.
- */
- private static class WaiterImpl implements Comparable<WaiterImpl>, Waiter {
- /**
- * Holding locks by type.
- * TODO: IGNITE-18350 Abandon the collection in favor of BitSet.
- */
- private final Map<LockMode, Integer> locks = new HashMap<>();
-
- /**
- * Lock modes are marked as intended, but have not taken yet. This is
NOT specific to intention lock modes, such as IS and IX.
- * TODO: IGNITE-18350 Abandon the collection in favor of BitSet.
- */
- private final Set<LockMode> intendedLocks = new HashSet<>();
-
- /** Locked future. */
- @IgniteToStringExclude
- private CompletableFuture<Void> fut;
-
- /** Waiter transaction id. */
- private final UUID txId;
-
- /** The lock mode to intend to hold. This is NOT specific to intention
lock modes, such as IS and IX. */
- private LockMode intendedLockMode;
-
- /** The lock mode. */
- private LockMode lockMode;
-
- /**
- * The filed has a value when the waiter couldn't lock a key.
- */
- private LockException ex;
-
- /**
- * The constructor.
- *
- * @param txId Transaction id.
- * @param lockMode Lock mode.
- */
- WaiterImpl(UUID txId, LockMode lockMode) {
- this.fut = new CompletableFuture<>();
- this.txId = txId;
- this.intendedLockMode = lockMode;
-
- locks.put(lockMode, 1);
- intendedLocks.add(lockMode);
- }
-
- /**
- * Adds a lock mode.
- *
- * @param lockMode Lock mode.
- * @param increment Value to increment amount.
- */
- void addLock(LockMode lockMode, int increment) {
- locks.merge(lockMode, increment, Integer::sum);
- }
-
- /**
- * Removes a lock mode.
- *
- * @param lockMode Lock mode.
- * @return True if the lock mode was removed, false otherwise.
- */
- private boolean removeLock(LockMode lockMode) {
- Integer counter = locks.get(lockMode);
-
- if (counter == null || counter < 2) {
- locks.remove(lockMode);
-
- return true;
- } else {
- locks.put(lockMode, counter - 1);
-
- return false;
- }
- }
-
- /**
- * Recalculates lock mode based of all locks which the waiter has
taken.
- *
- * @param modeToRemove Mode without which, the recalculation will
happen.
- * @return Previous lock mode.
- */
- LockMode recalculateMode(LockMode modeToRemove) {
- if (!removeLock(modeToRemove)) {
- return lockMode;
- }
-
- return recalculate();
- }
-
- /**
- * Recalculates lock supremums.
- *
- * @return Previous lock mode.
- */
- private LockMode recalculate() {
- LockMode newIntendedLockMode = null;
- LockMode newLockMode = null;
-
- for (LockMode mode : locks.keySet()) {
- assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" +
txId + ", mode=" + mode + "]";
-
- if (intendedLocks.contains(mode)) {
- newIntendedLockMode = newIntendedLockMode == null ? mode :
LockMode.supremum(newIntendedLockMode, mode);
- } else {
- newLockMode = newLockMode == null ? mode :
LockMode.supremum(newLockMode, mode);
- }
- }
-
- LockMode mode = lockMode;
-
- lockMode = newLockMode;
- intendedLockMode = newLockMode != null && newIntendedLockMode !=
null ? LockMode.supremum(newLockMode, newIntendedLockMode)
- : newIntendedLockMode;
-
- return mode;
- }
-
- /**
- * Merge all locks that were held by another waiter to the current one.
- *
- * @param other Other waiter.
- */
- void upgrade(WaiterImpl other) {
- intendedLocks.addAll(other.intendedLocks);
-
- other.locks.entrySet().forEach(entry -> addLock(entry.getKey(),
entry.getValue()));
-
- recalculate();
-
- if (other.hasLockIntent()) {
- fut = other.fut;
- }
- }
-
- /**
- * Removes all locks that were intended to hold.
- */
- void refuseIntent() {
- for (LockMode mode : intendedLocks) {
- locks.remove(mode);
- }
-
- intendedLocks.clear();
- intendedLockMode = null;
- }
-
- /** {@inheritDoc} */
- @Override
- public int compareTo(WaiterImpl o) {
- return txId.compareTo(o.txId);
- }
-
- /** Notifies a future listeners. */
- private void notifyLocked() {
- if (ex != null) {
- fut.completeExceptionally(ex);
- } else {
- assert lockMode != null;
-
- fut.complete(null);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean locked() {
- return this.lockMode != null;
- }
-
- /**
- * Checks is the waiter has any intended to lock a key.
- *
- * @return True if the waiter has an intended lock, false otherwise.
- */
- public boolean hasLockIntent() {
- return this.intendedLockMode != null;
- }
-
- /** {@inheritDoc} */
- @Override
- public LockMode lockMode() {
- return lockMode;
- }
-
- /** {@inheritDoc} */
- @Override
- public LockMode intendedLockMode() {
- return intendedLockMode;
- }
-
- /** Grant a lock. */
- private void lock() {
- lockMode = intendedLockMode;
-
- intendedLockMode = null;
-
- intendedLocks.clear();
- }
-
- /**
- * Fails the lock waiter.
- *
- * @param e Lock exception.
- */
- private void fail(LockException e) {
- ex = e;
- }
-
- /** {@inheritDoc} */
- @Override
- public UUID txId() {
- return txId;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof WaiterImpl)) {
- return false;
- }
-
- return compareTo((WaiterImpl) o) == 0;
- }
-
- /** {@inheritDoc} */
- @Override
- public int hashCode() {
- return txId.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString() {
- return S.toString(WaiterImpl.class, this, "isDone", fut.isDone());
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean isEmpty() {
- return locks.isEmpty();
- }
-}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index f790eb9044..17d56f8535 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -1070,7 +1070,7 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest {
@Test
public void testLockIsReleased() {
- LockKey key = new LockKey(0);
+ LockKey key = lockKey();
UUID txId1 = TestTransactionIds.newTransactionId();
@@ -1097,7 +1097,7 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest {
UUID tx2 = TestTransactionIds.newTransactionId();
UUID tx3 = TestTransactionIds.newTransactionId();
- var key = new LockKey(0);
+ var key = lockKey();
assertThat(lockManager.acquire(tx1, key, S),
willCompleteSuccessfully());
assertThat(lockManager.acquire(tx2, key, IS),
willCompleteSuccessfully());
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
new file mode 100644
index 0000000000..d3348e7e06
--- /dev/null
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/CoarseGrainedLockManagerTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests coarse lock modes. It allows IX, S locks and upgrade from S to SIX
(S, then IX).
+ */
+public class CoarseGrainedLockManagerTest {
+ private final HeapLockManager lockManager = new HeapLockManager(new
WaitDieDeadlockPreventionPolicy());
+
+ @AfterEach
+ void after() {
+ assertTrue(lockManager.isEmpty());
+ }
+
+ @Test
+ public void testSimple() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(),
LockMode.S);
+ assertFalse(fut2.isDone());
+
+ lockManager.releaseAll(newer);
+ fut2.join();
+
+ lockManager.releaseAll(older);
+ }
+
+ @Test
+ public void testSimpleInverse() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(),
LockMode.IX);
+
+ assertThrowsWithCause(fut2::join, LockException.class);
+
+ lockManager.releaseAll(txId1);
+
+ fut2 = lockManager.acquire(txId2, lockKey(), LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ lockManager.releaseAll(txId2);
+ }
+
+ @Test
+ public void testComplex() {
+ // Older.
+ UUID txId4 = TestTransactionIds.newTransactionId();
+ UUID txId5 = TestTransactionIds.newTransactionId();
+ // Newer.
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ UUID txId3 = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, lockKey(),
LockMode.IX);
+ assertTrue(fut3.isDone());
+
+ CompletableFuture<Lock> fut4 = lockManager.acquire(txId4, lockKey(),
LockMode.S);
+ assertFalse(fut4.isDone());
+
+ CompletableFuture<Lock> fut5 = lockManager.acquire(txId5, lockKey(),
LockMode.S);
+ assertFalse(fut5.isDone());
+
+ lockManager.releaseAll(txId1);
+ assertFalse(fut4.isDone());
+ assertFalse(fut5.isDone());
+
+ lockManager.releaseAll(txId2);
+ assertFalse(fut4.isDone());
+ assertFalse(fut5.isDone());
+
+ lockManager.releaseAll(txId3);
+ fut4.join();
+ fut5.join();
+
+ lockManager.releaseAll(txId4);
+ lockManager.releaseAll(txId5);
+ }
+
+ @Test
+ public void testComplexInverse() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(),
LockMode.S);
+ assertTrue(fut2.isDone());
+
+ UUID txId3 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, lockKey(),
LockMode.S);
+ assertTrue(fut3.isDone());
+
+ UUID txId4 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut4 = lockManager.acquire(txId4, lockKey(),
LockMode.IX);
+ assertThrowsWithCause(fut4::join, LockException.class);
+
+ UUID txId5 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut5 = lockManager.acquire(txId5, lockKey(),
LockMode.IX);
+ assertThrowsWithCause(fut5::join, LockException.class);
+
+ lockManager.releaseAll(txId1);
+ lockManager.releaseAll(txId2);
+ lockManager.releaseAll(txId3);
+
+ fut4 = lockManager.acquire(txId4, lockKey(), LockMode.IX);
+ fut4.join();
+
+ fut5 = lockManager.acquire(txId5, lockKey(), LockMode.IX);
+ fut5.join();
+
+ lockManager.releaseAll(txId4);
+ lockManager.releaseAll(txId5);
+ }
+
+ @Test
+ public void testUpgrade() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut2.isDone());
+
+ lockManager.releaseAll(txId1);
+ }
+
+ @Test
+ public void testUpgradeReverse() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ lockManager.releaseAll(txId1);
+ }
+
+ @Test
+ public void testUpgradeMulti() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertThrowsWithCause(fut3::join, LockException.class);
+
+ lockManager.releaseAll(txId1);
+ lockManager.releaseAll(txId2);
+ }
+
+ @Test
+ public void testUpgradeReverseMulti() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, lockKey(),
LockMode.S);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertThrowsWithCause(fut3::join, LockException.class);
+
+ lockManager.releaseAll(txId1);
+ lockManager.releaseAll(txId2);
+ }
+
+ @Test
+ public void testReenter() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ lockManager.releaseAll(txId1);
+ }
+
+ @Test
+ public void testReenter2() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut2.isDone());
+
+ lockManager.releaseAll(txId1);
+ }
+
+ @Test
+ public void testUpgradeAndLockRequest() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(newer, lockKey(),
LockMode.S);
+ assertTrue(fut2.isDone());
+
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(older, lockKey(),
LockMode.S);
+ assertFalse(fut3.isDone());
+
+ lockManager.releaseAll(newer);
+
+ fut3.join();
+
+ lockManager.releaseAll(older);
+ }
+
+ @Test
+ public void testUpgradeAndLockRequestReverse() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(newer, lockKey(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(older, lockKey(),
LockMode.S);
+ assertFalse(fut3.isDone());
+
+ lockManager.releaseAll(newer);
+
+ fut3.join();
+
+ lockManager.releaseAll(older);
+ }
+
+ @Test
+ public void testUpgradeAndLockRequest2() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut2.isDone());
+
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId2, lockKey(),
LockMode.IX);
+ assertThrowsWithCause(fut3::join, LockException.class);
+
+ lockManager.releaseAll(txId1);
+ }
+
+ @Test
+ public void testUpgradeAndLockRequestReverse2() {
+ UUID txId1 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, lockKey(),
LockMode.S);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, lockKey(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ UUID txId2 = TestTransactionIds.newTransactionId();
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId2, lockKey(),
LockMode.IX);
+ assertThrowsWithCause(fut3::join, LockException.class);
+
+ lockManager.releaseAll(txId1);
+ }
+
+ @Test
+ public void testDeadlockAvoidance() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey2(),
LockMode.IX);
+ assertTrue(fut2.isDone());
+
+ CompletableFuture<Lock> fut3 = lockManager.acquire(newer, lockKey2(),
LockMode.S);
+ assertThrowsWithCause(fut3::join, LockException.class);
+
+ CompletableFuture<Lock> fut4 = lockManager.acquire(older, lockKey(),
LockMode.S);
+ assertFalse(fut4.isDone());
+
+ lockManager.releaseAll(newer);
+
+ fut4.join();
+
+ lockManager.releaseAll(older);
+ }
+
+ @Test
+ public void testReleaseWaitingTx() {
+ UUID older = TestTransactionIds.newTransactionId();
+ UUID newer = TestTransactionIds.newTransactionId();
+
+ CompletableFuture<Lock> fut1 = lockManager.acquire(newer, lockKey(),
LockMode.IX);
+ assertTrue(fut1.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(older, lockKey(),
LockMode.S);
+ assertFalse(fut2.isDone());
+
+ lockManager.releaseAll(older);
+
+ fut2.join();
+
+ lockManager.releaseAll(newer);
+ }
+
+ private static LockKey lockKey() {
+ return new LockKey("test");
+ }
+
+ private static LockKey lockKey2() {
+ return new LockKey("test2");
+ }
+}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java
deleted file mode 100644
index 24744f7504..0000000000
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
-
-/**
- * Test class for {@link HeapUnboundedLockManager}.
- */
-public class HeapUnboundedLockManagerTest extends AbstractLockManagerTest {
- @Override
- protected LockManager newInstance() {
- return new HeapUnboundedLockManager(new WaitDieDeadlockPreventionPolicy());
- }
-
- @Override
- protected LockKey lockKey() {
- return new LockKey("test");
- }
-}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index c335950e91..0000000000
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * NoWaitDeadlockPreventionUnboundedTest.
- */
-public class NoWaitDeadlockPreventionUnboundedTest extends NoWaitDeadlockPreventionTest {
- @Override
- protected LockManager lockManager() {
- return new HeapUnboundedLockManager(deadlockPreventionPolicy());
- }
-}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index 4dcdfadd00..0000000000
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * NoneDeadlockPreventionUnboundedTest.
- */
-public class NoneDeadlockPreventionUnboundedTest extends NoneDeadlockPreventionTest {
- @Override
- protected LockManager lockManager() {
- return new HeapUnboundedLockManager(deadlockPreventionPolicy());
- }
-}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index f0f66bb8ea..0000000000
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * ReversedDeadlockPreventionUnboundedTest.
- */
-public class ReversedDeadlockPreventionUnboundedTest extends ReversedDeadlockPreventionTest {
- @Override
- protected LockManager lockManager() {
- return new HeapUnboundedLockManager(deadlockPreventionPolicy());
- }
-}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java
deleted file mode 100644
index 3bd121abd5..0000000000
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.tx;
-
-import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
-
-/**
- * TimeoutDeadlockPreventionUnboundedTest.
- */
-public class TimeoutDeadlockPreventionUnboundedTest extends TimeoutDeadlockPreventionTest {
- @Override
- protected LockManager lockManager() {
- return new HeapUnboundedLockManager(deadlockPreventionPolicy());
- }
-}