This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 24de97894c IGNITE-18843 Fix the behavior of
MvPartitionStorage#pollForVacuum when trying to delete the same row in parallel
(#1697)
24de97894c is described below
commit 24de97894c35e9bb1a0d63e152b61ab24fb0182a
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Feb 23 10:40:52 2023 +0300
IGNITE-18843 Fix the behavior of MvPartitionStorage#pollForVacuum when
trying to delete the same row in parallel (#1697)
---
.../matchers/CompletableFutureMatcher.java | 20 +++
.../ignite/internal/storage/util}/LockHolder.java | 2 +-
.../storage/util/ReentrantLockByRowId.java | 177 +++++++++++++++++++++
.../internal/storage/util}/LockHolderTest.java | 3 +-
.../storage/util/ReentrantLockByRowIdTest.java | 159 ++++++++++++++++++
.../AbstractMvPartitionStorageConcurrencyTest.java | 31 +++-
.../storage/impl/TestMvPartitionStorage.java | 13 +-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 68 +++-----
.../mv/PersistentPageMemoryMvPartitionStorage.java | 2 +
.../mv/VolatilePageMemoryMvPartitionStorage.java | 6 +-
.../internal/storage/rocksdb/GarbageCollector.java | 90 ++++++++---
.../internal/storage/rocksdb/GcRowVersion.java | 76 +++++++++
.../storage/rocksdb/PartitionDataHelper.java | 3 +
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 18 ++-
14 files changed, 592 insertions(+), 76 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index 18bb7661cd..3c9333e18c 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -174,6 +174,26 @@ public class CompletableFutureMatcher<T> extends
TypeSafeMatcher<CompletableFutu
return new CompletableFutureMatcher<>(anything(), time, timeUnit,
cause);
}
+ /**
+ * Creates a matcher that matches a future that does <strong>not</strong>
complete decently fast.
+ *
+ * @return matcher.
+ */
+ public static CompletableFutureMatcher<Object> willTimeoutFast() {
+ return willTimeoutIn(250, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Creates a matcher that matches a future that does <strong>not</strong>
complete within the given timeout.
+ *
+ * @param time Timeout.
+ * @param timeUnit Time unit for timeout.
+ * @return matcher.
+ */
+ public static CompletableFutureMatcher<Object> willTimeoutIn(int time,
TimeUnit timeUnit) {
+ return new CompletableFutureMatcher<>(anything(), time, timeUnit,
TimeoutException.class);
+ }
+
/**
* Creates a matcher that matches a future that will be cancelled and
decently fast.
*
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java
similarity index 97%
rename from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
rename to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java
index 91167cf785..6c121cb64d 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.mv;
+package org.apache.ignite.internal.storage.util;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowId.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowId.java
new file mode 100644
index 0000000000..e2a5eb4187
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowId.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * {@link ReentrantLock} by row ID.
+ *
+ * <p>Allows synchronization of version chain update operations.
+ */
+public class ReentrantLockByRowId {
+ private final ThreadLocal<Object> lockedRowIds = new ThreadLocal<>();
+
+ private final ConcurrentMap<RowId, LockHolder<ReentrantLock>>
lockHolderByRowId = new ConcurrentHashMap<>();
+
+ /**
+ * Executes the supplier under lock by row ID.
+ *
+ * @param <T> Return type.
+ * @param rowId Row ID.
+ * @param supplier Supplier to execute under the lock.
+ * @return Value.
+ */
+ public <T> T inLock(RowId rowId, Supplier<T> supplier) {
+ acquireLock0(rowId);
+
+ try {
+ return supplier.get();
+ } finally {
+ releaseLock0(rowId, false);
+ }
+ }
+
+ /**
+ * Acquires the lock by row ID.
+ *
+ * @param rowId Row ID.
+ */
+ public void acquireLock(RowId rowId) {
+ acquireLock0(rowId);
+
+ Object lockedRowIds = this.lockedRowIds.get();
+
+ if (lockedRowIds == null) {
+ this.lockedRowIds.set(rowId);
+ } else if (lockedRowIds instanceof RowId) {
+ RowId lockedRowId = (RowId) lockedRowIds;
+
+ if (!lockedRowId.equals(rowId)) {
+ Set<RowId> rowIds = new HashSet<>();
+
+ rowIds.add(lockedRowId);
+ rowIds.add(rowId);
+
+ this.lockedRowIds.set(rowIds);
+ }
+ } else {
+ ((Set<RowId>) lockedRowIds).add(rowId);
+ }
+ }
+
+ /**
+ * Releases the lock by row ID.
+ *
+ * @param rowId Row ID.
+ * @throws IllegalStateException If the lock could not be found by row ID.
+ * @throws IllegalMonitorStateException If the current thread does not
hold this lock.
+ */
+ public void releaseLock(RowId rowId) {
+ releaseLock0(rowId, false);
+
+ LockHolder<ReentrantLock> lockHolder = lockHolderByRowId.get(rowId);
+
+ if (lockHolder != null &&
lockHolder.getLock().isHeldByCurrentThread()) {
+ return;
+ }
+
+ Object lockedRowIds = this.lockedRowIds.get();
+
+ assert lockedRowIds != null : rowId;
+
+ if (lockedRowIds instanceof RowId) {
+ RowId lockedRowId = (RowId) lockedRowIds;
+
+ assert lockedRowId.equals(rowId) : "rowId=" + rowId + ",
lockedRowId=" + lockedRowId;
+
+ this.lockedRowIds.set(null);
+ } else {
+ Set<RowId> rowIds = ((Set<RowId>) lockedRowIds);
+
+ boolean remove = rowIds.remove(rowId);
+
+ assert remove : "rowId=" + rowId + ", lockedRowIds=" + rowIds;
+
+ if (rowIds.isEmpty()) {
+ this.lockedRowIds.set(null);
+ }
+ }
+ }
+
+ /**
+ * Releases all locks {@link #acquireLock(RowId) acquired} by the current
thread, if any exist.
+ *
+ * <p>Order of releasing the locks is not defined; each lock will be
released with all re-entries.
+ */
+ public void releaseAllLockByCurrentThread() {
+ Object lockedRowIds = this.lockedRowIds.get();
+
+ this.lockedRowIds.set(null);
+
+ if (lockedRowIds == null) {
+ return;
+ } else if (lockedRowIds instanceof RowId) {
+ releaseLock0(((RowId) lockedRowIds), true);
+ } else {
+ for (RowId rowId : ((Set<RowId>) lockedRowIds)) {
+ releaseLock0(rowId, true);
+ }
+ }
+ }
+
+ private void acquireLock0(RowId rowId) {
+ LockHolder<ReentrantLock> lockHolder =
lockHolderByRowId.compute(rowId, (rowId1, reentrantLockLockHolder) -> {
+ if (reentrantLockLockHolder == null) {
+ reentrantLockLockHolder = new LockHolder<>(new
ReentrantLock());
+ }
+
+ reentrantLockLockHolder.incrementHolders();
+
+ return reentrantLockLockHolder;
+ });
+
+ lockHolder.getLock().lock();
+ }
+
+ private void releaseLock0(RowId rowId, boolean allReentries) {
+ LockHolder<ReentrantLock> lockHolder = lockHolderByRowId.get(rowId);
+
+ if (lockHolder == null) {
+ throw new IllegalStateException("Could not find lock by row ID: "
+ rowId);
+ }
+
+ ReentrantLock lock = lockHolder.getLock();
+
+ do {
+ lock.unlock();
+
+ lockHolderByRowId.compute(rowId, (rowId1, reentrantLockLockHolder)
-> {
+ assert reentrantLockLockHolder != null;
+
+ return reentrantLockLockHolder.decrementHolders() ? null :
reentrantLockLockHolder;
+ });
+ } while (allReentries && lock.isHeldByCurrentThread());
+ }
+}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockHolderTest.java
similarity index 96%
rename from
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
rename to
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockHolderTest.java
index f0802e7465..73782a5e5d 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockHolderTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.mv;
+package org.apache.ignite.internal.storage.util;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.util.concurrent.locks.Lock;
+import org.apache.ignite.internal.storage.util.LockHolder;
import org.junit.jupiter.api.Test;
/**
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowIdTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowIdTest.java
new file mode 100644
index 0000000000..6749372b52
--- /dev/null
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowIdTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.RowId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class for testing {@link ReentrantLockByRowId}.
+ */
+public class ReentrantLockByRowIdTest {
+ private ReentrantLockByRowId lockByRowId;
+
+ @BeforeEach
+ void setUp() {
+ lockByRowId = new ReentrantLockByRowId();
+ }
+
+ @AfterEach
+ void tearDown() {
+ lockByRowId.releaseAllLockByCurrentThread();
+ }
+
+ @Test
+ void testSimple() {
+ RowId rowId = new RowId(0);
+
+ lockByRowId.acquireLock(rowId);
+ lockByRowId.releaseLock(rowId);
+
+ assertEquals(1, lockByRowId.inLock(rowId, () -> 1));
+ }
+
+ @Test
+ void testSimpleReEnter() {
+ RowId rowId = new RowId(0);
+
+ lockByRowId.acquireLock(rowId);
+ lockByRowId.acquireLock(rowId);
+
+ lockByRowId.inLock(rowId, () -> {
+ lockByRowId.acquireLock(rowId);
+
+ lockByRowId.releaseLock(rowId);
+
+ return null;
+ });
+
+ lockByRowId.releaseLock(rowId);
+ lockByRowId.releaseLock(rowId);
+ }
+
+ @Test
+ void testReleaseError() {
+ assertThrows(IllegalStateException.class, () ->
lockByRowId.releaseLock(new RowId(0)));
+
+ RowId rowId = new RowId(0);
+
+ assertThat(runAsync(() -> lockByRowId.acquireLock(rowId)),
willCompleteSuccessfully());
+
+ assertThrows(IllegalMonitorStateException.class, () ->
lockByRowId.releaseLock(rowId));
+ }
+
+ @Test
+ void testBlockSimple() {
+ RowId rowId = new RowId(0);
+
+ lockByRowId.acquireLock(rowId);
+ lockByRowId.acquireLock(rowId);
+
+ CompletableFuture<?> acquireLockFuture = runAsync(() -> {
+ lockByRowId.acquireLock(rowId);
+ lockByRowId.releaseLock(rowId);
+ });
+
+ assertThat(acquireLockFuture, willTimeoutFast());
+
+ lockByRowId.releaseLock(rowId);
+
+ assertThat(acquireLockFuture, willTimeoutFast());
+
+ lockByRowId.releaseLock(rowId);
+
+ assertThat(acquireLockFuture, willCompleteSuccessfully());
+
+ lockByRowId.acquireLock(rowId);
+ }
+
+ @Test
+ void testBlockSupplier() {
+ RowId rowId = new RowId(0);
+
+ lockByRowId.acquireLock(rowId);
+ lockByRowId.acquireLock(rowId);
+
+ CompletableFuture<?> acquireLockFuture = runAsync(() ->
lockByRowId.inLock(rowId, () -> 1));
+
+ assertThat(acquireLockFuture, willTimeoutFast());
+
+ lockByRowId.releaseLock(rowId);
+
+ assertThat(acquireLockFuture, willTimeoutFast());
+
+ lockByRowId.releaseLock(rowId);
+
+ assertThat(acquireLockFuture, willCompleteSuccessfully());
+
+ lockByRowId.acquireLock(rowId);
+ }
+
+ @Test
+ void testReleaseAllLocksByCurrentThread() {
+ RowId rowId0 = new RowId(0);
+ RowId rowId1 = new RowId(0);
+
+ lockByRowId.acquireLock(rowId0);
+
+ lockByRowId.acquireLock(rowId1);
+ lockByRowId.acquireLock(rowId1);
+
+ CompletableFuture<?> acquireLockFuture = runAsync(() -> {
+ lockByRowId.acquireLock(rowId0);
+ lockByRowId.acquireLock(rowId1);
+
+ lockByRowId.releaseAllLockByCurrentThread();
+ });
+
+ assertThat(acquireLockFuture, willTimeoutFast());
+
+ lockByRowId.releaseAllLockByCurrentThread();
+
+ assertThat(acquireLockFuture, willCompleteSuccessfully());
+ }
+}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 69e031276c..dd323a441a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -17,9 +17,18 @@
package org.apache.ignite.internal.storage;
+import static java.util.stream.Collectors.toCollection;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.Nullable;
@@ -193,19 +202,33 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
+ addAndCommit.perform(this, TABLE_ROW2);
+
addAndCommit.perform(this, null);
+ Collection<ByteBuffer> rows = Stream.of(TABLE_ROW, TABLE_ROW2)
+ .map(BinaryRow::byteBuffer)
+ .collect(toCollection(ConcurrentLinkedQueue::new));
+
runRace(
- () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
- () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
- () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
- () -> pollForVacuum(HybridTimestamp.MAX_VALUE)
+ () ->
assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(),
rows),
+ () ->
assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(),
rows)
);
+ assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+
assertNull(storage.closestRowId(ROW_ID));
+
+ assertThat(rows, empty());
}
}
+ private void assertRemoveRow(ByteBuffer rowBytes, Collection<ByteBuffer>
rows) {
+ assertNotNull(rowBytes);
+
+ assertTrue(rows.remove(rowBytes), rowBytes.toString());
+ }
+
@SuppressWarnings("ResultOfMethodCallIgnored")
private void scanFirstEntry(HybridTimestamp firstCommitTs) {
try (var cursor = scan(firstCommitTs)) {
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index f1dbbb7478..be754ecacd 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.util.ReentrantLockByRowId;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -65,6 +66,8 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
private volatile boolean rebalance;
+ final ReentrantLockByRowId lockByRowId = new ReentrantLockByRowId();
+
public TestMvPartitionStorage(int partitionId) {
this.partitionId = partitionId;
}
@@ -108,7 +111,11 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
public <V> V runConsistently(WriteClosure<V> closure) throws
StorageException {
checkStorageClosed();
- return closure.execute();
+ try {
+ return closure.execute();
+ } finally {
+ lockByRowId.releaseAllLockByCurrentThread();
+ }
}
@Override
@@ -497,6 +504,10 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
RowId rowId = dequeuedVersionChain.rowId;
+ // We must release the lock after executing WriteClosure#execute in
MvPartitionStorage#runConsistently so that the indexes can be
+ // deleted consistently.
+ lockByRowId.acquireLock(rowId);
+
VersionChain versionChainToRemove = dequeuedVersionChain.next;
assert versionChainToRemove != null;
assert versionChainToRemove.next == null;
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 2de9261ba0..5516a5f127 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -71,6 +70,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTre
import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
+import org.apache.ignite.internal.storage.util.ReentrantLockByRowId;
import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.Cursor;
@@ -130,7 +130,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
protected final AtomicReference<StorageState> state = new
AtomicReference<>(StorageState.RUNNABLE);
/** Version chain update lock by row ID. */
- private final ConcurrentMap<RowId, LockHolder<ReentrantLock>>
updateVersionChainLockByRowId = new ConcurrentHashMap<>();
+ protected final ReentrantLockByRowId updateVersionChainLockByRowId = new
ReentrantLockByRowId();
/**
* Constructor.
@@ -941,29 +941,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
* Organizes external synchronization of update operations for the same
version chain.
*/
protected <T> T inUpdateVersionChainLock(RowId rowId, Supplier<T>
supplier) {
- LockHolder<ReentrantLock> lockHolder =
updateVersionChainLockByRowId.compute(rowId, (rowId1, reentrantLockLockHolder)
-> {
- if (reentrantLockLockHolder == null) {
- reentrantLockLockHolder = new LockHolder<>(new
ReentrantLock());
- }
-
- reentrantLockLockHolder.incrementHolders();
-
- return reentrantLockLockHolder;
- });
-
- lockHolder.getLock().lock();
-
- try {
- return supplier.get();
- } finally {
- lockHolder.getLock().unlock();
-
- updateVersionChainLockByRowId.compute(rowId, (rowId1,
reentrantLockLockHolder) -> {
- assert reentrantLockLockHolder != null;
-
- return reentrantLockLockHolder.decrementHolders() ? null :
reentrantLockLockHolder;
- });
- }
+ return updateVersionChainLockByRowId.inLock(rowId, supplier);
}
@Override
@@ -971,33 +949,39 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
return busy(() -> {
throwExceptionIfStorageNotInRunnableState();
- GcRowVersion first = gcQueue.getFirst();
+ while (true) {
+ // TODO: IGNITE-18867 Get and delete in one call
+ GcRowVersion head = gcQueue.getFirst();
- // Garbage collection queue is empty.
- if (first == null) {
- return null;
- }
+ // Garbage collection queue is empty.
+ if (head == null) {
+ return null;
+ }
- HybridTimestamp rowTimestamp = first.getTimestamp();
+ HybridTimestamp rowTimestamp = head.getTimestamp();
- // There are no versions in the garbage collection queue before
watermark.
- if (rowTimestamp.compareTo(lowWatermark) > 0) {
- return null;
- }
+ // There are no versions in the garbage collection queue
before watermark.
+ if (rowTimestamp.compareTo(lowWatermark) > 0) {
+ return null;
+ }
- RowId rowId = first.getRowId();
+ RowId rowId = head.getRowId();
+
+ // If no one has processed the head of the gc queue in
parallel, then we must release the lock after executing
+ // WriteClosure#execute in MvPartitionStorage#runConsistently
so that the indexes can be deleted consistently.
+ updateVersionChainLockByRowId.acquireLock(rowId);
- return inUpdateVersionChainLock(rowId, () -> {
// Someone processed the element in parallel.
- // TODO: IGNITE-18843 Should try to get the RowVersion again
- if (!gcQueue.remove(rowId, rowTimestamp, first.getLink())) {
- return null;
+ if (!gcQueue.remove(rowId, rowTimestamp, head.getLink())) {
+ updateVersionChainLockByRowId.releaseLock(rowId);
+
+ continue;
}
- RowVersion removedRowVersion = removeWriteOnGc(rowId,
rowTimestamp, first.getLink());
+ RowVersion removedRowVersion = removeWriteOnGc(rowId,
rowTimestamp, head.getLink());
return new
BinaryRowAndRowId(rowVersionToBinaryRow(removedRowVersion), rowId);
- });
+ }
});
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 74f5fee5f9..181a0b0a70 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -145,6 +145,8 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
try {
return closure.execute();
} finally {
+ updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+
checkpointTimeoutLock.checkpointReadUnlock();
}
});
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 78de1781d0..318e1afffb 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -99,7 +99,11 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
return busy(() -> {
throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
- return closure.execute();
+ try {
+ return closure.execute();
+ } finally {
+ updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+ }
});
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index 2fd34ea654..e5bb63e3a8 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -173,26 +173,51 @@ class GarbageCollector {
return null;
}
- ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
- gcKeyBuffer.clear();
+ ByteBuffer gcKeyBuffer = readGcKey(gcIt);
- gcIt.key(gcKeyBuffer);
+ GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
- HybridTimestamp gcElementTimestamp =
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+ while (true) {
+ if (gcRowVersion.getRowTimestamp().compareTo(lowWatermark) >
0) {
+ // No elements to garbage collect.
+ return null;
+ }
- if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
- // No elements to garbage collect.
- return null;
- }
+ // If no one has processed the head of the gc queue in
parallel, then we must release the lock after write batch in
+ // WriteClosure#execute of MvPartitionStorage#runConsistently
so that the indexes can be deleted consistently.
+ helper.lockByRowId.acquireLock(gcRowVersion.getRowId());
+
+ // We must refresh the iterator to try to read the head of the
gc queue again and if someone deleted it in parallel,
+ // then read the new head of the queue.
+ refreshGcIterator(gcIt, gcKeyBuffer);
+
+ if (invalid(gcIt)) {
+ // GC queue is empty.
+ return null;
+ }
+
+ gcKeyBuffer = readGcKey(gcIt);
+
+ GcRowVersion oldGcRowVersion = gcRowVersion;
+
+ gcRowVersion = toGcRowVersion(gcKeyBuffer);
+
+ // Someone has processed the element in parallel, so we need
to take a new head of the queue.
+ if (!gcRowVersion.equals(oldGcRowVersion)) {
+ helper.lockByRowId.releaseLock(gcRowVersion.getRowId());
+
+ continue;
+ }
- RowId gcElementRowId = helper.getRowId(gcKeyBuffer,
GC_KEY_ROW_ID_OFFSET);
+ break;
+ }
// Delete element from the GC queue.
batch.delete(gcQueueCf, gcKeyBuffer);
try (RocksIterator it = db.newIterator(partCf,
helper.upperBoundReadOpts)) {
// Process the element in data cf that triggered the addition
to the GC queue.
- boolean proceed = checkHasNewerRowAndRemoveTombstone(it,
batch, gcElementRowId, gcElementTimestamp);
+ boolean proceed = checkHasNewerRowAndRemoveTombstone(it,
batch, gcRowVersion);
if (!proceed) {
// No further processing required.
@@ -200,9 +225,8 @@ class GarbageCollector {
}
// Find the row that should be garbage collected.
- ByteBuffer dataKey = getRowForGcKey(it, gcElementRowId);
+ ByteBuffer dataKey = getRowForGcKey(it,
gcRowVersion.getRowId());
- // TODO: IGNITE-18843 Should try to get the RowVersion again
if (dataKey == null) {
// No row for GC.
return null;
@@ -212,7 +236,7 @@ class GarbageCollector {
byte[] valueBytes = it.value();
var row = new
ByteBufferRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
- BinaryRowAndRowId retVal = new BinaryRowAndRowId(row,
gcElementRowId);
+ BinaryRowAndRowId retVal = new BinaryRowAndRowId(row,
gcRowVersion.getRowId());
// Delete the row from the data cf.
batch.delete(partCf, dataKey);
@@ -231,18 +255,21 @@ class GarbageCollector {
*
* @param it RocksDB data column family iterator.
* @param batch Write batch.
- * @param gcElementRowId Row id of the element from the GC queue/
+ * @param gcRowVersion Row version from the GC queue.
* @return {@code true} if further processing by garbage collector is
needed.
*/
- private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it,
WriteBatchWithIndex batch, RowId gcElementRowId,
- HybridTimestamp gcElementTimestamp) throws RocksDBException {
+ private boolean checkHasNewerRowAndRemoveTombstone(
+ RocksIterator it,
+ WriteBatchWithIndex batch,
+ GcRowVersion gcRowVersion
+ ) throws RocksDBException {
ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
dataKeyBuffer.clear();
ColumnFamilyHandle partCf = helper.partCf;
// Set up the data key.
- helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+ helper.putDataKey(dataKeyBuffer, gcRowVersion.getRowId(),
gcRowVersion.getRowTimestamp());
// Seek to the row id and timestamp from the GC queue.
// Note that it doesn't mean that the element in this iterator has
matching row id or even partition id.
@@ -256,7 +283,7 @@ class GarbageCollector {
it.key(dataKeyBuffer);
- if (!helper.getRowId(dataKeyBuffer,
ROW_ID_OFFSET).equals(gcElementRowId)) {
+ if (!helper.getRowId(dataKeyBuffer,
ROW_ID_OFFSET).equals(gcRowVersion.getRowId())) {
// There is no row for the GC queue element.
return false;
}
@@ -318,4 +345,31 @@ class GarbageCollector {
void deleteQueue(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(gcQueueCf, helper.partitionStartPrefix(),
helper.partitionEndPrefix());
}
+
+ private ByteBuffer readGcKey(RocksIterator gcIt) {
+ ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+ gcKeyBuffer.clear();
+
+ gcIt.key(gcKeyBuffer);
+
+ return gcKeyBuffer;
+ }
+
+ private GcRowVersion toGcRowVersion(ByteBuffer gcKeyBuffer) {
+ return new GcRowVersion(
+ helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET),
+ readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET)
+ );
+ }
+
+ private void refreshGcIterator(RocksIterator gcIt, ByteBuffer gcKeyBuffer)
throws RocksDBException {
+ gcIt.refresh();
+
+ gcIt.seekForPrev(gcKeyBuffer);
+
+ // Row version was removed from the gc queue by someone, back to the
head of gc queue.
+ if (invalid(gcIt)) {
+ gcIt.seek(helper.partitionStartPrefix());
+ }
+ }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GcRowVersion.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GcRowVersion.java
new file mode 100644
index 0000000000..4a5d865352
--- /dev/null
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GcRowVersion.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Row versions for garbage collection.
+ */
+final class GcRowVersion {
+ @IgniteToStringInclude
+ private final RowId rowId;
+
+ @IgniteToStringInclude
+ private final HybridTimestamp rowTimestamp;
+
+ GcRowVersion(RowId rowId, HybridTimestamp rowTimestamp) {
+ this.rowId = rowId;
+ this.rowTimestamp = rowTimestamp;
+ }
+
+ RowId getRowId() {
+ return rowId;
+ }
+
+ HybridTimestamp getRowTimestamp() {
+ return rowTimestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ GcRowVersion that = (GcRowVersion) o;
+
+ return rowId.equals(that.rowId) &&
rowTimestamp.equals(that.rowTimestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = rowId.hashCode();
+
+ result = 31 * result + rowTimestamp.hashCode();
+
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(GcRowVersion.class, this);
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index db0130adf2..9551f5e186 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.util.ReentrantLockByRowId;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.Slice;
@@ -92,6 +93,8 @@ class PartitionDataHelper implements ManuallyCloseable {
/** Read options for total order scans. */
final ReadOptions scanReadOpts;
+ final ReentrantLockByRowId lockByRowId = new ReentrantLockByRowId();
+
PartitionDataHelper(int partitionId, ColumnFamilyHandle partCf) {
this.partitionId = partitionId;
this.partCf = partCf;
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 151b165fc1..31278c1a90 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -223,21 +223,23 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
pendingAppliedTerm = lastAppliedTerm;
pendingGroupConfig = lastGroupConfig;
- V res = closure.execute();
-
try {
+ V res = closure.execute();
+
if (writeBatch.count() > 0) {
db.write(writeOpts, writeBatch);
}
+
+ lastAppliedIndex = pendingAppliedIndex;
+ lastAppliedTerm = pendingAppliedTerm;
+ lastGroupConfig = pendingGroupConfig;
+
+ return res;
} catch (RocksDBException e) {
throw new StorageException("Unable to apply a write
batch to RocksDB instance.", e);
+ } finally {
+ helper.lockByRowId.releaseAllLockByCurrentThread();
}
-
- lastAppliedIndex = pendingAppliedIndex;
- lastAppliedTerm = pendingAppliedTerm;
- lastGroupConfig = pendingGroupConfig;
-
- return res;
} finally {
threadLocalWriteBatch.set(null);
}