This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6774c9fa90 IGNITE-18717 Fix parallel read and delete RowVersion for
PageMemoryMvPartitionStorage (#1652)
6774c9fa90 is described below
commit 6774c9fa90afbcceb8d03c1f200138afaba0e5da
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Feb 14 17:53:36 2023 +0300
IGNITE-18717 Fix parallel read and delete RowVersion for
PageMemoryMvPartitionStorage (#1652)
---
.../tree/AbstractBplusTreePageMemoryTest.java | 148 +++++++++-
.../ignite/internal/pagememory/tree/BplusTree.java | 283 +++++++++++++------
.../internal/pagememory/tree/IgniteTree.java | 11 +
.../ignite/internal/storage/StorageException.java | 11 +
.../ignite/internal/storage/util/StorageUtils.java | 10 +
.../AbstractMvPartitionStorageConcurrencyTest.java | 36 ++-
.../storage/AbstractMvPartitionStorageTest.java | 3 +-
.../storage/BaseMvPartitionStorageTest.java | 8 +
.../pagememory/mv/AbortWriteInvokeClosure.java | 122 ++++++++
.../mv/AbstractPageMemoryMvPartitionStorage.java | 309 +++++++++------------
.../mv/AbstractPartitionTimestampCursor.java | 79 ++----
.../mv/AddWriteCommittedInvokeClosure.java | 103 +++++++
.../pagememory/mv/AddWriteInvokeClosure.java | 156 +++++++++++
.../pagememory/mv/CommitWriteInvokeClosure.java | 101 +++++++
.../internal/storage/pagememory/mv/LockHolder.java | 71 +++++
.../storage/pagememory/mv/ScanVersionsCursor.java | 17 +-
.../storage/pagememory/mv/LockHolderTest.java | 83 ++++++
...ageMemoryMvPartitionStorageConcurrencyTest.java | 37 ++-
...ageMemoryMvPartitionStorageConcurrencyTest.java | 37 ++-
19 files changed, 1284 insertions(+), 341 deletions(-)
diff --git
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
index 789fd0d2ea..20ec72f1db 100644
---
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
+++
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
@@ -34,13 +34,16 @@ import static
org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.Constants.GiB;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -51,6 +54,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -80,6 +84,7 @@ import
org.apache.ignite.internal.pagememory.datastructure.DataStructure;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.reuse.ReuseList;
import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
@@ -2367,6 +2372,147 @@ public abstract class AbstractBplusTreePageMemoryTest
extends BaseIgniteAbstract
assertEquals(0L, tree.findNext(-1L, true));
}
+ @Test
+ void testFindOneWithMapper() throws Exception {
+ TestTree tree = createTestTree(true);
+
+ tree.put(0L);
+
+ TreeRowMapClosure<Long, Long, String> treeRowClosure = new
TreeRowMapClosure<>() {
+ @Override
+ public String map(Long treeRow) {
+ return "row" + treeRow;
+ }
+ };
+
+ assertEquals("row0", tree.findOne(0L, treeRowClosure, null));
+ assertEquals("rownull", tree.findOne(1L, treeRowClosure, null));
+ }
+
+ @Test
+ void testFindWithMapper() throws Exception {
+ TestTree tree = createTestTree(true);
+
+ tree.put(0L);
+ tree.put(1L);
+
+ TreeRowMapClosure<Long, Long, String> treeRowClosure = new
TreeRowMapClosure<>() {
+ @Override
+ public String map(Long treeRow) {
+ return "row" + treeRow;
+ }
+ };
+
+ Cursor<String> cursor = tree.find(null, null, treeRowClosure, null);
+
+ assertTrue(cursor.hasNext());
+ assertEquals("row0", cursor.next());
+
+ assertTrue(cursor.hasNext());
+ assertEquals("row1", cursor.next());
+
+ assertFalse(cursor.hasNext());
+ assertThrows(NoSuchElementException.class, cursor::next);
+ }
+
+ @Test
+ void testInvokeClosureWithOnUpdateCallbackForPut() throws Exception {
+ TestTree tree = createTestTree(true);
+
+ // Checks insert.
+ CompletableFuture<Void> future0 = new CompletableFuture<>();
+
+ tree.invoke(0L, null, new InvokeClosure<>() {
+ @Override
+ public void call(@Nullable Long oldRow) {
+ assertNull(oldRow);
+ }
+
+ @Override
+ public @Nullable Long newRow() {
+ return 0L;
+ }
+
+ @Override
+ public OperationType operationType() {
+ return PUT;
+ }
+
+ @Override
+ public void onUpdate() {
+ future0.complete(null);
+ }
+ });
+
+ assertThat(future0, willCompleteSuccessfully());
+
+ assertEquals(0L, tree.findOne(0L));
+
+ // Checks replace.
+ CompletableFuture<Void> future1 = new CompletableFuture<>();
+
+ tree.invoke(0L, null, new InvokeClosure<>() {
+ @Override
+ public void call(@Nullable Long oldRow) {
+ assertEquals(0L, oldRow);
+ }
+
+ @Override
+ public @Nullable Long newRow() {
+ return 0L;
+ }
+
+ @Override
+ public OperationType operationType() {
+ return PUT;
+ }
+
+ @Override
+ public void onUpdate() {
+ future1.complete(null);
+ }
+ });
+
+ assertThat(future1, willCompleteSuccessfully());
+
+ assertEquals(0L, tree.findOne(0L));
+ }
+
+ @Test
+ void testInvokeClosureWithOnUpdateCallbackForRemove() throws Exception {
+ TestTree tree = createTestTree(true);
+
+ tree.put(0L);
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ tree.invoke(0L, null, new InvokeClosure<>() {
+ @Override
+ public void call(@Nullable Long oldRow) {
+ assertEquals(0L, oldRow);
+ }
+
+ @Override
+ public @Nullable Long newRow() {
+ return null;
+ }
+
+ @Override
+ public OperationType operationType() {
+ return REMOVE;
+ }
+
+ @Override
+ public void onUpdate() {
+ future.complete(null);
+ }
+ });
+
+ assertThat(future, willCompleteSuccessfully());
+
+ assertNull(tree.findOne(0L));
+ }
+
private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws
Exception {
final TestTree tree = createTestTree(canGetRow);
@@ -2850,7 +2996,7 @@ public abstract class AbstractBplusTreePageMemoryTest
extends BaseIgniteAbstract
/**
* {@link TreeRowClosure} implementation for the test.
*/
- static class TestTreeFindFilteredClosure implements TreeRowClosure<Long,
Long> {
+ static class TestTreeFindFilteredClosure implements
TreeRowMapClosure<Long, Long, Long> {
private final Set<Long> vals;
/**
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index f0a915f41e..2be9922618 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1178,18 +1178,18 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
*
* @param upper Upper bound.
* @param upIncl {@code true} if upper bound is inclusive.
- * @param c Filter closure.
+ * @param c Tree row closure.
* @param x Implementation specific argument, {@code null} always means
that we need to return full detached data row.
* @return Cursor.
* @throws IgniteInternalCheckedException If failed.
*/
- private Cursor<T> findLowerUnbounded(
+ private <R> Cursor<R> findLowerUnbounded(
L upper,
boolean upIncl,
- TreeRowClosure<L, T> c,
+ TreeRowMapClosure<L, T, R> c,
@Nullable Object x
) throws IgniteInternalCheckedException {
- ForwardCursor cursor = new ForwardCursor(upper, upIncl, c, x);
+ ForwardCursor<R> cursor = new ForwardCursor<>(upper, upIncl, c, x);
long firstPageId;
@@ -1249,15 +1249,15 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
*
* @param lower Lower bound inclusive or {@code null} if unbounded.
* @param upper Upper bound inclusive or {@code null} if unbounded.
- * @param c Filter closure.
+ * @param c Tree row closure.
* @param x Implementation specific argument, {@code null} always means
that we need to return full detached data row.
* @return Cursor.
* @throws IgniteInternalCheckedException If failed.
*/
- public Cursor<T> find(
+ public <R> Cursor<R> find(
@Nullable L lower,
@Nullable L upper,
- TreeRowClosure<L, T> c,
+ TreeRowMapClosure<L, T, R> c,
@Nullable Object x
) throws IgniteInternalCheckedException {
return find(lower, upper, true, true, c, x);
@@ -1270,22 +1270,24 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @param upper Upper bound or {@code null} if unbounded.
* @param lowIncl {@code true} if lower bound is inclusive.
* @param upIncl {@code true} if upper bound is inclusive.
- * @param c Filter closure.
+ * @param c Tree row closure.
* @param x Implementation specific argument, {@code null} always means
that we need to return full detached data row.
* @return Cursor.
- * @throws IgniteInternalCheckedException If failed.
+ * @throws CorruptedDataStructureException If the data structure is broken.
+ * @throws CorruptedTreeException If there were {@link RuntimeException}
or {@link AssertionError}.
+ * @throws IgniteInternalCheckedException If other errors occurred.
*/
- public Cursor<T> find(
+ public <R> Cursor<R> find(
@Nullable L lower,
@Nullable L upper,
boolean lowIncl,
boolean upIncl,
- @Nullable TreeRowClosure<L, T> c,
+ @Nullable TreeRowMapClosure<L, T, R> c,
@Nullable Object x
) throws IgniteInternalCheckedException {
checkDestroyed();
- ForwardCursor cursor = new ForwardCursor(lower, upper, lowIncl,
upIncl, c, x);
+ ForwardCursor<R> cursor = new ForwardCursor<>(lower, upper, lowIncl,
upIncl, c, x);
try {
if (lower == null) {
@@ -1301,7 +1303,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
throw new IgniteInternalCheckedException("Runtime failure on
bounds: [lower=" + lower + ", upper=" + upper + "]", e);
} catch (RuntimeException | AssertionError e) {
long[] pageIds = pages(
- lower == null || cursor == null || cursor.getCursor ==
null,
+ lower == null || cursor.getCursor == null,
() -> new long[]{cursor.getCursor.pageId}
);
@@ -1507,11 +1509,13 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
try {
if (c == null) {
- g = new GetOne(null, null, null, true);
+ GetOne<T> getOne = new GetOne<>(null, null, null, true);
+
+ g = getOne;
doFind(g);
- return (T) g.row;
+ return getOne.res;
} else {
GetLast getLast = new GetLast(c);
@@ -1549,18 +1553,25 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* Returns found result or {@code null}.
*
* @param row Lookup row for exact match.
+ * @param c Tree row closure, if the tree row is not found, then {@code
null} will be passed to the {@link TreeRowMapClosure#map}.
* @param x Implementation specific argument, {@code null} always means
that we need to return full detached data row.
- * @throws IgniteInternalCheckedException If failed.
+ * @throws CorruptedDataStructureException If the data structure is broken.
+ * @throws CorruptedTreeException If there were {@link RuntimeException}
or {@link AssertionError}.
+ * @throws IgniteInternalCheckedException If other errors occurred.
*/
- public final <R> @Nullable R findOne(L row, @Nullable TreeRowClosure<L, T>
c, Object x) throws IgniteInternalCheckedException {
+ public final <R> @Nullable R findOne(
+ L row,
+ @Nullable TreeRowMapClosure<L, T, R> c,
+ @Nullable Object x
+ ) throws IgniteInternalCheckedException {
checkDestroyed();
- GetOne g = new GetOne(row, c, x, false);
+ GetOne<R> g = new GetOne<>(row, c, x, false);
try {
doFind(g);
- return (R) g.row;
+ return g.res;
} catch (CorruptedDataStructureException e) {
throw e;
} catch (IgniteInternalCheckedException e) {
@@ -3353,27 +3364,34 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
/**
* Get a single entry.
*/
- private final class GetOne extends Get {
- Object arg;
+ private final class GetOne<R> extends Get {
+ private final @Nullable Object arg;
- @Nullable TreeRowClosure<L, T> filter;
+ private final @Nullable TreeRowMapClosure<L, T, R> treeRowClosure;
+
+ private @Nullable R res;
/**
* Constructor.
*
* @param row Row.
- * @param filter Closure filter.
+ * @param treeRowClosure Tree row closure, if the tree row is not
found, then {@code null} will be passed to the
+ * {@link TreeRowMapClosure#map}.
* @param arg Implementation specific argument.
* @param findLast Ignore row passed, find last row
*/
- private GetOne(L row, @Nullable TreeRowClosure<L, T> filter, Object
arg, boolean findLast) {
+ private GetOne(
+ @Nullable L row,
+ @Nullable TreeRowMapClosure<L, T, R> treeRowClosure,
+ @Nullable Object arg,
+ boolean findLast
+ ) {
super(row, findLast);
+ this.treeRowClosure = treeRowClosure;
this.arg = arg;
- this.filter = filter;
}
- /** {@inheritDoc} */
@Override
boolean found(BplusIo<L> io, long pageAddr, int idx, int lvl) throws
IgniteInternalCheckedException {
// Check if we are on an inner page and can't get row from it.
@@ -3381,10 +3399,27 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return false;
}
- row = filter == null || filter.apply(BplusTree.this, io, pageAddr,
idx) ? getRow(io, pageAddr, idx, arg) : null;
+ if (treeRowClosure == null || treeRowClosure.apply(BplusTree.this,
io, pageAddr, idx)) {
+ T treeRow = getRow(io, pageAddr, idx, arg);
+
+ res = treeRowClosure != null ? treeRowClosure.map(treeRow) :
(R) treeRow;
+ }
return true;
}
+
+ @Override
+ boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) {
+ assert lvl >= 0 : lvl;
+
+ if (lvl == 0) {
+ res = treeRowClosure == null ? null : treeRowClosure.map(null);
+
+ return true;
+ }
+
+ return false;
+ }
}
/**
@@ -3812,12 +3847,22 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @param needOld {@code True} If need return old value.
*/
private Put(T row, boolean needOld) {
- super(row);
+ this(row, needOld, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param row Row.
+ * @param needOld {@code True} If need return old value.
+ * @param onUpdateCallback Callback after performing an update of tree
row while on a page with that tree row under its write lock.
+ */
+ private Put(T row, boolean needOld, @Nullable Runnable
onUpdateCallback) {
+ super(row, onUpdateCallback);
this.needOld = needOld;
}
- /** {@inheritDoc} */
@Override
boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) {
assert btmLvl >= 0 : btmLvl;
@@ -3826,7 +3871,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return lvl == btmLvl;
}
- /** {@inheritDoc} */
@Override
protected Result finishOrLockTail(long pageId, long page, long backId,
long fwdId, int lvl)
throws IgniteInternalCheckedException {
@@ -3854,7 +3898,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return res;
}
- /** {@inheritDoc} */
@Override
protected Result finishTail() throws IgniteInternalCheckedException {
// An inner node is required for replacement.
@@ -3932,7 +3975,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
releaseTail();
}
- /** {@inheritDoc} */
@Override
boolean isFinished() {
return row == null;
@@ -3965,6 +4007,10 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
private void insertSimple(long pageAddr, BplusIo<L> io, int idx)
throws IgniteInternalCheckedException {
io.insert(pageAddr, idx, row, null, rightId, false);
+
+ if (onUpdateCallback != null) {
+ onUpdateCallback.run();
+ }
}
/**
@@ -4100,7 +4146,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @return Result.
* @throws IgniteInternalCheckedException If failed.
*/
- public Result tryReplace(long pageId, long page, long fwdId, int lvl)
throws IgniteInternalCheckedException {
+ private Result tryReplace(long pageId, long page, long fwdId, int lvl)
throws IgniteInternalCheckedException {
// Init args.
this.pageId = pageId;
this.fwdId = fwdId;
@@ -4115,11 +4161,14 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @param pageAddr Page address.
* @param idx Replacement index.
*/
- public void replaceRowInPage(BplusIo<L> io, long pageAddr, int idx)
throws IgniteInternalCheckedException {
+ private void replaceRowInPage(BplusIo<L> io, long pageAddr, int idx)
throws IgniteInternalCheckedException {
io.store(pageAddr, idx, row, null, false);
+
+ if (onUpdateCallback != null) {
+ onUpdateCallback.run();
+ }
}
- /** {@inheritDoc} */
@Override
void checkLockRetry() throws IgniteInternalCheckedException {
// Non-null tail means that lock on the tail page is still being
held, and we can't fail with exception.
@@ -4254,14 +4303,14 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
assert newRow != null;
- op = new Put(newRow, false);
+ op = new Put(newRow, false, clo::onUpdate);
break;
case REMOVE:
assert foundRow != null;
- op = new Remove(row, false);
+ op = new Remove(row, false, clo::onUpdate);
break;
@@ -4423,13 +4472,32 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
/** We may need to lock part of the tree branch from the bottom to up
for multiple levels. */
Tail<L> tail;
+ /**
+ * Callback after performing an {@link Put put} or {@link Remove
remove} of a tree row while on a page with that tree row under its
+ * write lock.
+ */
+ final @Nullable Runnable onUpdateCallback;
+
/**
* Constructor.
*
* @param row Row.
*/
private Update(L row) {
+ this(row, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param row Row.
+ * @param onUpdateCallback Callback after performing an {@link Put
put} or {@link Remove remove} of a tree row while on a page with
+ * that tree row under its write lock.
+ */
+ private Update(L row, @Nullable Runnable onUpdateCallback) {
super(row, false);
+
+ this.onUpdateCallback = onUpdateCallback;
}
/**
@@ -4504,7 +4572,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
}
}
- /** {@inheritDoc} */
@Override
public final boolean canRelease(long pageId, int lvl) {
return pageId != 0L && !isTail(pageId, lvl);
@@ -4681,12 +4748,22 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @param needOld {@code True} If need return old value.
*/
private Remove(L row, boolean needOld) {
- super(row);
+ this(row, needOld, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param row Row.
+ * @param needOld {@code True} If need return old value.
+ * @param onRemoveCallback Callback after performing an remove of tree
row while on a page with that tree row under its write lock.
+ */
+ private Remove(L row, boolean needOld, @Nullable Runnable
onRemoveCallback) {
+ super(row, onRemoveCallback);
this.needOld = needOld;
}
- /** {@inheritDoc} */
@Override
public long pollFreePage() {
if (freePages == null) {
@@ -4704,7 +4781,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return res;
}
- /** {@inheritDoc} */
@Override
public void addFreePage(long pageId) {
assert pageId != 0L;
@@ -4728,7 +4804,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
}
}
- /** {@inheritDoc} */
@Override
public boolean isEmpty() {
if (freePages == null) {
@@ -4740,7 +4815,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return false;
}
- /** {@inheritDoc} */
@Override
boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) {
if (lvl == 0) {
@@ -4893,7 +4967,6 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return false;
}
- /** {@inheritDoc} */
@Override
protected Result finishTail() throws IgniteInternalCheckedException {
assert !isFinished();
@@ -5148,6 +5221,10 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
doRemove(pageAddr, io, cnt, idx);
assert isRemoved();
+
+ if (onUpdateCallback != null) {
+ onUpdateCallback.run();
+ }
}
/**
@@ -5745,7 +5822,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @return Data row.
* @throws IgniteInternalCheckedException If failed.
*/
- public abstract T getRow(BplusIo<L> io, long pageAddr, int idx, Object x)
throws IgniteInternalCheckedException;
+ public abstract T getRow(BplusIo<L> io, long pageAddr, int idx, @Nullable
Object x) throws IgniteInternalCheckedException;
/**
* Abstract forward cursor.
@@ -6117,35 +6194,38 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
/**
* Forward cursor.
*/
- private final class ForwardCursor extends AbstractForwardCursor implements
Cursor<T> {
+ private final class ForwardCursor<R> extends AbstractForwardCursor
implements Cursor<R> {
/** Implementation specific argument. */
- @Nullable
- final Object arg;
+ private final @Nullable Object arg;
- /** Rows. */
- @Nullable
- private T[] rows = (T[]) OBJECT_EMPTY_ARRAY;
+ /** {@code null} array means the end of iteration over the cursor. */
+ private @Nullable R @Nullable [] results = (R[]) OBJECT_EMPTY_ARRAY;
+
+ private @Nullable T lastRow;
/** Row index. */
private int row = -1;
/** Filter closure. */
- @Nullable
- private final TreeRowClosure<L, T> filter;
+ private final @Nullable TreeRowMapClosure<L, T, R> treeRowClosure;
- @Nullable
- private Boolean hasNext = null;
+ private @Nullable Boolean hasNext = null;
/**
* Lower unbound cursor.
*
* @param upperBound Upper bound.
* @param upIncl {@code true} if upper bound is inclusive.
- * @param filter Filter closure.
+ * @param treeRowClosure Tree row closure.
* @param arg Implementation specific argument, {@code null} always
means that we need to return full detached data row.
*/
- ForwardCursor(@Nullable L upperBound, boolean upIncl, @Nullable
TreeRowClosure<L, T> filter, @Nullable Object arg) {
- this(null, upperBound, true, upIncl, filter, arg);
+ ForwardCursor(
+ @Nullable L upperBound,
+ boolean upIncl,
+ @Nullable TreeRowMapClosure<L, T, R> treeRowClosure,
+ @Nullable Object arg
+ ) {
+ this(null, upperBound, true, upIncl, treeRowClosure, arg);
}
/**
@@ -6155,7 +6235,7 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
* @param upperBound Upper bound.
* @param lowIncl {@code true} if lower bound is inclusive.
* @param upIncl {@code true} if upper bound is inclusive.
- * @param filter Filter closure.
+ * @param treeRowClosure Tree row closure.
* @param arg Implementation specific argument, {@code null} always
means that we need to return full detached data row.
*/
ForwardCursor(
@@ -6163,16 +6243,15 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
@Nullable L upperBound,
boolean lowIncl,
boolean upIncl,
- @Nullable TreeRowClosure<L, T> filter,
+ @Nullable TreeRowMapClosure<L, T, R> treeRowClosure,
@Nullable Object arg
) {
super(lowerBound, upperBound, lowIncl, upIncl);
- this.filter = filter;
+ this.treeRowClosure = treeRowClosure;
this.arg = arg;
}
- /** {@inheritDoc} */
@Override
boolean fillFromBuffer0(long pageAddr, BplusIo<L> io, int startIdx,
int cnt) throws IgniteInternalCheckedException {
if (startIdx == -1) {
@@ -6193,30 +6272,35 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return false;
}
- if (rows == OBJECT_EMPTY_ARRAY) {
- rows = (T[]) new Object[cnt0];
+ if (results == OBJECT_EMPTY_ARRAY) {
+ results = (R[]) new Object[cnt0];
}
int resCnt = 0;
for (int idx = startIdx; idx < cnt; idx++) {
- if (filter == null || filter.apply(BplusTree.this, io,
pageAddr, idx)) {
- rows = set(rows, resCnt++, getRow(io, pageAddr, idx, arg));
+ if (treeRowClosure == null ||
treeRowClosure.apply(BplusTree.this, io, pageAddr, idx)) {
+ T treeRow = getRow(io, pageAddr, idx, arg);
+
+ R result = treeRowClosure != null ?
treeRowClosure.map(treeRow) : (R) treeRow;
+
+ results = set(results, resCnt++, result);
+
+ lastRow = treeRow;
}
}
if (resCnt == 0) {
- rows = (T[]) OBJECT_EMPTY_ARRAY;
+ results = (R[]) OBJECT_EMPTY_ARRAY;
return false;
}
- clearTail(rows, resCnt);
+ clearTail(results, resCnt);
return true;
}
- /** {@inheritDoc} */
@Override
boolean reinitialize0() {
hasNext = null;
@@ -6224,31 +6308,28 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
return hasNext();
}
- /** {@inheritDoc} */
@Override
void onNotFound(boolean readDone) {
if (readDone) {
- rows = null;
+ results = null;
} else {
- if (rows != OBJECT_EMPTY_ARRAY) {
- assert rows.length > 0; // Otherwise it makes no sense to
create an array.
+ if (results != OBJECT_EMPTY_ARRAY) {
+ assert results.length > 0; // Otherwise it makes no sense
to create an array.
// Fake clear.
- rows[0] = null;
+ results[0] = null;
}
}
}
- /** {@inheritDoc} */
@Override
void init0() {
row = -1;
}
- /** {@inheritDoc} */
@Override
public boolean hasNext() {
- if (rows == null) {
+ if (results == null) {
return false;
}
@@ -6262,30 +6343,25 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
/**
* Returns cleared last row.
*/
- private @Nullable T clearLastRow() {
+ private void clearLastResult() {
if (row == 0) {
- return null;
+ return;
}
int last = row - 1;
- T r = rows[last];
-
- assert r != null;
-
- rows[last] = null;
+ assert results[last] != null;
- return r;
+ results[last] = null;
}
- /** {@inheritDoc} */
@Override
- public T next() {
+ public R next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
- T r = rows[row];
+ R r = results[row];
assert r != null;
@@ -6295,16 +6371,20 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
}
private boolean advance() {
- if (++row < rows.length && rows[row] != null) {
- clearLastRow(); // Allow to GC the last returned row.
+ if (++row < results.length && results[row] != null) {
+ clearLastResult(); // Allow to GC the last returned row.
return true;
}
- T lastRow = clearLastRow();
+ clearLastResult();
row = 0;
+ T lastRow = this.lastRow;
+
+ this.lastRow = null;
+
try {
return nextPage(lastRow);
} catch (IgniteInternalCheckedException e) {
@@ -6499,6 +6579,27 @@ public abstract class BplusTree<L, T extends L> extends
DataStructure implements
boolean apply(BplusTree<L, T> tree, BplusIo<L> io, long pageAddr, int
idx) throws IgniteInternalCheckedException;
}
+ /**
+ * Extension of the {@link TreeRowClosure} with the ability to {@link
#map(Object) convert} tree row to some object.
+ */
+ public interface TreeRowMapClosure<L, T extends L, R> extends
TreeRowClosure<L, T> {
+ @Override
+ default boolean apply(BplusTree<L, T> tree, BplusIo<L> io, long
pageAddr, int idx) throws IgniteInternalCheckedException {
+ return true;
+ }
+
+ /**
+ * Converts a tree row to some object.
+ *
+ * <p>Executed after {@link #apply} has returned {@code true}, and
also under read lock of page on which the tree row is located.
+ *
+ * @param treeRow Tree row.
+ */
+ default R map(T treeRow) {
+ return (R) treeRow;
+ }
+ }
+
/**
* A generic visitor-style interface for performing
inspection/modification operations on the tree.
*/
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
index b182029ad8..0bcc8b04b6 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/IgniteTree.java
@@ -126,6 +126,17 @@ public interface IgniteTree<L, T> {
* Returns operation type for this closure.
*/
OperationType operationType();
+
+ /**
+ * Callback after inserting/replacing/deleting a tree row.
+ *
+ * <p>It is performed under the same write lock of page on which the
tree row is located.
+ *
+ * <p>What can allow us to ensure the atomicity of changes in the tree
row and the data associated with it.
+ */
+ default void onUpdate() {
+ // No-op.
+ }
}
/**
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
index 0b2e6d2d3c..58ddbcecfe 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageException.java
@@ -88,4 +88,15 @@ public class StorageException extends
IgniteInternalException {
public StorageException(String messagePattern, Throwable cause, Object...
params) {
this(IgniteStringFormatter.format(messagePattern, params), cause);
}
+
+ /**
+ * Constructor.
+ *
+ * @param messagePattern Error message pattern.
+ * @param params Error message params.
+ * @see IgniteStringFormatter#format(String, Object...)
+ */
+ public StorageException(String messagePattern, Object... params) {
+ this(IgniteStringFormatter.format(messagePattern, params));
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
index cf67fbf12a..e1aa4be770 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
@@ -21,6 +21,7 @@ import java.util.function.Supplier;
import org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteStringFormatter;
/**
@@ -145,6 +146,15 @@ public class StorageUtils {
}
}
+ /**
+ * Throws a {@link StorageException} if it is the cause.
+ */
+ public static void
throwStorageExceptionIfItCause(IgniteInternalCheckedException e) {
+ if (e.getCause() instanceof StorageException) {
+ throw ((StorageException) e.getCause());
+ }
+ }
+
private static String createStorageInProcessOfRebalanceErrorMessage(String
storageInfo) {
return IgniteStringFormatter.format("Storage in the process of
rebalancing: [{}]", storageInfo);
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 67692d7c08..f96e3d42ef 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -43,7 +43,9 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
runRace(
() -> abortWrite(ROW_ID),
() -> read(ROW_ID, clock.now()),
- () -> scanFirstEntry(clock.now())
+ () -> scanFirstEntry(clock.now()),
+ () -> scanFirstEntry(HybridTimestamp.MAX_VALUE),
+ () -> scanFirstVersion(ROW_ID)
);
assertNull(read(ROW_ID, clock.now()));
@@ -58,7 +60,9 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
runRace(
() -> commitWrite(ROW_ID, clock.now()),
() -> read(ROW_ID, clock.now()),
- () -> scanFirstEntry(clock.now())
+ () -> scanFirstEntry(clock.now()),
+ () -> scanFirstEntry(HybridTimestamp.MAX_VALUE),
+ () -> scanFirstVersion(ROW_ID)
);
assertRowMatches(read(ROW_ID, clock.now()), TABLE_ROW);
@@ -73,7 +77,9 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
runRace(
() -> addWrite(ROW_ID, TABLE_ROW2, TX_ID),
() -> read(ROW_ID, clock.now()),
- () -> scanFirstEntry(clock.now())
+ () -> scanFirstEntry(clock.now()),
+ () -> scanFirstEntry(HybridTimestamp.MAX_VALUE),
+ () -> scanFirstVersion(ROW_ID)
);
assertRowMatches(read(ROW_ID, clock.now()), TABLE_ROW2);
@@ -82,7 +88,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- void testRegularGcAndRead(AddAndCommit addAndCommit) {
+ public void testRegularGcAndRead(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW);
@@ -102,7 +108,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+ public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
HybridTimestamp firstCommitTs = addAndCommit.perform(this,
TABLE_ROW);
@@ -120,7 +126,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+ public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -141,7 +147,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+ public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -164,7 +170,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+ public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -183,7 +189,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- void testConcurrentGc(AddAndCommit addAndCommit) {
+ public void testConcurrentGc(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -207,6 +213,13 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
}
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private void scanFirstVersion(RowId rowId) {
+ try (var cursor = scan(rowId)) {
+ cursor.hasNext();
+ }
+ }
+
/**
* Adds a tombstone and cleans a GC queue until nothing's there.
*/
@@ -220,7 +233,10 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
} while (row != null);
}
- private enum AddAndCommit {
+ /**
+ * Performing add write.
+ */
+ protected enum AddAndCommit {
ATOMIC {
@Override
HybridTimestamp perform(AbstractMvPartitionStorageConcurrencyTest
test, @Nullable BinaryRow binaryRow) {
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 85dc0ac67d..4fe8d0b286 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -1166,7 +1167,7 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
RowId rowId = insert(binaryRow, txId);
StorageException ex = assertThrows(StorageException.class, () ->
addWriteCommitted(rowId, binaryRow2, clock.now()));
- assertThat(ex.getMessage(), containsString("Write intent exists for "
+ rowId));
+ assertThat(ex.getMessage(), allOf(containsString("Write intent
exists"), containsString(rowId.toString())));
}
@Test
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index 9573d496c3..7724b0c2d5 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -26,6 +26,7 @@ import
org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -111,6 +112,13 @@ public abstract class BaseMvPartitionStorageTest extends
BaseMvStoragesTest {
return storage.scan(timestamp);
}
+ /**
+ * Scans versions.
+ */
+ protected Cursor<ReadResult> scan(RowId rowId) {
+ return storage.scanVersions(rowId);
+ }
+
/**
* Inserts a row inside of consistency closure.
*/
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
new file mode 100644
index 0000000000..ad176c65d6
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link
AbstractPageMemoryMvPartitionStorage#abortWrite(RowId)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AbortWriteInvokeClosure implements InvokeClosure<VersionChain> {
+ private final RowId rowId;
+
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ private OperationType operationType;
+
+ private @Nullable VersionChain newRow;
+
+ private @Nullable RowVersion toRemove;
+
+ private @Nullable BinaryRow previousUncommittedRowVersion;
+
+ AbortWriteInvokeClosure(RowId rowId, AbstractPageMemoryMvPartitionStorage
storage) {
+ this.rowId = rowId;
+ this.storage = storage;
+ }
+
+ @Override
+ public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ if (oldRow == null || oldRow.transactionId() == null) {
+ // Row doesn't exist or the chain doesn't contain an uncommitted
write intent.
+ operationType = OperationType.NOOP;
+
+ return;
+ }
+
+ RowVersion latestVersion = storage.readRowVersion(oldRow.headLink(),
ALWAYS_LOAD_VALUE);
+
+ assert latestVersion.isUncommitted();
+
+ toRemove = latestVersion;
+
+ if (latestVersion.hasNextLink()) {
+ // Next can be safely replaced with any value (like 0), because
this field is only used when there
+ // is some uncommitted value, but when we add an uncommitted
value, we 'fix' such placeholder value
+ // (like 0) by replacing it with a valid value.
+ newRow = VersionChain.createCommitted(rowId,
latestVersion.nextLink(), NULL_LINK);
+
+ operationType = OperationType.PUT;
+ } else {
+ // It was the only version, let's remove the chain as well.
+ operationType = OperationType.REMOVE;
+ }
+
+ previousUncommittedRowVersion =
storage.rowVersionToBinaryRow(latestVersion);
+ }
+
+ @Override
+ public @Nullable VersionChain newRow() {
+ assert operationType == OperationType.PUT ? newRow != null : newRow ==
null : "newRow=" + newRow + ", op=" + operationType;
+
+ return newRow;
+ }
+
+ @Override
+ public OperationType operationType() {
+ assert operationType != null;
+
+ return operationType;
+ }
+
+ /**
+ * Returns the result for {@link MvPartitionStorage#abortWrite(RowId)}.
+ */
+ @Nullable BinaryRow getPreviousUncommittedRowVersion() {
+ return previousUncommittedRowVersion;
+ }
+
+ /**
+ * Method to call after {@link BplusTree#invoke(Object, Object,
InvokeClosure)} has completed.
+ */
+ void afterCompletion() {
+ assert operationType == OperationType.NOOP ? toRemove == null :
toRemove != null : "toRemove=" + toRemove + ", op=" + operationType;
+
+ if (toRemove != null) {
+ storage.removeRowVersion(toRemove);
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 40a43d7544..2a68d07060 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -18,12 +18,11 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static
org.apache.ignite.internal.storage.util.StorageUtils.throwStorageExceptionIfItCause;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -31,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.configuration.NamedListView;
@@ -39,6 +40,8 @@ import org.apache.ignite.internal.pagememory.PageIdAllocator;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.datapage.DataPageReader;
import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -68,6 +71,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTre
import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -76,6 +80,16 @@ import org.jetbrains.annotations.Nullable;
/**
* Abstract implementation of partition storage using Page Memory.
+ *
+ * <p>A few words about parallel operations with version chains:
+ * <ul>
+ * <li>All update operations (including creation) must first be
synchronized by row ID using
+ * {@link #inUpdateVersionChainLock(RowId, Supplier)};</li>
+ * <li>Reads and updates of version chains (or a single version) must be
synchronized by the {@link #versionChainTree}, for example for
+ * reading you can use {@link #findVersionChain(RowId, Function)} or
+ * {@link
AbstractPartitionTimestampCursor#createVersionChainCursorIfMissing()}, and for
updates you can use {@link InvokeClosure}
+ * for example {@link AddWriteInvokeClosure} or {@link
CommitWriteInvokeClosure}.</li>
+ * </ul>
*/
public abstract class AbstractPageMemoryMvPartitionStorage implements
MvPartitionStorage {
private static final byte[] TOMBSTONE_PAYLOAD = new byte[0];
@@ -108,6 +122,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
/** Current state of the storage. */
protected final AtomicReference<StorageState> state = new
AtomicReference<>(StorageState.RUNNABLE);
+ /** Version chain update lock by row ID. */
+ private final ConcurrentMap<RowId, LockHolder<ReentrantLock>>
updateVersionChainLockByRowId = new ConcurrentHashMap<>();
+
/**
* Constructor.
*
@@ -316,17 +333,17 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
String.format("RowId partition [%d] is not equal to
storage partition [%d].", rowId.partitionId(), partitionId));
}
- VersionChain versionChain = findVersionChain(rowId);
-
- if (versionChain == null) {
- return ReadResult.empty(rowId);
- }
+ return findVersionChain(rowId, versionChain -> {
+ if (versionChain == null) {
+ return ReadResult.empty(rowId);
+ }
- if (lookingForLatestVersion(timestamp)) {
- return findLatestRowVersion(versionChain);
- } else {
- return findRowVersionByTimestamp(versionChain, timestamp);
- }
+ if (lookingForLatestVersion(timestamp)) {
+ return findLatestRowVersion(versionChain);
+ } else {
+ return findRowVersionByTimestamp(versionChain, timestamp);
+ }
+ });
});
}
@@ -334,14 +351,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
return timestamp == HybridTimestamp.MAX_VALUE;
}
- private @Nullable VersionChain findVersionChain(RowId rowId) {
- try {
- return versionChainTree.findOne(new VersionChainKey(rowId));
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Version chain lookup failed", e);
- }
- }
-
ReadResult findLatestRowVersion(VersionChain versionChain) {
RowVersion rowVersion = readRowVersion(versionChain.headLink(),
ALWAYS_LOAD_VALUE);
@@ -375,15 +384,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
return read.result();
}
- private void throwIfChainBelongsToAnotherTx(VersionChain versionChain,
UUID txId) {
- assert versionChain.isUncommitted();
-
- if (!txId.equals(versionChain.transactionId())) {
- throw new TxIdMismatchException(txId,
versionChain.transactionId());
- }
- }
-
- private @Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) {
+ @Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) {
if (rowVersion.isTombstone()) {
return null;
}
@@ -497,25 +498,15 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
);
}
- private RowVersion insertRowVersion(@Nullable BinaryRow row, long
nextPartitionlessLink) {
- byte[] rowBytes = rowBytes(row);
-
- RowVersion rowVersion = new RowVersion(partitionId,
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
-
- insertRowVersion(rowVersion);
-
- return rowVersion;
- }
-
- private void insertRowVersion(RowVersion rowVersion) {
+ void insertRowVersion(RowVersion rowVersion) {
try {
rowVersionFreeList.insertDataRow(rowVersion);
} catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot store a row version", e);
+ throw new StorageException("Cannot store a row version: [row={},
{}]", e, rowVersion, createStorageInfo());
}
}
- private static byte[] rowBytes(@Nullable BinaryRow row) {
+ static byte[] rowBytes(@Nullable BinaryRow row) {
// TODO IGNITE-16913 Add proper way to write row bytes into array
without allocations.
return row == null ? TOMBSTONE_PAYLOAD : row.bytes();
}
@@ -528,42 +519,25 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
return busy(() -> {
throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
- VersionChain currentChain = findVersionChain(rowId);
+ return inUpdateVersionChainLock(rowId, () -> {
+ try {
+ AddWriteInvokeClosure addWrite = new
AddWriteInvokeClosure(rowId, row, txId, commitTableId, commitPartitionId, this);
- if (currentChain == null) {
- RowVersion newVersion = insertRowVersion(row, NULL_LINK);
+ versionChainTree.invoke(new VersionChainKey(rowId), null,
addWrite);
- VersionChain versionChain =
VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId,
newVersion.link(),
- NULL_LINK);
+ addWrite.afterCompletion();
- updateVersionChain(versionChain);
+ return addWrite.getPreviousUncommittedRowVersion();
+ } catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
- return null;
- }
-
- if (currentChain.isUncommitted()) {
- throwIfChainBelongsToAnotherTx(currentChain, txId);
- }
-
- RowVersion newVersion = insertRowVersion(row,
currentChain.newestCommittedLink());
-
- BinaryRow res = null;
-
- if (currentChain.isUncommitted()) {
- RowVersion currentVersion =
readRowVersion(currentChain.headLink(), ALWAYS_LOAD_VALUE);
-
- res = rowVersionToBinaryRow(currentVersion);
-
- // as we replace an uncommitted version with new one, we need
to remove old uncommitted version
- removeRowVersion(currentVersion);
- }
-
- VersionChain chainReplacement =
VersionChain.createUncommitted(rowId, txId, commitTableId, commitPartitionId,
newVersion.link(),
- newVersion.nextLink());
-
- updateVersionChain(chainReplacement);
+ if (e.getCause() instanceof TxIdMismatchException) {
+ throw (TxIdMismatchException) e.getCause();
+ }
- return res;
+ throw new StorageException("Error while executing
addWrite: [rowId={}, {}]", e, rowId, createStorageInfo());
+ }
+ });
});
}
@@ -574,43 +548,24 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
return busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- VersionChain currentVersionChain = findVersionChain(rowId);
-
- if (currentVersionChain == null ||
currentVersionChain.transactionId() == null) {
- // Row doesn't exist or the chain doesn't contain an
uncommitted write intent.
- return null;
- }
-
- RowVersion latestVersion =
readRowVersion(currentVersionChain.headLink(), ALWAYS_LOAD_VALUE);
+ return inUpdateVersionChainLock(rowId, () -> {
+ try {
+ AbortWriteInvokeClosure abortWrite = new
AbortWriteInvokeClosure(rowId, this);
- assert latestVersion.isUncommitted();
+ versionChainTree.invoke(new VersionChainKey(rowId), null,
abortWrite);
- removeRowVersion(latestVersion);
+ abortWrite.afterCompletion();
- if (latestVersion.hasNextLink()) {
- // Next can be safely replaced with any value (like 0),
because this field is only used when there
- // is some uncommitted value, but when we add an uncommitted
value, we 'fix' such placeholder value
- // (like 0) by replacing it with a valid value.
- VersionChain versionChainReplacement =
VersionChain.createCommitted(rowId, latestVersion.nextLink(), NULL_LINK);
+ return abortWrite.getPreviousUncommittedRowVersion();
+ } catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
- updateVersionChain(versionChainReplacement);
- } else {
- // it was the only version, let's remove the chain as well
- removeVersionChain(currentVersionChain);
- }
-
- return rowVersionToBinaryRow(latestVersion);
+ throw new StorageException("Error while executing
abortWrite: [rowId={}, {}]", e, rowId, createStorageInfo());
+ }
+ });
});
}
- private void removeVersionChain(VersionChain currentVersionChain) {
- try {
- versionChainTree.remove(currentVersionChain);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot remove chain version", e);
- }
- }
-
@Override
public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
assert rowId.partitionId() == partitionId : rowId;
@@ -618,50 +573,25 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
busy(() -> {
throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
- VersionChain currentVersionChain = findVersionChain(rowId);
-
- if (currentVersionChain == null ||
currentVersionChain.transactionId() == null) {
- // Row doesn't exist or the chain doesn't contain an
uncommitted write intent.
- return null;
- }
+ return inUpdateVersionChainLock(rowId, () -> {
+ try {
+ versionChainTree.invoke(new VersionChainKey(rowId), null,
new CommitWriteInvokeClosure(timestamp, this));
- long chainLink = currentVersionChain.headLink();
+ return null;
+ } catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
- try {
- rowVersionFreeList.updateTimestamp(chainLink, timestamp);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot update timestamp", e);
- }
-
- try {
- VersionChain updatedVersionChain =
VersionChain.createCommitted(
- currentVersionChain.rowId(),
- currentVersionChain.headLink(),
- currentVersionChain.nextLink()
- );
-
- versionChainTree.putx(updatedVersionChain);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot update transaction ID", e);
- }
-
- return null;
+ throw new StorageException("Error while executing
commitWrite: [rowId={}, {}]", e, rowId, createStorageInfo());
+ }
+ });
});
}
- private void removeRowVersion(RowVersion currentVersion) {
+ void removeRowVersion(RowVersion rowVersion) {
try {
- rowVersionFreeList.removeDataRowByLink(currentVersion.link());
+ rowVersionFreeList.removeDataRowByLink(rowVersion.link());
} catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot update row version", e);
- }
- }
-
- private void updateVersionChain(VersionChain newVersionChain) {
- try {
- versionChainTree.putx(newVersionChain);
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Cannot update version chain", e);
+ throw new StorageException("Cannot remove row version: [row={},
{}]", e, rowVersion, createStorageInfo());
}
}
@@ -672,42 +602,36 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
busy(() -> {
throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
- VersionChain currentChain = findVersionChain(rowId);
-
- if (currentChain != null && currentChain.isUncommitted()) {
- // This means that there is a bug in our code as the caller
must make sure that no write intent exists
- // below this write.
- throw new StorageException("Write intent exists for " + rowId);
- }
+ return inUpdateVersionChainLock(rowId, () -> {
+ try {
+ versionChainTree.invoke(
+ new VersionChainKey(rowId),
+ null,
+ new AddWriteCommittedInvokeClosure(rowId, row,
commitTimestamp, this)
+ );
- long nextLink = currentChain == null ? NULL_LINK :
currentChain.newestCommittedLink();
- RowVersion newVersion = insertCommittedRowVersion(row,
commitTimestamp, nextLink);
+ return null;
+ } catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
- VersionChain chainReplacement =
VersionChain.createCommitted(rowId, newVersion.link(), newVersion.nextLink());
-
- updateVersionChain(chainReplacement);
-
- return null;
+ throw new StorageException("Error while executing
addWriteCommitted: [rowId={}, {}]", e, rowId, createStorageInfo());
+ }
+ });
});
}
- private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row,
HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
- byte[] rowBytes = rowBytes(row);
-
- RowVersion rowVersion = new RowVersion(partitionId, commitTimestamp,
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
-
- insertRowVersion(rowVersion);
-
- return rowVersion;
- }
-
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
return busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- // TODO: IGNITE-18717 Add lock by rowId
- return new ScanVersionsCursor(rowId, this);
+ return findVersionChain(rowId, versionChain -> {
+ if (versionChain == null) {
+ return CursorUtils.emptyCursor();
+ }
+
+ return new ScanVersionsCursor(versionChain, this);
+ });
});
}
@@ -944,15 +868,58 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
}
- @Nullable VersionChain readVersionChain(RowId rowId) {
+ void throwExceptionIfStorageNotInRunnableState() {
+ StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ }
+
+ /**
+ * Searches version chain by row ID and converts the found version chain
to the result if found.
+ *
+ * @param rowId Row ID.
+ * @param mapper Function for converting the version chain to a result,
function is executed under the read lock of the page on which
+ * the version chain is located. If the version chain is not found,
then {@code null} will be passed to the function.
+ */
+ <T> @Nullable T findVersionChain(RowId rowId, Function<VersionChain, T>
mapper) {
try {
- return versionChainTree.findOne(new VersionChainKey(rowId));
+ return versionChainTree.findOne(new VersionChainKey(rowId), new
TreeRowMapClosure<>() {
+ @Override
+ public T map(VersionChain treeRow) {
+ return mapper.apply(treeRow);
+ }
+ }, null);
} catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error getting version chain:
[rowId={}, {}]", e, rowId, createStorageInfo());
+ throwStorageExceptionIfItCause(e);
+
+ throw new StorageException("Row version lookup failed: [rowId={},
{}]", e, rowId, createStorageInfo());
}
}
- void throwExceptionIfStorageNotInRunnableState() {
- StorageUtils.throwExceptionIfStorageNotInRunnableState(state.get(),
this::createStorageInfo);
+ /**
+ * Organizes external synchronization of update operations for the same
version chain.
+ */
+ protected <T> T inUpdateVersionChainLock(RowId rowId, Supplier<T>
supplier) {
+ LockHolder<ReentrantLock> lockHolder =
updateVersionChainLockByRowId.compute(rowId, (rowId1, reentrantLockLockHolder)
-> {
+ if (reentrantLockLockHolder == null) {
+ reentrantLockLockHolder = new LockHolder<>(new
ReentrantLock());
+ }
+
+ reentrantLockLockHolder.incrementHolders();
+
+ return reentrantLockLockHolder;
+ });
+
+ lockHolder.getLock().lock();
+
+ try {
+ return supplier.get();
+ } finally {
+ lockHolder.getLock().unlock();
+
+ updateVersionChainLockByRowId.compute(rowId, (rowId1,
reentrantLockLockHolder) -> {
+ assert reentrantLockLockHolder != null;
+
+ return reentrantLockLockHolder.decrementHolders() ? null :
reentrantLockLockHolder;
+ });
+ }
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
index ce49626356..1968a394a6 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPartitionTimestampCursor.java
@@ -17,12 +17,11 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import java.util.HashMap;
-import java.util.Map;
+import static
org.apache.ignite.internal.storage.util.StorageUtils.throwStorageExceptionIfItCause;
+
import java.util.NoSuchElementException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.pagememory.tree.BplusTree;
-import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowClosure;
+import org.apache.ignite.internal.pagememory.tree.BplusTree.TreeRowMapClosure;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
@@ -35,16 +34,13 @@ import org.jetbrains.annotations.Nullable;
abstract class AbstractPartitionTimestampCursor implements
PartitionTimestampCursor {
protected final AbstractPageMemoryMvPartitionStorage storage;
- private @Nullable Cursor<VersionChain> versionChainCursor;
-
- /** {@link ReadResult} obtained by caching {@link VersionChain} with
{@link BplusTree#find(Object, Object, TreeRowClosure, Object)}. */
- private final Map<RowId, ReadResult> readResultByRowId = new HashMap<>();
+ private @Nullable Cursor<ReadResult> cursor;
private boolean iterationExhausted;
private @Nullable ReadResult nextRead;
- private @Nullable VersionChain currentChain;
+ private @Nullable RowId currentRowId;
AbstractPartitionTimestampCursor(AbstractPageMemoryMvPartitionStorage
storage) {
this.storage = storage;
@@ -65,36 +61,25 @@ abstract class AbstractPartitionTimestampCursor implements
PartitionTimestampCur
createVersionChainCursorIfMissing();
- currentChain = null;
+ currentRowId = null;
while (true) {
- if (!versionChainCursor.hasNext()) {
+ if (!cursor.hasNext()) {
iterationExhausted = true;
return false;
}
- VersionChain chain = versionChainCursor.next();
-
- ReadResult result = readResultByRowId.remove(chain.rowId());
+ ReadResult result = cursor.next();
- if (result == null) {
- // TODO: IGNITE-18717 Add lock by rowId
- chain = storage.readVersionChain(chain.rowId());
-
- if (chain == null) {
- continue;
- }
-
- result = findRowVersion(chain);
- }
+ RowId rowId = result.rowId();
if (result.isEmpty() && !result.isWriteIntent()) {
continue;
}
nextRead = result;
- currentChain = chain;
+ currentRowId = rowId;
return true;
}
@@ -122,8 +107,8 @@ abstract class AbstractPartitionTimestampCursor implements
PartitionTimestampCur
@Override
public void close() {
- if (versionChainCursor != null) {
- versionChainCursor.close();
+ if (cursor != null) {
+ cursor.close();
}
}
@@ -132,18 +117,14 @@ abstract class AbstractPartitionTimestampCursor
implements PartitionTimestampCur
return storage.busy(() -> {
storage.throwExceptionIfStorageNotInRunnableState();
- if (currentChain == null) {
- throw new IllegalStateException("Version chain missing: " +
storage.createStorageInfo());
+ if (currentRowId == null) {
+ throw new IllegalStateException("RowId missing: " +
storage.createStorageInfo());
}
- // TODO: IGNITE-18717 Add lock by rowId
- VersionChain chain =
storage.readVersionChain(currentChain.rowId());
-
- if (chain == null) {
- return null;
- }
-
- ReadResult result = storage.findRowVersionByTimestamp(chain,
timestamp);
+ ReadResult result = storage.findVersionChain(currentRowId,
versionChain -> versionChain == null
+ ? ReadResult.empty(currentRowId)
+ : storage.findRowVersionByTimestamp(versionChain,
timestamp)
+ );
if (result.isEmpty()) {
return null;
@@ -163,29 +144,21 @@ abstract class AbstractPartitionTimestampCursor
implements PartitionTimestampCur
*/
abstract ReadResult findRowVersion(VersionChain versionChain);
- private void createVersionChainCursorIfMissing() {
- if (versionChainCursor != null) {
+ void createVersionChainCursorIfMissing() {
+ if (cursor != null) {
return;
}
try {
- versionChainCursor = storage.versionChainTree.find(null, null,
(tree, io, pageAddr, idx) -> {
- // Since the BplusTree cursor caches rows that are on the same
page, we should try to get actual ReadResult for them in this
- // filter so as not to get into a situation when we read the
chain and the links in it are no longer valid.
-
- VersionChain versionChain = tree.getRow(io, pageAddr, idx);
-
- // TODO: IGNITE-18717 Perhaps add lock by rowId
-
- ReadResult readResult = findRowVersion(versionChain);
-
- if (!readResult.isEmpty()) {
- readResultByRowId.put(versionChain.rowId(), readResult);
+ cursor = storage.versionChainTree.find(null, null, new
TreeRowMapClosure<>() {
+ @Override
+ public ReadResult map(VersionChain treeRow) {
+ return findRowVersion(treeRow);
}
-
- return true;
}, null);
} catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
+
throw new StorageException("Find failed: " +
storage.createStorageInfo(), e);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
new file mode 100644
index 0000000000..f33493c6e3
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for
+ * {@link AbstractPageMemoryMvPartitionStorage#addWriteCommitted(RowId,
BinaryRow, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> {
+ private final RowId rowId;
+
+ private final @Nullable BinaryRow row;
+
+ private final HybridTimestamp commitTimestamp;
+
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ private @Nullable VersionChain newRow;
+
+ AddWriteCommittedInvokeClosure(
+ RowId rowId,
+ @Nullable BinaryRow row,
+ HybridTimestamp commitTimestamp,
+ AbstractPageMemoryMvPartitionStorage storage
+ ) {
+ this.rowId = rowId;
+ this.row = row;
+ this.commitTimestamp = commitTimestamp;
+ this.storage = storage;
+ }
+
+ @Override
+ public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ if (oldRow != null && oldRow.isUncommitted()) {
+ // This means that there is a bug in our code as the caller must
make sure that no write intent exists below this write.
+ throw new StorageException("Write intent exists: [rowId={}, {}]",
oldRow.rowId(), storage.createStorageInfo());
+ }
+
+ long nextLink = oldRow == null ? NULL_LINK :
oldRow.newestCommittedLink();
+
+ RowVersion newVersion = insertCommittedRowVersion(row,
commitTimestamp, nextLink);
+
+ newRow = VersionChain.createCommitted(rowId, newVersion.link(),
newVersion.nextLink());
+ }
+
+ @Override
+ public @Nullable VersionChain newRow() {
+ assert newRow != null;
+
+ return newRow;
+ }
+
+ @Override
+ public OperationType operationType() {
+ return OperationType.PUT;
+ }
+
+ private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row,
HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
+ byte[] rowBytes = rowBytes(row);
+
+ RowVersion rowVersion = new RowVersion(storage.partitionId,
commitTimestamp, nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+ storage.insertRowVersion(rowVersion);
+
+ return rowVersion;
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
new file mode 100644
index 0000000000..bf66c5a1f1
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link
AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID,
int)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} and {@link
TxIdMismatchException} which will cause form
+ * {@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class AddWriteInvokeClosure implements InvokeClosure<VersionChain> {
+ private final RowId rowId;
+
+ private final @Nullable BinaryRow row;
+
+ private final UUID txId;
+
+ private final UUID commitTableId;
+
+ private final int commitPartitionId;
+
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ private @Nullable VersionChain newRow;
+
+ private @Nullable BinaryRow previousUncommittedRowVersion;
+
+ private @Nullable RowVersion toRemove;
+
+ AddWriteInvokeClosure(
+ RowId rowId,
+ @Nullable BinaryRow row,
+ UUID txId,
+ UUID commitTableId,
+ int commitPartitionId,
+ AbstractPageMemoryMvPartitionStorage storage
+ ) {
+ this.rowId = rowId;
+ this.row = row;
+ this.txId = txId;
+ this.commitTableId = commitTableId;
+ this.commitPartitionId = commitPartitionId;
+ this.storage = storage;
+ }
+
+ @Override
+ public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ if (oldRow == null) {
+ RowVersion newVersion = insertRowVersion(row, NULL_LINK);
+
+ newRow = VersionChain.createUncommitted(rowId, txId,
commitTableId, commitPartitionId, newVersion.link(), NULL_LINK);
+
+ return;
+ }
+
+ if (oldRow.isUncommitted()) {
+ throwIfChainBelongsToAnotherTx(oldRow);
+ }
+
+ RowVersion newVersion = insertRowVersion(row,
oldRow.newestCommittedLink());
+
+ if (oldRow.isUncommitted()) {
+ RowVersion currentVersion =
storage.readRowVersion(oldRow.headLink(), ALWAYS_LOAD_VALUE);
+
+ previousUncommittedRowVersion =
storage.rowVersionToBinaryRow(currentVersion);
+
+ // As we replace an uncommitted version with new one, we need to
remove old uncommitted version.
+ toRemove = currentVersion;
+ }
+
+ newRow = VersionChain.createUncommitted(rowId, txId, commitTableId,
commitPartitionId, newVersion.link(), newVersion.nextLink());
+ }
+
+ @Override
+ public @Nullable VersionChain newRow() {
+ assert newRow != null;
+
+ return newRow;
+ }
+
+ @Override
+ public OperationType operationType() {
+ return OperationType.PUT;
+ }
+
+ /**
+ * Returns the result for {@link MvPartitionStorage#addWrite(RowId,
BinaryRow, UUID, UUID, int)}.
+ */
+ @Nullable BinaryRow getPreviousUncommittedRowVersion() {
+ return previousUncommittedRowVersion;
+ }
+
+ private RowVersion insertRowVersion(@Nullable BinaryRow row, long
nextPartitionlessLink) {
+ byte[] rowBytes = rowBytes(row);
+
+ RowVersion rowVersion = new RowVersion(storage.partitionId,
nextPartitionlessLink, ByteBuffer.wrap(rowBytes));
+
+ storage.insertRowVersion(rowVersion);
+
+ return rowVersion;
+ }
+
+ private void throwIfChainBelongsToAnotherTx(VersionChain versionChain) {
+ assert versionChain.isUncommitted();
+
+ if (!txId.equals(versionChain.transactionId())) {
+ throw new TxIdMismatchException(txId,
versionChain.transactionId());
+ }
+ }
+
+ /**
+ * Method to call after {@link BplusTree#invoke(Object, Object,
InvokeClosure)} has completed.
+ */
+ void afterCompletion() {
+ if (toRemove != null) {
+ storage.removeRowVersion(toRemove);
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
new file mode 100644
index 0000000000..01b1be8e63
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for {@link
AbstractPageMemoryMvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
+ *
+ * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
+ * which the version chain is located.
+ *
+ * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+class CommitWriteInvokeClosure implements InvokeClosure<VersionChain> {
+ private final HybridTimestamp timestamp;
+
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ private OperationType operationType;
+
+ private @Nullable VersionChain newRow;
+
+ private @Nullable Long updateTimestampLink;
+
+ CommitWriteInvokeClosure(HybridTimestamp timestamp,
AbstractPageMemoryMvPartitionStorage storage) {
+ this.timestamp = timestamp;
+ this.storage = storage;
+ }
+
+ @Override
+ public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ if (oldRow == null || oldRow.transactionId() == null) {
+ // Row doesn't exist or the chain doesn't contain an uncommitted
write intent.
+ operationType = OperationType.NOOP;
+
+ return;
+ }
+
+ updateTimestampLink = oldRow.headLink();
+
+ operationType = OperationType.PUT;
+
+ newRow = VersionChain.createCommitted(oldRow.rowId(),
oldRow.headLink(), oldRow.nextLink());
+ }
+
+ @Override
+ public @Nullable VersionChain newRow() {
+ assert operationType == OperationType.PUT ? newRow != null : newRow ==
null : "newRow=" + newRow + ", op=" + operationType;
+
+ return newRow;
+ }
+
+ @Override
+ public OperationType operationType() {
+ assert operationType != null;
+
+ return operationType;
+ }
+
+ @Override
+ public void onUpdate() {
+ assert operationType == OperationType.PUT ? updateTimestampLink !=
null : updateTimestampLink == null :
+ "link=" + updateTimestampLink + ", op=" + operationType;
+
+ if (updateTimestampLink != null) {
+ try {
+
storage.rowVersionFreeList.updateTimestamp(updateTimestampLink, timestamp);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Error while update timestamp: [link={}, timestamp={},
{}]",
+ e,
+ updateTimestampLink, timestamp,
storage.createStorageInfo());
+ }
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
new file mode 100644
index 0000000000..91167cf785
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Class to keep track of the number of lock holders.
+ *
+ * <p>Honest holder tracking requires an external sync, such as a {@link
ConcurrentMap}.
+ *
+ * <p>How to use:
+ * <ul>
+ * <li>Increases the count of lock holders by {@link
#incrementHolders()};</li>
+ * <li>You can {@link #getLock() get a lock} and work with it;</li>
+ * <li>After lock is no longer used, you need to call {@link
#decrementHolders()}, which will tell you if lock holders are left.</li>
+ * </ul>
+ */
+class LockHolder<T extends Lock> {
+ private final T lock;
+
+ private final AtomicInteger lockHolder = new AtomicInteger();
+
+ LockHolder(T lock) {
+ this.lock = lock;
+ }
+
+ /**
+ * Increment the count of lock holders ({@link Thread}).
+ */
+ void incrementHolders() {
+ int holders = lockHolder.incrementAndGet();
+
+ assert holders > 0 : holders;
+ }
+
+ /**
+ * Decrements the count of lock holders ({@link Thread}), returns {@code
true} if there are no more lock holders.
+ */
+ boolean decrementHolders() {
+ int holders = lockHolder.decrementAndGet();
+
+ assert holders >= 0 : holders;
+
+ return holders == 0;
+ }
+
+ /**
+ * Returns lock.
+ */
+ T getLock() {
+ return lock;
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
index 9f581feae4..93dd316713 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ScanVersionsCursor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import static java.util.Collections.emptyIterator;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowVersionToResultNotFillingLastCommittedTs;
@@ -29,7 +28,6 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.util.Cursor;
-import org.jetbrains.annotations.Nullable;
/**
* Cursor reading all versions for {@link RowId}.
@@ -39,26 +37,21 @@ import org.jetbrains.annotations.Nullable;
class ScanVersionsCursor implements Cursor<ReadResult> {
private final AbstractPageMemoryMvPartitionStorage storage;
- private final @Nullable VersionChain versionChain;
+ private final VersionChain versionChain;
private final Iterator<RowVersion> rowVersionIterator;
/**
* Constructor.
*
- * @param rowId Row ID.
+ * @param versionChain Version chain.
* @param storage Multi-versioned partition storage.
* @throws StorageException If there is an error when collecting all
versions of the chain.
*/
- ScanVersionsCursor(
- RowId rowId,
- AbstractPageMemoryMvPartitionStorage storage
- ) {
+ ScanVersionsCursor(VersionChain versionChain,
AbstractPageMemoryMvPartitionStorage storage) {
this.storage = storage;
-
- versionChain = storage.readVersionChain(rowId);
-
- rowVersionIterator = versionChain == null ? emptyIterator() :
collectRowVersions();
+ this.versionChain = versionChain;
+ this.rowVersionIterator = collectRowVersions();
}
@Override
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
new file mode 100644
index 0000000000..f0802e7465
--- /dev/null
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.locks.Lock;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class for testing {@link LockHolder}.
+ */
+public class LockHolderTest {
+ @Test
+ void testSimple() {
+ LockHolder<Lock> lockHolder = createLockHolder();
+
+ lockHolder.incrementHolders();
+
+ assertNotNull(lockHolder.getLock());
+
+ assertTrue(lockHolder.decrementHolders());
+ }
+
+ @Test
+ void testIncrementHoldersSameThreadTwice() {
+ LockHolder<Lock> lockHolder = createLockHolder();
+
+ lockHolder.incrementHolders();
+ lockHolder.incrementHolders();
+
+ assertNotNull(lockHolder.getLock());
+
+ assertFalse(lockHolder.decrementHolders());
+ assertTrue(lockHolder.decrementHolders());
+ }
+
+ @Test
+ void testTwoThreadSimple() {
+ LockHolder<Lock> lockHolder = createLockHolder();
+
+ lockHolder.incrementHolders();
+
+ assertNotNull(lockHolder.getLock());
+
+ assertThat(runAsync(() -> {
+ lockHolder.incrementHolders();
+
+ assertNotNull(lockHolder.getLock());
+
+ assertFalse(lockHolder.decrementHolders());
+ }), willCompleteSuccessfully());
+
+ assertNotNull(lockHolder.getLock());
+
+ assertTrue(lockHolder.decrementHolders());
+ }
+
+ private static LockHolder<Lock> createLockHolder() {
+ return new LockHolder<>(mock(Lock.class));
+ }
+}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
index b0ba265703..bdc0544e0d 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
@@ -29,7 +29,6 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
@ExtendWith(WorkDirectoryExtension.class)
class PersistentPageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
@InjectConfiguration("mock.checkpoint.checkpointDelayMillis = 0")
@@ -46,4 +45,40 @@ class PersistentPageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPa
return new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, null);
}
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testRegularGcAndRead(AddAndCommit addAndCommit) {
+ super.testRegularGcAndRead(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndRead(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndAddWrite(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndCommitWrite(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndAbortWrite(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testConcurrentGc(AddAndCommit addAndCommit) {
+ super.testConcurrentGc(addAndCommit);
+ }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
index 22f2efcf32..4645851483 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
@@ -28,7 +28,6 @@ import
org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
@ExtendWith(WorkDirectoryExtension.class)
class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
@InjectConfiguration
@@ -42,4 +41,40 @@ class VolatilePageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPart
return new VolatilePageMemoryStorageEngine("node", engineConfig,
ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
}
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testRegularGcAndRead(AddAndCommit addAndCommit) {
+ super.testRegularGcAndRead(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndRead(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndAddWrite(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndCommitWrite(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+ super.testTombstoneGcAndAbortWrite(addAndCommit);
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+ @Override
+ public void testConcurrentGc(AddAndCommit addAndCommit) {
+ super.testConcurrentGc(addAndCommit);
+ }
}