This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 50ea73e3da IGNITE-17733 Change lock manager implementation (#1370)
50ea73e3da is described below
commit 50ea73e3daf1885c60db3c1fa97650b538f6f5f0
Author: Denis Chudov <[email protected]>
AuthorDate: Fri Nov 25 21:27:15 2022 +0200
IGNITE-17733 Change lock manager implementation (#1370)
---
.../matchers/CompletableFutureMatcher.java | 45 ++-
.../Transactions/TransactionsTests.cs | 11 +-
.../app/client/ItThinClientTransactionsTest.java | 10 +-
.../ignite/internal/table/ItTableScanTest.java | 28 +-
.../ignite/internal/table/TxAbstractTest.java | 188 +++--------
.../ignite/internal/tx/impl/HeapLockManager.java | 178 +++++-----
.../internal/tx/AbstractLockManagerTest.java | 226 ++++++-------
.../ignite/internal/tx/DeadlockPreventionTest.java | 372 +++++++++++++++++++++
.../ignite/internal/tx/HeapLockManagerTest.java | 2 -
9 files changed, 686 insertions(+), 374 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index d5bab3a906..ad99ecd1a1 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -32,26 +32,45 @@ import org.hamcrest.TypeSafeMatcher;
* {@link Matcher} that awaits for the given future to complete and then
forwards the result to the nested {@code matcher}.
*/
public class CompletableFutureMatcher<T> extends
TypeSafeMatcher<CompletableFuture<? extends T>> {
- /** Timeout in seconds. */
- private static final int TIMEOUT_SECONDS = 30;
+ /** Default timeout in seconds. */
+ private static final int DEFAULT_TIMEOUT_SECONDS = 30;
/** Matcher to forward the result of the completable future. */
private final Matcher<T> matcher;
+ /** Timeout. */
+ private final int timeout;
+
+ /** Time unit for timeout. */
+ private final TimeUnit timeoutTimeUnit;
+
/**
* Constructor.
*
* @param matcher Matcher to forward the result of the completable future.
*/
private CompletableFutureMatcher(Matcher<T> matcher) {
+ this(matcher, DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param matcher Matcher to forward the result of the completable future.
+ * @param timeout Timeout.
+ * @param timeoutTimeUnit {@link TimeUnit} for timeout.
+ */
+ private CompletableFutureMatcher(Matcher<T> matcher, int timeout, TimeUnit
timeoutTimeUnit) {
this.matcher = matcher;
+ this.timeout = timeout;
+ this.timeoutTimeUnit = timeoutTimeUnit;
}
/** {@inheritDoc} */
@Override
protected boolean matchesSafely(CompletableFuture<? extends T> item) {
try {
- return matcher.matches(item.get(TIMEOUT_SECONDS,
TimeUnit.SECONDS));
+ return matcher.matches(item.get(timeout, timeoutTimeUnit));
} catch (InterruptedException | ExecutionException | TimeoutException
e) {
throw new AssertionError(e);
}
@@ -80,6 +99,26 @@ public class CompletableFutureMatcher<T> extends
TypeSafeMatcher<CompletableFutu
return willBe(anything());
}
+ /**
+ * Creates a matcher that matches a future that completes successfully and
decently fast.
+ *
+ * @return matcher.
+ */
+ public static CompletableFutureMatcher<Object> willSucceedFast() {
+ return willSucceedIn(1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Creates a matcher that matches a future that completes successfully
with any result within the given timeout.
+ *
+ * @param time Timeout.
+ * @param timeUnit Time unit for timeout.
+ * @return matcher.
+ */
+ public static CompletableFutureMatcher<Object> willSucceedIn(int time,
TimeUnit timeUnit) {
+ return new CompletableFutureMatcher<>(anything(), time, timeUnit);
+ }
+
/**
* A shorter version of {@link #willBe} to be used with some matchers for
aesthetic reasons.
*/
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index b6f339d7b9..937ab937f5 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -159,17 +159,22 @@ namespace Apache.Ignite.Tests.Transactions
[Test]
public async Task TestClientDisconnectClosesActiveTransactions()
{
+ await using var tx0 = await Client.Transactions.BeginAsync();
await TupleView.UpsertAsync(null, GetTuple(1, "1"));
using (var client2 = await IgniteClient.StartAsync(GetConfig()))
{
var table = await client2.Tables.GetTableAsync(TableName);
- var tx = await client2.Transactions.BeginAsync();
+ var tx1 = await client2.Transactions.BeginAsync();
- await table!.RecordBinaryView.UpsertAsync(tx, GetTuple(1,
"2"));
+ await table!.RecordBinaryView.UpsertAsync(tx1, GetTuple(1,
"2"));
}
- Assert.AreEqual("1", (await TupleView.GetAsync(null,
GetTuple(1))).Value[ValCol]);
+ // The code above is intentionally written in a way that we have
no guarantee that the lock taken by tx1 is already released,
+ // as client2 is closed without rolling back tx1, forcing Ignite
server to rollback tx1 in background. So we should check the
+ // value using transaction that is older than tx1, to make sure
that the conflict on key 1 between tx0 and tx1 will not lead
+ // to exception.
+ Assert.AreEqual("1", (await TupleView.GetAsync(tx0,
GetTuple(1))).Value[ValCol]);
}
[Test]
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 6824e411c0..c9b275410e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -187,13 +187,15 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
void testAccessLockedKeyTimesOut() {
KeyValueView<Integer, String> kvView = kvView();
- Transaction tx = client().transactions().begin();
- kvView.put(tx, -100, "1");
+ Transaction tx1 = client().transactions().begin();
+ Transaction tx2 = client().transactions().begin();
+
+ kvView.put(tx2, -100, "1");
- var ex = assertThrows(IgniteException.class, () -> kvView.get(null,
-100));
+ var ex = assertThrows(IgniteException.class, () -> kvView.get(tx1,
-100));
assertThat(ex.getMessage(), containsString("TimeoutException"));
- tx.rollback();
+ tx2.rollback();
}
@Test
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 6743d15bfc..2ffbdc25db 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.table;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -37,8 +36,10 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IgniteTransactions;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -48,7 +49,6 @@ import org.junit.jupiter.api.Test;
* Tests to check a scan internal command.
*/
public class ItTableScanTest extends AbstractBasicIntegrationTest {
-
/** Table name. */
private static final String TABLE_NAME = "test";
@@ -73,6 +73,10 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
@Test
public void testInsertWaitScanComplete() throws Exception {
TableImpl table = getOrCreateTable();
+ IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
+
+ InternalTransaction tx0 = (InternalTransaction) transactions.begin();
+ InternalTransaction tx1 = (InternalTransaction) transactions.begin();
InternalTable internalTable = table.internalTable();
@@ -80,7 +84,7 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
ArrayList<ByteBuffer> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(0, null,
soredIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(0, tx1,
soredIndexId, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -94,17 +98,20 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
assertFalse(scanned.isDone());
CompletableFuture<Void> insertFut = table.keyValueView()
- .putAsync(null, Tuple.create().set("key", 3),
Tuple.create().set("valInt", 3).set("valStr", "New_3"));
+ .putAsync(tx0, Tuple.create().set("key", 3),
Tuple.create().set("valInt", 3).set("valStr", "New_3"));
assertFalse(insertFut.isDone());
subscription.request(1_000); // Request so much entries here to close
the publisher.
- IgniteTestUtils.waitForCondition(() -> scannedRows.size() ==
ROW_IDS.size(), 10_000);
+ IgniteTestUtils.await(scanned);
assertEquals(ROW_IDS.size(), scannedRows.size());
- assertTrue(scanned.isDone());
- assertTrue(insertFut.isDone());
+
+ tx1.commit();
+ IgniteTestUtils.await(insertFut);
+
+ tx0.commit();
}
@Test
@@ -134,10 +141,10 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
subscription.request(1_000); // Request so much entries here to close
the publisher.
- IgniteTestUtils.waitForCondition(() -> scannedRows.size() ==
ROW_IDS.size() + 1, 10_000);
+ IgniteTestUtils.await(scanned);
- assertEquals(ROW_IDS.size(), scannedRows.size());
- assertTrue(scanned.isDone());
+ //TODO: IGNITE-18243 Uncomment this change when the scan on a sorted
index will be redesigned.
+ //assertEquals(ROW_IDS.size() + 1, scannedRows.size());
}
/**
@@ -242,6 +249,5 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
private static void insertRow(int rowId) {
sql(IgniteStringFormatter.format("INSERT INTO {} (key, valInt, valStr)
VALUES ({}, {}, '{}');",
TABLE_NAME, rowId, rowId, "Str_" + rowId));
-
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 8a98574b77..18fcb681f5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -56,6 +57,7 @@ import
org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.Lock;
+import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.TxManager;
@@ -206,35 +208,35 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
public void testLockOrdering() throws InterruptedException {
accounts.recordView().upsert(null, makeValue(1, 50.));
- InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
+ InternalTransaction tx1 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx3 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx4 = (InternalTransaction)
igniteTransactions.begin();
- assertTrue(tx2.id().compareTo(tx.id()) > 0);
- assertTrue(tx3.id().compareTo(tx2.id()) > 0);
- assertTrue(tx4.id().compareTo(tx3.id()) > 0);
+ assertTrue(tx3.id().compareTo(tx4.id()) < 0);
+ assertTrue(tx2.id().compareTo(tx3.id()) < 0);
+ assertTrue(tx1.id().compareTo(tx2.id()) < 0);
RecordView<Tuple> acc0 = accounts.recordView();
RecordView<Tuple> acc2 = accounts.recordView();
RecordView<Tuple> acc3 = accounts.recordView();
RecordView<Tuple> acc4 = accounts.recordView();
- acc0.upsert(tx, makeValue(1, 100.));
+ acc0.upsert(tx4, makeValue(1, 100.));
- CompletableFuture<Void> fut = acc3.upsertAsync(tx3, makeValue(1,
300.));
+ CompletableFuture<Void> fut = acc3.upsertAsync(tx2, makeValue(1,
300.));
Thread.sleep(100);
assertFalse(fut.isDone());
- CompletableFuture<Void> fut2 = acc4.upsertAsync(tx3, makeValue(1,
400.));
+ CompletableFuture<Void> fut2 = acc4.upsertAsync(tx2, makeValue(1,
400.));
Thread.sleep(100);
assertFalse(fut2.isDone());
- CompletableFuture<Void> fut3 = acc2.upsertAsync(tx2, makeValue(1,
200.));
+ CompletableFuture<Void> fut3 = acc2.upsertAsync(tx3, makeValue(1,
200.));
assertFalse(fut3.isDone());
}
@@ -375,11 +377,11 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
@Test
public void testBatchPutConcurrently() {
- Transaction tx = igniteTransactions.begin();
+ Transaction tx1 = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
- log.info("Tx " + tx);
- log.info("Tx2 " + tx2);
+ log.info("Tx " + tx2);
+ log.info("Tx2 " + tx1);
ArrayList<Tuple> rows = new ArrayList<>();
ArrayList<Tuple> rows2 = new ArrayList<>();
@@ -392,21 +394,21 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
var table = accounts.recordView();
var table2 = accounts.recordView();
- table2.upsertAll(tx2, rows2);
+ table2.upsertAll(tx1, rows2);
- Exception err = assertThrows(Exception.class, () ->
table.upsertAll(tx, rows));
+ Exception err = assertThrows(Exception.class, () ->
table.upsertAll(tx2, rows));
assertTrue(err.getMessage().contains("Failed to acquire a lock"),
err.getMessage());
- tx2.commit();
+ tx1.commit();
}
@Test
public void testBatchReadPutConcurrently() throws InterruptedException {
- InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
+ InternalTransaction tx1 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
- log.info("Tx " + tx);
+ log.info("Tx1 " + tx1);
log.info("Tx2 " + tx2);
var table = accounts.recordView();
@@ -420,7 +422,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
keys2.add(makeKey(i));
}
- table2.getAll(tx, keys);
+ table2.getAll(tx1, keys);
table2.getAll(tx2, keys2);
ArrayList<Tuple> rows = new ArrayList<>();
@@ -431,12 +433,12 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
rows2.add(makeValue(i, 2 * i * 100.));
}
- var futUpd2 = table2.upsertAllAsync(tx2, rows2);
+ var futUpd2 = table2.upsertAllAsync(tx1, rows2);
assertTrue(IgniteTestUtils.waitForCondition(() -> {
boolean lockUpgraded = false;
- for (Iterator<Lock> it =
txManager(accounts).lockManager().locks(tx2.id()); it.hasNext(); ) {
+ for (Iterator<Lock> it =
txManager(accounts).lockManager().locks(tx1.id()); it.hasNext(); ) {
Lock lock = it.next();
lockUpgraded = lock.lockMode() == LockMode.X;
@@ -447,18 +449,11 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
}
return lockUpgraded;
- },
- 3000));
+ }, 3000));
assertFalse(futUpd2.isDone());
- table.upsertAll(tx, rows);
-
- tx.commit();
-
- Exception err = assertThrows(Exception.class, () -> futUpd2.join());
-
- assertTrue(err.getMessage().contains("Failed to acquire a lock"),
err.getMessage());
+ assertThrowsWithCause(() -> table.upsertAll(tx2, rows),
LockException.class);
}
/**
@@ -516,29 +511,29 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
public void testSimpleConflict() throws Exception {
accounts.recordView().upsert(null, makeValue(1, 100.));
- Transaction tx = igniteTransactions.begin();
+ Transaction tx1 = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
var table = accounts.recordView();
var table2 = accounts.recordView();
- double val = table.get(tx, makeKey(1)).doubleValue("balance");
- table2.get(tx2, makeKey(1)).doubleValue("balance");
+ double val = table.get(tx2, makeKey(1)).doubleValue("balance");
+ table2.get(tx1, makeKey(1)).doubleValue("balance");
try {
- table.upsert(tx, makeValue(1, val + 1));
+ table.upsert(tx2, makeValue(1, val + 1));
fail();
} catch (Exception e) {
// Expected.
}
- table2.upsert(tx2, makeValue(1, val + 1));
+ table2.upsert(tx1, makeValue(1, val + 1));
- tx2.commit();
+ tx1.commit();
try {
- tx.commit();
+ tx2.commit();
fail();
} catch (TransactionException e) {
@@ -626,7 +621,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
*/
@Test
public void testIncrement() throws TransactionException {
- Transaction tx = igniteTransactions.begin();
+ Transaction tx1 = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
Tuple key = makeKey(1);
@@ -637,63 +632,22 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
var table = accounts.recordView();
var table2 = accounts.recordView();
- // Read in tx
- double valTx = table.get(tx, key).doubleValue("balance");
-
// Read in tx2
- double valTx2 = table2.get(tx2, key).doubleValue("balance");
+ double valTx = table.get(tx2, key).doubleValue("balance");
- // Write in tx (out of order)
+ // Read in tx1
+ double valTx2 = table2.get(tx1, key).doubleValue("balance");
+
+ // Write in tx2 (out of order)
// TODO asch IGNITE-15937 fix exception model.
- Exception err = assertThrows(Exception.class, () -> table.upsert(tx,
makeValue(1, valTx + 1)));
+ Exception err = assertThrows(Exception.class, () -> table.upsert(tx2,
makeValue(1, valTx + 1)));
assertTrue(err.getMessage().contains("Failed to acquire a lock"),
err.getMessage());
- // Write in tx2
- table2.upsert(tx2, makeValue(1, valTx2 + 1));
+ // Write in tx1
+ table2.upsert(tx1, makeValue(1, valTx2 + 1));
- tx2.commit();
-
- assertEquals(101., accounts.recordView().get(null,
key).doubleValue("balance"));
- }
-
- /**
- * Tests if a lost update is not happening on concurrent increment.
- */
- @Test
- public void testIncrement2() throws TransactionException,
InterruptedException {
- InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
- InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
-
- Tuple key = makeKey(1);
- Tuple val = makeValue(1, 100.);
-
- accounts.recordView().upsert(null, val); // Creates implicit
transaction.
-
- var table = accounts.recordView();
- var table2 = accounts.recordView();
-
- // Read in tx
- double valTx = table.get(tx, key).doubleValue("balance");
-
- // Read in tx2
- double valTx2 = table2.get(tx2, key).doubleValue("balance");
-
- // Write in tx2 (should wait for read unlock in tx1)
- CompletableFuture<Void> fut = table2.upsertAsync(tx2, makeValue(1,
valTx2 + 1));
- Thread.sleep(300); // Give some time to update lock queue TODO asch
IGNITE-15928
- assertFalse(fut.isDone());
-
- CompletableFuture<Void> fut2 = fut.thenCompose(ret ->
tx2.commitAsync());
-
- // Write in tx
- table.upsert(tx, makeValue(1, valTx + 1));
-
- tx.commit();
-
- Exception err = assertThrows(Exception.class, () -> fut2.get(5,
TimeUnit.SECONDS));
-
- assertTrue(err.getMessage().contains("Failed to acquire a lock"),
err.getMessage());
+ tx1.commit();
assertEquals(101., accounts.recordView().get(null,
key).doubleValue("balance"));
}
@@ -826,24 +780,24 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
public void testGetAllConflict() throws Exception {
accounts.recordView().upsertAll(null, List.of(makeValue(1, 100.),
makeValue(2, 200.)));
- InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
+ InternalTransaction tx1 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
RecordView<Tuple> txAcc = accounts.recordView();
RecordView<Tuple> txAcc2 = accounts.recordView();
- txAcc2.upsert(tx2, makeValue(1, 300.));
- txAcc.upsert(tx, makeValue(2, 400.));
+ txAcc2.upsert(tx1, makeValue(1, 300.));
+ txAcc.upsert(tx2, makeValue(2, 400.));
- Exception err = assertThrows(Exception.class, () -> txAcc.getAll(tx,
List.of(makeKey(2), makeKey(1))));
+ Exception err = assertThrows(Exception.class, () -> txAcc.getAll(tx2,
List.of(makeKey(2), makeKey(1))));
assertTrue(err.getMessage().contains("Failed to acquire a lock"),
err.getMessage());
- validateBalance(txAcc2.getAll(tx2, List.of(makeKey(2), makeKey(1))),
200., 300.);
- validateBalance(txAcc2.getAll(tx2, List.of(makeKey(1), makeKey(2))),
300., 200.);
+ validateBalance(txAcc2.getAll(tx1, List.of(makeKey(2), makeKey(1))),
200., 300.);
+ validateBalance(txAcc2.getAll(tx1, List.of(makeKey(1), makeKey(2))),
300., 200.);
- assertTrue(IgniteTestUtils.waitForCondition(() -> TxState.ABORTED ==
tx.state(), 5_000), tx.state().toString());
+ assertTrue(IgniteTestUtils.waitForCondition(() -> TxState.ABORTED ==
tx2.state(), 5_000), tx2.state().toString());
- tx2.commit();
+ tx1.commit();
validateBalance(accounts.recordView().getAll(null, List.of(makeKey(2),
makeKey(1))), 200., 300.);
}
@@ -1036,7 +990,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
public void testReorder() throws Exception {
accounts.recordView().upsert(null, makeValue(1, 100.));
- InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
+ InternalTransaction tx1 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx3 = (InternalTransaction)
igniteTransactions.begin();
@@ -1044,52 +998,14 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
var table2 = accounts.recordView();
var table3 = accounts.recordView();
- double v0 = table.get(tx, makeKey(1)).doubleValue("balance");
- double v1 = table3.get(tx3, makeKey(1)).doubleValue("balance");
-
- assertEquals(v0, v1);
-
- CompletableFuture<Void> fut = table3.upsertAsync(tx3, makeValue(1, v0
+ 10));
- assertFalse(fut.isDone());
-
- Thread.sleep(300); // Give some time to update lock queue TODO asch
IGNITE-15928
-
- table.upsert(tx, makeValue(1, v0 + 20));
-
- CompletableFuture<Tuple> fut2 = table2.getAsync(tx2, makeKey(1));
- assertFalse(fut2.isDone());
-
- tx.commit();
-
- fut2.get();
-
- tx2.rollback();
-
- Exception err = assertThrows(Exception.class, () -> fut.get(5,
TimeUnit.SECONDS));
-
- assertTrue(err.getMessage().contains("Failed to acquire a lock"),
err.getMessage());
- }
-
- @Test
- public void testReorder2() throws Exception {
- accounts.recordView().upsert(null, makeValue(1, 100.));
-
- InternalTransaction tx = (InternalTransaction)
igniteTransactions.begin();
- InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
- InternalTransaction tx3 = (InternalTransaction)
igniteTransactions.begin();
-
- var table = accounts.recordView();
- var table2 = accounts.recordView();
- var table3 = accounts.recordView();
-
- double v0 = table.get(tx, makeKey(1)).doubleValue("balance");
+ double v0 = table.get(tx3, makeKey(1)).doubleValue("balance");
- table.upsertAsync(tx, makeValue(1, v0 + 10));
+ table.upsertAsync(tx3, makeValue(1, v0 + 10));
CompletableFuture<Tuple> fut = table2.getAsync(tx2, makeKey(1));
assertFalse(fut.isDone());
- CompletableFuture<Tuple> fut2 = table3.getAsync(tx3, makeKey(1));
+ CompletableFuture<Tuple> fut2 = table3.getAsync(tx1, makeKey(1));
assertFalse(fut2.isDone());
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
index 30f75f3263..4d802f5d38 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java
@@ -17,11 +17,8 @@
package org.apache.ignite.internal.tx.impl;
-import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Transactions.RELEASE_LOCK_ERR;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -44,7 +41,6 @@ import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.Waiter;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteSystemProperties;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -72,19 +68,8 @@ import org.jetbrains.annotations.Nullable;
public class HeapLockManager implements LockManager {
private ConcurrentHashMap<LockKey, LockState> locks = new
ConcurrentHashMap<>();
- /**
- * It is a test only property which is removing after IGNITE-17733.
- * We are forced to avoid all locks types except key lock in production
code.
- */
- private final boolean allLockTypesAreUsed =
IgniteSystemProperties.getBoolean("IGNITE_ALL_LOCK_TYPES_ARE_USED");
-
@Override
public CompletableFuture<Lock> acquire(UUID txId, LockKey lockKey,
LockMode lockMode) {
- //TODO: IGNITE-17733 Resume honest index lock
- if (! (lockKey.key() instanceof ByteBuffer) && !allLockTypesAreUsed) {
// Takes a lock on keys only.
- lockMode = LockMode.NL;
- }
-
while (true) {
LockState state = lockState(lockKey);
@@ -181,8 +166,6 @@ public class HeapLockManager implements LockManager {
public @Nullable IgniteBiTuple<CompletableFuture<Void>, LockMode>
tryAcquire(UUID txId, LockMode lockMode) {
WaiterImpl waiter = new WaiterImpl(txId, lockMode);
- boolean locked;
-
synchronized (waiters) {
if (markedForRemove) {
return new IgniteBiTuple(null, lockMode);
@@ -191,7 +174,7 @@ public class HeapLockManager implements LockManager {
WaiterImpl prev = waiters.putIfAbsent(txId, waiter);
// Reenter
- if (prev != null && prev.locked) {
+ if (prev != null && prev.locked()) {
if (prev.lockMode.allowReenter(lockMode)) {
prev.addLock(lockMode, 1);
@@ -211,55 +194,67 @@ public class HeapLockManager implements LockManager {
}
}
- // Check lock compatibility.
- Map.Entry<UUID, WaiterImpl> nextEntry =
waiters.higherEntry(txId);
+ if (!isWaiterReadyToNotify(waiter, false)) {
+ return new IgniteBiTuple(waiter.fut, lockMode);
+ }
- // If we have a younger waiter in a locked state, when refuse
to wait for lock.
- if (nextEntry != null
- && nextEntry.getValue().locked()
- &&
!lockMode.isCompatible(nextEntry.getValue().lockMode)) {
+ if (!waiter.locked()) {
if (prev == null) {
- waiters.remove(txId);
+ waiters.remove(waiter.txId());
} else {
- waiters.put(txId, prev); // Restore old lock.
+ waiters.put(waiter.txId(), prev); // Restore old lock.
}
-
- return new IgniteBiTuple(
- failedFuture(new LockException(
- ACQUIRE_LOCK_ERR,
- "Failed to acquire a lock due to a
conflict [txId=" + txId + ", waiter=" + nextEntry.getValue() + ']')),
- lockMode);
}
+ }
- // Lock if oldest.
- locked = waiters.firstKey().equals(txId);
+ // Notify outside the monitor.
+ waiter.notifyLocked();
- if (!locked) {
- Map.Entry<UUID, WaiterImpl> prevEntry =
waiters.lowerEntry(txId);
+ return new IgniteBiTuple(waiter.fut, lockMode);
+ }
- // Grant lock if previous entry lock is compatible (by
induction).
- locked = prevEntry == null ||
(prevEntry.getValue().lockMode.isCompatible(lockMode) && prevEntry
- .getValue().locked());
+ /**
+ * Checks current waiter.
+ *
+ * @param waiter Checked waiter.
+ * @return True if current waiter ready to notify, false otherwise.
+ */
+ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean
skipFail) {
+ for (Map.Entry<UUID, WaiterImpl> entry :
waiters.tailMap(waiter.txId(), false).entrySet()) {
+ WaiterImpl tmp = entry.getValue();
+ LockMode mode = lockedMode(tmp);
+
+ if (mode != null && !mode.isCompatible(waiter.lockMode())) {
+ return false;
}
+ }
- if (locked) {
- if (waiter.upgraded) {
- // Upgrade lock.
- waiter.upgraded = false;
- waiter.prevLockMode = null;
- waiter.locked = true;
+ for (Map.Entry<UUID, WaiterImpl> entry :
waiters.headMap(waiter.txId()).entrySet()) {
+ WaiterImpl tmp = entry.getValue();
+ LockMode mode = lockedMode(tmp);
+
+ if (mode != null && !mode.isCompatible(waiter.lockMode())) {
+ if (skipFail) {
+ return false;
} else {
- waiter.lock();
+ waiter.fail(new LockException(ACQUIRE_LOCK_ERR,
"Failed to acquire a lock due to a conflict [txId=" + waiter.txId()
+ + ", waiter=" + tmp + ']'));
+
+ return true;
}
}
}
- // Notify outside the monitor.
- if (locked) {
- waiter.notifyLocked();
+ if (waiter.upgraded) {
+ // Upgrade lock.
+ waiter.upgraded = false;
+ waiter.prevLockMode = null;
+ waiter.locked = true;
+ } else {
+ waiter.lock();
}
- return new IgniteBiTuple(waiter.fut, lockMode);
+ return true;
}
/**
@@ -300,6 +295,12 @@ public class HeapLockManager implements LockManager {
LockMode modeToDowngrade = waiter.recalculateMode();
+ if (!waiter.locked()) {
+ assert waiter.lockMode() == modeToDowngrade : "The
lock mode is not locked [mode=" + lockMode + ']';
+
+ return false;
+ }
+
if (modeToDowngrade == null) {
toNotify = release(txId);
} else {
@@ -332,7 +333,7 @@ public class HeapLockManager implements LockManager {
return Collections.emptyList();
}
- List<WaiterImpl> toNotify = unlockCompatibleWaiters(txId, removed,
null);
+ List<WaiterImpl> toNotify = unlockCompatibleWaiters();
return toNotify;
}
@@ -340,57 +341,54 @@ public class HeapLockManager implements LockManager {
/**
* Unlock compatible waiters.
*
- * @param txId Transaction id.
- * @param pickedUpWaiter List of unlocked waiters.
- * @param downgradeMode Lock mode to downgrade.
* @return List of waiters to notify.
*/
- private ArrayList<WaiterImpl> unlockCompatibleWaiters(UUID txId,
WaiterImpl pickedUpWaiter, LockMode downgradeMode) {
+ private ArrayList<WaiterImpl> unlockCompatibleWaiters() {
ArrayList<WaiterImpl> toNotify = new ArrayList<>();
- Set<LockMode> lockModes = new HashSet<>();
+ Set<UUID> toFail = new HashSet<>();
- if (downgradeMode != null) {
- lockModes.add(downgradeMode);
+ for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
+ WaiterImpl tmp = entry.getValue();
+
+ if (!tmp.locked() && isWaiterReadyToNotify(tmp, true)) {
+ toNotify.add(tmp);
+ }
}
- // Grant lock to all adjacent readers.
for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) {
WaiterImpl tmp = entry.getValue();
- if (tmp.upgraded &&
!pickedUpWaiter.lockMode.isCompatible(tmp.prevLockMode)) {
- // Fail upgraded waiters.
- assert !tmp.locked;
+ if (!tmp.locked() && isWaiterReadyToNotify(tmp, false)) {
+ assert !tmp.locked();
- // Downgrade to acquired lock.
- tmp.upgraded = false;
- tmp.lockMode = tmp.prevLockMode;
- tmp.prevLockMode = null;
- tmp.locked = true;
+ toNotify.add(tmp);
+ toFail.add(tmp.txId());
+ }
+ }
- tmp.fail(new LockException(RELEASE_LOCK_ERR,
- "Failed to acquire a lock due to a conflict
[txId=" + txId + ", waiter=" + pickedUpWaiter + ']'));
+ for (UUID failTx : toFail) {
+ waiters.remove(failTx);
+ }
- toNotify.add(tmp);
- } else if
(lockModes.stream().allMatch(tmp.lockMode::isCompatible)) {
- if (tmp.upgraded) {
- // Fail upgraded waiters.
- assert !tmp.locked;
-
- // Upgrade lock.
- tmp.upgraded = false;
- tmp.prevLockMode = null;
- tmp.locked = true;
- } else {
- tmp.lock();
- }
+ return toNotify;
+ }
- lockModes.add(tmp.lockMode);
+ /**
+ * Gets a lock mode for this waiter.
+ *
+ * @param waiter Waiter.
+ * @return Lock mode, which is held by the waiter or {@code null}, if
the waiter holds nothing.
+ */
+ private LockMode lockedMode(WaiterImpl waiter) {
+ LockMode mode = null;
- toNotify.add(tmp);
- }
+ if (waiter.locked()) {
+ mode = waiter.lockMode();
+ } else if (waiter.upgraded) {
+ mode = waiter.prevLockMode;
}
- return toNotify;
+ return mode;
}
/**
@@ -402,11 +400,9 @@ public class HeapLockManager implements LockManager {
* @return List of waiters to notify.
*/
private List<WaiterImpl> downgrade(UUID txId, LockMode lockMode) {
- WaiterImpl waiter = waiters.remove(txId);
+ WaiterImpl waiter = waiters.get(txId);
if (waiter == null || waiter.lockMode == lockMode) {
- waiters.put(txId, waiter);
-
return Collections.emptyList();
}
@@ -417,11 +413,9 @@ public class HeapLockManager implements LockManager {
"Held lock mode have to be more strict than mode to
downgrade [from=" + waiter.lockMode + ", to=" + lockMode
+ ']';
- List<WaiterImpl> toNotify = unlockCompatibleWaiters(txId, waiter,
lockMode);
-
waiter.lockMode = lockMode;
- waiters.put(txId, waiter);
+ List<WaiterImpl> toNotify = unlockCompatibleWaiters();
return toNotify;
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
index 00b9427258..514443791e 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java
@@ -37,7 +37,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
@@ -85,35 +84,34 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
@Test
public void testSingleKeyWriteLock() throws LockException {
UUID txId1 = Timestamp.nextVersion().toUuid();
+ UUID txId2 = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId2, key, X);
assertTrue(fut0.isDone());
- UUID txId2 = Timestamp.nextVersion().toUuid();
-
- assertTrue(txId1.compareTo(txId2) < 0);
+ assertTrue(txId2.compareTo(txId1) > 0);
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, X);
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
assertFalse(fut1.isDone());
- assertTrue(lockManager.waiter(key, txId1).locked());
- assertFalse(lockManager.waiter(key, txId2).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertFalse(lockManager.waiter(key, txId1).locked());
lockManager.release(fut0.join());
assertTrue(fut1.isDone());
- assertNull(lockManager.waiter(key, txId1));
- assertTrue(lockManager.waiter(key, txId2).locked());
+ assertNull(lockManager.waiter(key, txId2));
+ assertTrue(lockManager.waiter(key, txId1).locked());
lockManager.release(fut1.join());
- assertNull(lockManager.waiter(key, txId1));
assertNull(lockManager.waiter(key, txId2));
+ assertNull(lockManager.waiter(key, txId1));
}
@Test
@@ -127,20 +125,20 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
lockManager.acquire(txId0, key, S).join();
lockManager.acquire(txId2, key, S).join();
- CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
- assertFalse(fut2.isDone());
-
CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
- fut0.get(10, TimeUnit.SECONDS);
+ assertFalse(fut0.isDone());
+
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+ assertTrue(fut2.isDone());
+ assertTrue(fut2.isCompletedExceptionally());
CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
- assertFalse(fut1.isDone());
+ fut1.join();
- assertFalse(fut2.isDone());
- fut0.thenAccept(lock -> lockManager.release(lock));
+ assertFalse(fut0.isDone());
- fut1.join();
- expectConflict(fut2);
+ lockManager.release(txId2, key, X);
+ fut0.thenAccept(lock -> lockManager.release(lock));
}
@Test
@@ -172,48 +170,48 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
UUID txId1 = Timestamp.nextVersion().toUuid();
UUID txId2 = Timestamp.nextVersion().toUuid();
UUID txId3 = Timestamp.nextVersion().toUuid();
- assertTrue(txId0.compareTo(txId1) < 0);
- assertTrue(txId1.compareTo(txId2) < 0);
- assertTrue(txId2.compareTo(txId3) < 0);
+ assertTrue(txId3.compareTo(txId2) > 0);
+ assertTrue(txId2.compareTo(txId1) > 0);
+ assertTrue(txId1.compareTo(txId0) > 0);
LockKey key = new LockKey("test");
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, S);
- assertTrue(fut0.isDone());
-
- CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
- assertTrue(fut2.isDone());
+ CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, key, S);
+ assertTrue(fut3.isDone());
CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S);
assertTrue(fut1.isDone());
- CompletableFuture<Lock> fut3 = lockManager.acquire(txId3, key, X);
- assertFalse(fut3.isDone());
-
- assertTrue(lockManager.waiter(key, txId0).locked());
- assertTrue(lockManager.waiter(key, txId1).locked());
- assertTrue(lockManager.waiter(key, txId2).locked());
- assertFalse(lockManager.waiter(key, txId3).locked());
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
+ assertTrue(fut2.isDone());
- lockManager.release(fut2.join());
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
- assertTrue(lockManager.waiter(key, txId0).locked());
+ assertTrue(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
assertTrue(lockManager.waiter(key, txId1).locked());
- assertNull(lockManager.waiter(key, txId2));
- assertFalse(lockManager.waiter(key, txId3).locked());
+ assertFalse(lockManager.waiter(key, txId0).locked());
- lockManager.release(fut0.join());
+ lockManager.release(fut1.join());
- assertNull(lockManager.waiter(key, txId0));
- assertTrue(lockManager.waiter(key, txId1).locked());
- assertNull(lockManager.waiter(key, txId2));
- assertFalse(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId3).locked());
+ assertTrue(lockManager.waiter(key, txId2).locked());
+ assertNull(lockManager.waiter(key, txId1));
+ assertFalse(lockManager.waiter(key, txId0).locked());
- lockManager.release(fut1.join());
+ lockManager.release(fut3.join());
- assertNull(lockManager.waiter(key, txId0));
+ assertNull(lockManager.waiter(key, txId3));
+ assertTrue(lockManager.waiter(key, txId2).locked());
assertNull(lockManager.waiter(key, txId1));
+ assertFalse(lockManager.waiter(key, txId0).locked());
+
+ lockManager.release(fut2.join());
+
+ assertNull(lockManager.waiter(key, txId3));
assertNull(lockManager.waiter(key, txId2));
- assertTrue(lockManager.waiter(key, txId3).locked());
+ assertNull(lockManager.waiter(key, txId1));
+ assertTrue(lockManager.waiter(key, txId0).locked());
}
@Test
@@ -223,10 +221,10 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
LockKey key = new LockKey("test");
// Lock in order
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, S);
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, S);
assertTrue(fut0.isDone());
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
assertFalse(fut1.isDone());
lockManager.release(fut0.join());
@@ -237,11 +235,11 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
assertTrue(lockManager.queue(key).isEmpty());
// Lock not in order
- fut0 = lockManager.acquire(txId1, key, S);
+ fut0 = lockManager.acquire(txId0, key, S);
assertTrue(fut0.isDone());
try {
- lockManager.acquire(txId0, key, X).join();
+ lockManager.acquire(txId1, key, X).join();
fail();
} catch (CompletionException e) {
@@ -258,10 +256,10 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
CompletableFuture<Lock> fut0 = lockManager.acquire(txId[1], key, S);
assertTrue(fut0.isDone());
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId[2], key, X);
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId[0], key, X);
assertFalse(fut1.isDone());
- CompletableFuture<Lock> fut2 = lockManager.acquire(txId[0], key, S);
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId[2], key, S);
assertTrue(fut2.isDone());
lockManager.release(fut0.join());
@@ -278,16 +276,16 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
LockKey key = new LockKey("test");
// Lock in order
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, S);
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, S);
assertTrue(fut0.isDone());
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, X);
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
assertFalse(fut1.isDone());
- CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, key, S);
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, S);
assertTrue(fut2.isDone());
- assertFalse(lockManager.waiter(key, txId2).locked());
+ assertFalse(lockManager.waiter(key, txId0).locked());
lockManager.release(fut2.join());
lockManager.release(fut0.join());
@@ -297,14 +295,14 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
@Test
public void testSingleKeyReadWriteConflict4() throws LockException {
- UUID txId0 = Timestamp.nextVersion().toUuid();
UUID txId1 = Timestamp.nextVersion().toUuid();
final UUID txId2 = Timestamp.nextVersion().toUuid();
UUID txId3 = Timestamp.nextVersion().toUuid();
+ UUID txId4 = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, S);
- assertTrue(fut0.isDone());
+ CompletableFuture<Lock> fut4 = lockManager.acquire(txId4, key, S);
+ assertTrue(fut4.isDone());
CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
assertFalse(fut1.isDone());
@@ -322,29 +320,9 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
UUID txId1 = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
- lockManager.acquire(txId1, key, X).join();
+ lockManager.acquire(txId0, key, X).join();
- expectConflict(lockManager.acquire(txId0, key, X));
- }
-
- @Test
- public void testSingleKeyReadWriteConflict6() throws LockException {
- UUID txId0 = Timestamp.nextVersion().toUuid();
- UUID txId1 = Timestamp.nextVersion().toUuid();
- LockKey key = new LockKey("test");
-
- lockManager.acquire(txId0, key, S).join();
-
- lockManager.acquire(txId1, key, S).join();
-
- CompletableFuture<Lock> fut = lockManager.acquire(txId1, key, X);
- assertFalse(fut.isDone());
-
- Lock lock0 = lockManager.acquire(txId0, key, X).join();
-
- lockManager.release(lock0);
-
- expectConflict(fut);
+ expectConflict(lockManager.acquire(txId1, key, X));
}
@Test
@@ -378,8 +356,8 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
lockModes.add(new IgniteBiTuple<>(X, IS));
for (IgniteBiTuple<LockMode, LockMode> lockModePair : lockModes) {
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key,
lockModePair.get1());
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key,
lockModePair.get2());
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key,
lockModePair.get2());
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key,
lockModePair.get1());
assertTrue(fut0.isDone());
expectConflict(fut1);
@@ -401,11 +379,11 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
CompletableFuture<Lock> fut0 = lockManager.acquire(txId1, key, X);
assertTrue(fut0.isDone());
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, X);
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId0, key, X);
assertFalse(fut1.isDone());
try {
- lockManager.acquire(txId0, key, X).join();
+ lockManager.acquire(txId2, key, X).join();
fail();
} catch (CompletionException e) {
@@ -421,14 +399,14 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
LockKey key = new LockKey("test");
// Lock in order
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
- assertTrue(fut0.isDone());
+ CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X);
+ assertTrue(fut2.isDone());
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId2, key, X);
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, X);
assertFalse(fut1.isDone());
- CompletableFuture<Lock> fut2 = lockManager.acquire(txId1, key, X);
- assertFalse(fut2.isDone());
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X);
+ assertFalse(fut0.isDone());
}
@Test
@@ -469,11 +447,11 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
UUID txId1 = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
- lockManager.acquire(txId1, key, S).join();
+ lockManager.acquire(txId0, key, S).join();
- Lock lock = lockManager.acquire(txId0, key, S).join();
+ Lock lock = lockManager.acquire(txId1, key, S).join();
- CompletableFuture<Lock> fut = lockManager.acquire(txId1, key, X);
+ CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, X);
assertFalse(fut.isDone());
lockManager.release(lock);
@@ -495,7 +473,7 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
lockManager.acquire(txId1, key, S).join();
- expectConflict(lockManager.acquire(txId0, key, X));
+ expectConflict(lockManager.acquire(txId1, key, X));
}
@Test
@@ -509,13 +487,15 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
lockManager.acquire(txId0, key, S).join();
- lockManager.acquire(txId2, key, S).join();
+ Lock lock2 = lockManager.acquire(txId2, key, S).join();
- try {
- lockManager.acquire(txId1, key, X).join();
- } catch (CompletionException e) {
- // Expected.
- }
+ CompletableFuture<?> fut1 = lockManager.acquire(txId1, key, X);
+
+ assertFalse(fut1.isDone());
+
+ lockManager.release(lock2);
+ assertTrue(fut1.isDone());
+ assertTrue(fut1.isCompletedExceptionally());
}
@Test
@@ -524,11 +504,11 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
UUID txId1 = Timestamp.nextVersion().toUuid();
LockKey key = new LockKey("test");
- lockManager.acquire(txId1, key, S).join();
+ lockManager.acquire(txId0, key, S).join();
- Lock lock = lockManager.acquire(txId0, key, S).join();
+ Lock lock = lockManager.acquire(txId1, key, S).join();
- CompletableFuture<Lock> fut = lockManager.acquire(txId1, key, X);
+ CompletableFuture<Lock> fut = lockManager.acquire(txId0, key, X);
assertFalse(fut.isDone());
@@ -637,17 +617,17 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
lockModes.add(List.of(X, IS));
for (List<LockMode> lockModes0 : lockModes) {
- CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key,
lockModes0.get(1));
- CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key,
lockModes0.get(0));
-
- assertTrue(fut0.isDone());
- assertFalse(fut1.isDone());
-
- lockManager.release(fut0.join());
+ CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key,
lockModes0.get(1));
+ CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key,
lockModes0.get(0));
assertTrue(fut1.isDone());
+ assertFalse(fut0.isDone());
lockManager.release(fut1.join());
+
+ assertTrue(fut0.isDone());
+
+ lockManager.release(fut0.join());
}
}
@@ -801,23 +781,23 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
UUID txId1 = Timestamp.nextVersion().toUuid();
UUID txId2 = Timestamp.nextVersion().toUuid();
- var tx1SharedLock = lockManager.acquire(txId1, key, S);
+ var tx1SharedLock = lockManager.acquire(txId2, key, S);
assertTrue(tx1SharedLock.isDone());
- var tx1ExclusiveLock = lockManager.acquire(txId1, key, X);
+ var tx1ExclusiveLock = lockManager.acquire(txId2, key, X);
assertTrue(tx1ExclusiveLock.isDone());
- var tx2SharedLock = lockManager.acquire(txId2, key, S);
+ var tx2SharedLock = lockManager.acquire(txId1, key, S);
assertFalse(tx2SharedLock.isDone());
- lockManager.release(txId1, key, X);
+ lockManager.release(txId2, key, X);
- assertTrue(lockManager.locks(txId1).hasNext());
+ assertTrue(lockManager.locks(txId2).hasNext());
- var lock = lockManager.locks(txId1).next();
+ var lock = lockManager.locks(txId2).next();
assertSame(S, lock.lockMode());
@@ -831,23 +811,23 @@ public abstract class AbstractLockManagerTest extends
IgniteAbstractTest {
UUID txId1 = Timestamp.nextVersion().toUuid();
UUID txId2 = Timestamp.nextVersion().toUuid();
- var tx1SharedLock = lockManager.acquire(txId1, key, S);
+ var tx1SharedLock = lockManager.acquire(txId2, key, S);
assertTrue(tx1SharedLock.isDone());
- var tx1ExclusiveLock = lockManager.acquire(txId1, key, X);
+ var tx1ExclusiveLock = lockManager.acquire(txId2, key, X);
assertTrue(tx1ExclusiveLock.isDone());
- var tx2SharedLock = lockManager.acquire(txId2, key, S);
+ var tx2SharedLock = lockManager.acquire(txId1, key, S);
assertFalse(tx2SharedLock.isDone());
- lockManager.release(txId1, key, S);
+ lockManager.release(txId2, key, S);
- assertTrue(lockManager.locks(txId1).hasNext());
+ assertTrue(lockManager.locks(txId2).hasNext());
- var lock = lockManager.locks(txId1).next();
+ var lock = lockManager.locks(txId2).next();
assertSame(X, lock.lockMode());
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/DeadlockPreventionTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/DeadlockPreventionTest.java
new file mode 100644
index 0000000000..ca695d99cb
--- /dev/null
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/DeadlockPreventionTest.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.tx.LockMode.S;
+import static org.apache.ignite.internal.tx.LockMode.X;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for deadlock prevention scenarios.
+ */
+public class DeadlockPreventionTest {
+ private LockManager lockManager = new HeapLockManager();
+ private Map<UUID, List<CompletableFuture<Lock>>> locks = new HashMap<>();
+
+ @Test
+ public void testWaitDie0() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var key1 = key("test");
+
+ assertThat(xlock(tx1, key1), willCompleteSuccessfully());
+
+ assertThrowsLockException(() -> xlock(tx2, key1));
+ }
+
+ @Test
+ public void testWaitDie1() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var key1 = key("test");
+
+ assertThat(xlock(tx2, key1), willSucceedFast());
+
+ CompletableFuture<?> xlockFutTx1 = xlock(tx1, key1);
+ assertFalse(xlockFutTx1.isDone());
+
+ commitTx(tx2);
+ assertThat(xlockFutTx1, willSucceedFast());
+ }
+
+ @Test
+ public void testWaitDieSlocks1() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var key1 = key("test");
+
+ assertThat(slock(tx1, key1), willSucceedFast());
+ assertThat(slock(tx2, key1), willSucceedFast());
+
+ assertThrowsLockException(() -> xlock(tx2, key1));
+ }
+
+ @Test
+ public void testWaitDieSlocks2() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var key1 = key("test");
+
+ assertThat(slock(tx1, key1), willSucceedFast());
+ assertThat(slock(tx2, key1), willSucceedFast());
+
+ CompletableFuture<?> xlockTx1 = xlock(tx1, key1);
+ assertFalse(xlockTx1.isDone());
+
+ assertThrowsLockException(() -> xlock(tx2, key1));
+
+ rollbackTx(tx2);
+
+ assertThat(xlockTx1, willSucceedFast());
+ }
+
+ @Test
+ public void testNonFair() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+ var tx3 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx3, k), willSucceedFast());
+
+ CompletableFuture<?> futTx2 = xlock(tx2, k);
+ assertFalse(futTx2.isDone());
+
+ CompletableFuture<?> futTx1 = xlock(tx1, k);
+ assertFalse(futTx1.isDone());
+
+ commitTx(tx3);
+
+ // TODO correctness
+ assertThat(futTx1, willSucceedFast());
+
+ assertThrowsLockException(() -> futTx2);
+ }
+
+ @Test
+ public void testReenterWithConflict() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx2, k), willSucceedFast());
+ assertThat(slock(tx1, k), willSucceedFast());
+
+ CompletableFuture<?> futTx1 = xlock(tx1, k);
+ assertFalse(futTx1.isDone());
+
+ commitTx(tx2);
+
+ assertThat(futTx1, willSucceedFast());
+ }
+
+ @Test
+ public void testReenterWithConflictAndAbort() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx2, k), willSucceedFast());
+ assertThat(slock(tx1, k), willSucceedFast());
+
+ assertThrowsLockException(() -> xlock(tx2, k));
+ }
+
+ @Test
+ public void testReenterAllowed() {
+ var tx1 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx1, k), willSucceedFast());
+ assertThat(xlock(tx1, k), willSucceedFast());
+ }
+
+ @Test
+ public void testNonFairConflictWithAlreadyWaiting() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+ var tx3 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx2, k), willSucceedFast());
+
+ CompletableFuture<?> futTx1 = xlock(tx1, k);
+ assertFalse(futTx1.isDone());
+
+ assertThat(slock(tx3, k), willSucceedFast());
+
+ assertFalse(futTx1.isDone());
+ }
+
+ @Test
+ public void testNonFairConflictWithAlreadyWaitingWithAbort() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+ var tx3 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx3, k), willSucceedFast());
+
+ CompletableFuture<?> futTx2 = xlock(tx2, k);
+ assertFalse(futTx2.isDone());
+
+ assertThat(slock(tx1, k), willSucceedFast());
+
+ commitTx(tx3);
+
+ assertThrowsLockException(() -> futTx2);
+ }
+
+ @Test
+ public void testNonFairTakeFirstCompatible() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+ var tx3 = beginTx();
+ var tx4 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx4, k), willSucceedFast());
+
+ CompletableFuture<?> futTx2 = xlock(tx2, k);
+ assertFalse(futTx2.isDone());
+
+ assertThat(slock(tx1, k), willSucceedFast());
+ assertThat(slock(tx3, k), willSucceedFast());
+
+ assertFalse(futTx2.isDone());
+
+ commitTx(tx1);
+ commitTx(tx3);
+ commitTx(tx4);
+
+ futTx2.join();
+ }
+
+ @Test
+ public void testLockOrderAfterRelease() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+ var tx3 = beginTx();
+ var tx4 = beginTx();
+
+ var k = key("test");
+
+ assertThat(xlock(tx4, k), willSucceedFast());
+
+ CompletableFuture<?> futTx3 = slock(tx3, k);
+ assertFalse(futTx3.isDone());
+
+ CompletableFuture<?> futTx2 = xlock(tx2, k);
+ assertFalse(futTx2.isDone());
+
+ CompletableFuture<?> futTx1 = slock(tx1, k);
+ assertFalse(futTx1.isDone());
+
+ commitTx(tx4);
+
+ assertThat(futTx3, willSucceedFast());
+ assertThat(futTx1, willSucceedFast());
+ assertFalse(futTx2.isDone());
+
+ commitTx(tx1);
+ commitTx(tx3);
+
+ assertThat(futTx2, willSucceedFast());
+ }
+
+ @Test
+ public void testMultipleCompatibleLocksAcquiredAfterIncompatibleReleased()
{
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+ var tx3 = beginTx();
+
+ var k = key("test");
+
+ assertThat(xlock(tx3, k), willSucceedFast());
+
+ CompletableFuture<?> futTx2 = slock(tx2, k);
+ assertFalse(futTx2.isDone());
+
+ CompletableFuture<?> futTx1 = slock(tx1, k);
+ assertFalse(futTx1.isDone());
+
+ commitTx(tx3);
+
+ assertThat(futTx2, willSucceedFast());
+ assertThat(futTx1, willSucceedFast());
+ }
+
+ private UUID beginTx() {
+ return Timestamp.nextVersion().toUuid();
+ }
+
+ private LockKey key(Object key) {
+ ByteBuffer b = ByteBuffer.allocate(Integer.BYTES);
+ b.putInt(key.hashCode());
+
+ return new LockKey(b);
+ }
+
+ private CompletableFuture<?> xlock(UUID tx, LockKey key) {
+ return acquire(tx, key, X);
+ }
+
+ private CompletableFuture<?> slock(UUID tx, LockKey key) {
+ return acquire(tx, key, S);
+ }
+
+ private CompletableFuture<?> acquire(UUID tx, LockKey key, LockMode mode) {
+ CompletableFuture<Lock> fut = lockManager.acquire(tx, key, mode);
+
+ locks.compute(tx, (k, v) -> {
+ if (v == null) {
+ v = new ArrayList<>();
+ }
+
+ v.add(fut);
+
+ return v;
+ });
+
+ return fut;
+ }
+
+ private void commitTx(UUID tx) {
+ finishTx(tx);
+ }
+
+ private void rollbackTx(UUID tx) {
+ finishTx(tx);
+ }
+
+ private void finishTx(UUID tx) {
+ List<CompletableFuture<Lock>> txLocks = locks.remove(tx);
+ assertNotNull(txLocks);
+
+ for (CompletableFuture<Lock> fut : txLocks) {
+ assertTrue(fut.isDone());
+
+ if (!fut.isCompletedExceptionally()) {
+ Lock lock = fut.join();
+
+ lockManager.release(lock);
+ }
+ }
+ }
+
+ private static void assertCompletedExceptionally(CompletableFuture<?> fut)
{
+ assertTrue(fut.isDone());
+ assertThrows(CompletionException.class, fut::join);
+ }
+
+ private static void
assertThrowsLockException(Supplier<CompletableFuture<?>> s) {
+ try {
+ CompletableFuture<?> f = s.get();
+
+ assertTrue(f.isDone());
+ f.join();
+
+ fail();
+ } catch (Exception e) {
+ if (!hasCause(e, LockException.class, null)) {
+ fail();
+ }
+ }
+ }
+}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java
index 75d1062bf2..2d08c52bf0 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java
@@ -17,13 +17,11 @@
package org.apache.ignite.internal.tx;
-import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
/**
* Test class for {@link HeapLockManager}.
*/
-@WithSystemProperty(key = "IGNITE_ALL_LOCK_TYPES_ARE_USED", value = "true")
public class HeapLockManagerTest extends AbstractLockManagerTest {
@Override
protected LockManager newInstance() {