This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 24de97894c IGNITE-18843 Fix the behavior of 
MvPartitionStorage#pollForVacuum when trying to delete the same row in parallel 
(#1697)
24de97894c is described below

commit 24de97894c35e9bb1a0d63e152b61ab24fb0182a
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Feb 23 10:40:52 2023 +0300

    IGNITE-18843 Fix the behavior of MvPartitionStorage#pollForVacuum when 
trying to delete the same row in parallel (#1697)
---
 .../matchers/CompletableFutureMatcher.java         |  20 +++
 .../ignite/internal/storage/util}/LockHolder.java  |   2 +-
 .../storage/util/ReentrantLockByRowId.java         | 177 +++++++++++++++++++++
 .../internal/storage/util}/LockHolderTest.java     |   3 +-
 .../storage/util/ReentrantLockByRowIdTest.java     | 159 ++++++++++++++++++
 .../AbstractMvPartitionStorageConcurrencyTest.java |  31 +++-
 .../storage/impl/TestMvPartitionStorage.java       |  13 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  68 +++-----
 .../mv/PersistentPageMemoryMvPartitionStorage.java |   2 +
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |   6 +-
 .../internal/storage/rocksdb/GarbageCollector.java |  90 ++++++++---
 .../internal/storage/rocksdb/GcRowVersion.java     |  76 +++++++++
 .../storage/rocksdb/PartitionDataHelper.java       |   3 +
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |  18 ++-
 14 files changed, 592 insertions(+), 76 deletions(-)

diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index 18bb7661cd..3c9333e18c 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -174,6 +174,26 @@ public class CompletableFutureMatcher<T> extends 
TypeSafeMatcher<CompletableFutu
         return new CompletableFutureMatcher<>(anything(), time, timeUnit, 
cause);
     }
 
+    /**
+     * Creates a matcher that matches a future that does <strong>not</strong> 
complete decently fast.
+     *
+     * @return matcher.
+     */
+    public static CompletableFutureMatcher<Object> willTimeoutFast() {
+        return willTimeoutIn(250, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Creates a matcher that matches a future that does <strong>not</strong> 
complete within the given timeout.
+     *
+     * @param time Timeout.
+     * @param timeUnit Time unit for timeout.
+     * @return matcher.
+     */
+    public static CompletableFutureMatcher<Object> willTimeoutIn(int time, 
TimeUnit timeUnit) {
+        return new CompletableFutureMatcher<>(anything(), time, timeUnit, 
TimeoutException.class);
+    }
+
     /**
      * Creates a matcher that matches a future that will be cancelled and 
decently fast.
      *
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java
similarity index 97%
rename from 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
rename to 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java
index 91167cf785..6c121cb64d 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolder.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/LockHolder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.pagememory.mv;
+package org.apache.ignite.internal.storage.util;
 
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowId.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowId.java
new file mode 100644
index 0000000000..e2a5eb4187
--- /dev/null
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowId.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * {@link ReentrantLock} by row ID.
+ *
+ * <p>Allows synchronization of version chain update operations.
+ */
+public class ReentrantLockByRowId {
+    private final ThreadLocal<Object> lockedRowIds = new ThreadLocal<>();
+
+    private final ConcurrentMap<RowId, LockHolder<ReentrantLock>> 
lockHolderByRowId = new ConcurrentHashMap<>();
+
+    /**
+     * Executes the supplier under lock by row ID.
+     *
+     * @param <T> Return type.
+     * @param rowId Row ID.
+     * @param supplier Supplier to execute under the lock.
+     * @return Value.
+     */
+    public <T> T inLock(RowId rowId, Supplier<T> supplier) {
+        acquireLock0(rowId);
+
+        try {
+            return supplier.get();
+        } finally {
+            releaseLock0(rowId, false);
+        }
+    }
+
+    /**
+     * Acquires the lock by row ID.
+     *
+     * @param rowId Row ID.
+     */
+    public void acquireLock(RowId rowId) {
+        acquireLock0(rowId);
+
+        Object lockedRowIds = this.lockedRowIds.get();
+
+        if (lockedRowIds == null) {
+            this.lockedRowIds.set(rowId);
+        } else if (lockedRowIds instanceof RowId) {
+            RowId lockedRowId = (RowId) lockedRowIds;
+
+            if (!lockedRowId.equals(rowId)) {
+                Set<RowId> rowIds = new HashSet<>();
+
+                rowIds.add(lockedRowId);
+                rowIds.add(rowId);
+
+                this.lockedRowIds.set(rowIds);
+            }
+        } else {
+            ((Set<RowId>) lockedRowIds).add(rowId);
+        }
+    }
+
+    /**
+     * Releases the lock by row ID.
+     *
+     * @param rowId Row ID.
+     * @throws IllegalStateException If the lock could not be found by row ID.
+     * @throws IllegalMonitorStateException If the current thread does not 
hold this lock.
+     */
+    public void releaseLock(RowId rowId) {
+        releaseLock0(rowId, false);
+
+        LockHolder<ReentrantLock> lockHolder = lockHolderByRowId.get(rowId);
+
+        if (lockHolder != null && 
lockHolder.getLock().isHeldByCurrentThread()) {
+            return;
+        }
+
+        Object lockedRowIds = this.lockedRowIds.get();
+
+        assert lockedRowIds != null : rowId;
+
+        if (lockedRowIds instanceof RowId) {
+            RowId lockedRowId = (RowId) lockedRowIds;
+
+            assert lockedRowId.equals(rowId) : "rowId=" + rowId + ", 
lockedRowId=" + lockedRowId;
+
+            this.lockedRowIds.set(null);
+        } else {
+            Set<RowId> rowIds = ((Set<RowId>) lockedRowIds);
+
+            boolean remove = rowIds.remove(rowId);
+
+            assert remove : "rowId=" + rowId + ", lockedRowIds=" + rowIds;
+
+            if (rowIds.isEmpty()) {
+                this.lockedRowIds.set(null);
+            }
+        }
+    }
+
+    /**
+     * Releases all locks {@link #acquireLock(RowId) acquired} by the current 
thread, if any.
+     *
+     * <p>Order of releasing the locks is not defined, each lock will be 
released with all re-entries.
+     */
+    public void releaseAllLockByCurrentThread() {
+        Object lockedRowIds = this.lockedRowIds.get();
+
+        this.lockedRowIds.set(null);
+
+        if (lockedRowIds == null) {
+            return;
+        } else if (lockedRowIds instanceof RowId) {
+            releaseLock0(((RowId) lockedRowIds), true);
+        } else {
+            for (RowId rowId : ((Set<RowId>) lockedRowIds)) {
+                releaseLock0(rowId, true);
+            }
+        }
+    }
+
+    private void acquireLock0(RowId rowId) {
+        LockHolder<ReentrantLock> lockHolder = 
lockHolderByRowId.compute(rowId, (rowId1, reentrantLockLockHolder) -> {
+            if (reentrantLockLockHolder == null) {
+                reentrantLockLockHolder = new LockHolder<>(new 
ReentrantLock());
+            }
+
+            reentrantLockLockHolder.incrementHolders();
+
+            return reentrantLockLockHolder;
+        });
+
+        lockHolder.getLock().lock();
+    }
+
+    private void releaseLock0(RowId rowId, boolean allReentries) {
+        LockHolder<ReentrantLock> lockHolder = lockHolderByRowId.get(rowId);
+
+        if (lockHolder == null) {
+            throw new IllegalStateException("Could not find lock by row ID: " 
+ rowId);
+        }
+
+        ReentrantLock lock = lockHolder.getLock();
+
+        do {
+            lock.unlock();
+
+            lockHolderByRowId.compute(rowId, (rowId1, reentrantLockLockHolder) 
-> {
+                assert reentrantLockLockHolder != null;
+
+                return reentrantLockLockHolder.decrementHolders() ? null : 
reentrantLockLockHolder;
+            });
+        } while (allReentries && lock.isHeldByCurrentThread());
+    }
+}
diff --git 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockHolderTest.java
similarity index 96%
rename from 
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
rename to 
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockHolderTest.java
index f0802e7465..73782a5e5d 100644
--- 
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/LockHolderTest.java
+++ 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/LockHolderTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.pagememory.mv;
+package org.apache.ignite.internal.storage.util;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.locks.Lock;
+import org.apache.ignite.internal.storage.util.LockHolder;
 import org.junit.jupiter.api.Test;
 
 /**
diff --git 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowIdTest.java
 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowIdTest.java
new file mode 100644
index 0000000000..6749372b52
--- /dev/null
+++ 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/ReentrantLockByRowIdTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.RowId;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class for testing {@link ReentrantLockByRowId}.
+ */
+public class ReentrantLockByRowIdTest {
+    private ReentrantLockByRowId lockByRowId;
+
+    @BeforeEach
+    void setUp() {
+        lockByRowId = new ReentrantLockByRowId();
+    }
+
+    @AfterEach
+    void tearDown() {
+        lockByRowId.releaseAllLockByCurrentThread();
+    }
+
+    @Test
+    void testSimple() {
+        RowId rowId = new RowId(0);
+
+        lockByRowId.acquireLock(rowId);
+        lockByRowId.releaseLock(rowId);
+
+        assertEquals(1, lockByRowId.inLock(rowId, () -> 1));
+    }
+
+    @Test
+    void testSimpleReEnter() {
+        RowId rowId = new RowId(0);
+
+        lockByRowId.acquireLock(rowId);
+        lockByRowId.acquireLock(rowId);
+
+        lockByRowId.inLock(rowId, () -> {
+            lockByRowId.acquireLock(rowId);
+
+            lockByRowId.releaseLock(rowId);
+
+            return null;
+        });
+
+        lockByRowId.releaseLock(rowId);
+        lockByRowId.releaseLock(rowId);
+    }
+
+    @Test
+    void testReleaseError() {
+        assertThrows(IllegalStateException.class, () -> 
lockByRowId.releaseLock(new RowId(0)));
+
+        RowId rowId = new RowId(0);
+
+        assertThat(runAsync(() -> lockByRowId.acquireLock(rowId)), 
willCompleteSuccessfully());
+
+        assertThrows(IllegalMonitorStateException.class, () -> 
lockByRowId.releaseLock(rowId));
+    }
+
+    @Test
+    void testBlockSimple() {
+        RowId rowId = new RowId(0);
+
+        lockByRowId.acquireLock(rowId);
+        lockByRowId.acquireLock(rowId);
+
+        CompletableFuture<?> acquireLockFuture = runAsync(() -> {
+            lockByRowId.acquireLock(rowId);
+            lockByRowId.releaseLock(rowId);
+        });
+
+        assertThat(acquireLockFuture, willTimeoutFast());
+
+        lockByRowId.releaseLock(rowId);
+
+        assertThat(acquireLockFuture, willTimeoutFast());
+
+        lockByRowId.releaseLock(rowId);
+
+        assertThat(acquireLockFuture, willCompleteSuccessfully());
+
+        lockByRowId.acquireLock(rowId);
+    }
+
+    @Test
+    void testBlockSupplier() {
+        RowId rowId = new RowId(0);
+
+        lockByRowId.acquireLock(rowId);
+        lockByRowId.acquireLock(rowId);
+
+        CompletableFuture<?> acquireLockFuture = runAsync(() -> 
lockByRowId.inLock(rowId, () -> 1));
+
+        assertThat(acquireLockFuture, willTimeoutFast());
+
+        lockByRowId.releaseLock(rowId);
+
+        assertThat(acquireLockFuture, willTimeoutFast());
+
+        lockByRowId.releaseLock(rowId);
+
+        assertThat(acquireLockFuture, willCompleteSuccessfully());
+
+        lockByRowId.acquireLock(rowId);
+    }
+
+    @Test
+    void testReleaseAllLocksByCurrentThread() {
+        RowId rowId0 = new RowId(0);
+        RowId rowId1 = new RowId(0);
+
+        lockByRowId.acquireLock(rowId0);
+
+        lockByRowId.acquireLock(rowId1);
+        lockByRowId.acquireLock(rowId1);
+
+        CompletableFuture<?> acquireLockFuture = runAsync(() -> {
+            lockByRowId.acquireLock(rowId0);
+            lockByRowId.acquireLock(rowId1);
+
+            lockByRowId.releaseAllLockByCurrentThread();
+        });
+
+        assertThat(acquireLockFuture, willTimeoutFast());
+
+        lockByRowId.releaseAllLockByCurrentThread();
+
+        assertThat(acquireLockFuture, willCompleteSuccessfully());
+    }
+}
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 69e031276c..dd323a441a 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -17,9 +17,18 @@
 
 package org.apache.ignite.internal.storage;
 
+import static java.util.stream.Collectors.toCollection;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Stream;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.jetbrains.annotations.Nullable;
@@ -193,19 +202,33 @@ public abstract class 
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
         for (int i = 0; i < REPEATS; i++) {
             addAndCommit.perform(this, TABLE_ROW);
 
+            addAndCommit.perform(this, TABLE_ROW2);
+
             addAndCommit.perform(this, null);
 
+            Collection<ByteBuffer> rows = Stream.of(TABLE_ROW, TABLE_ROW2)
+                    .map(BinaryRow::byteBuffer)
+                    .collect(toCollection(ConcurrentLinkedQueue::new));
+
             runRace(
-                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
-                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
-                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
-                    () -> pollForVacuum(HybridTimestamp.MAX_VALUE)
+                    () -> 
assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(),
 rows),
+                    () -> 
assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(),
 rows)
             );
 
+            assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+
             assertNull(storage.closestRowId(ROW_ID));
+
+            assertThat(rows, empty());
         }
     }
 
+    private void assertRemoveRow(ByteBuffer rowBytes, Collection<ByteBuffer> 
rows) {
+        assertNotNull(rowBytes);
+
+        assertTrue(rows.remove(rowBytes), rowBytes.toString());
+    }
+
     @SuppressWarnings("ResultOfMethodCallIgnored")
     private void scanFirstEntry(HybridTimestamp firstCommitTs) {
         try (var cursor = scan(firstCommitTs)) {
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index f1dbbb7478..be754ecacd 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.storage.util.ReentrantLockByRowId;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
@@ -65,6 +66,8 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     private volatile boolean rebalance;
 
+    final ReentrantLockByRowId lockByRowId = new ReentrantLockByRowId();
+
     public TestMvPartitionStorage(int partitionId) {
         this.partitionId = partitionId;
     }
@@ -108,7 +111,11 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     public <V> V runConsistently(WriteClosure<V> closure) throws 
StorageException {
         checkStorageClosed();
 
-        return closure.execute();
+        try {
+            return closure.execute();
+        } finally {
+            lockByRowId.releaseAllLockByCurrentThread();
+        }
     }
 
     @Override
@@ -497,6 +504,10 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
         RowId rowId = dequeuedVersionChain.rowId;
 
+        // We must release the lock after executing WriteClosure#execute in 
MvPartitionStorage#runConsistently so that the indexes can be
+        // deleted consistently.
+        lockByRowId.acquireLock(rowId);
+
         VersionChain versionChainToRemove = dequeuedVersionChain.next;
         assert versionChainToRemove != null;
         assert versionChainToRemove.next == null;
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 2de9261ba0..5516a5f127 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -71,6 +70,7 @@ import 
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTre
 import 
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
 import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
 import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
+import org.apache.ignite.internal.storage.util.ReentrantLockByRowId;
 import org.apache.ignite.internal.storage.util.StorageState;
 import org.apache.ignite.internal.storage.util.StorageUtils;
 import org.apache.ignite.internal.util.Cursor;
@@ -130,7 +130,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     protected final AtomicReference<StorageState> state = new 
AtomicReference<>(StorageState.RUNNABLE);
 
     /** Version chain update lock by row ID. */
-    private final ConcurrentMap<RowId, LockHolder<ReentrantLock>> 
updateVersionChainLockByRowId = new ConcurrentHashMap<>();
+    protected final ReentrantLockByRowId updateVersionChainLockByRowId = new 
ReentrantLockByRowId();
 
     /**
      * Constructor.
@@ -941,29 +941,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
      * Organizes external synchronization of update operations for the same 
version chain.
      */
     protected <T> T inUpdateVersionChainLock(RowId rowId, Supplier<T> 
supplier) {
-        LockHolder<ReentrantLock> lockHolder = 
updateVersionChainLockByRowId.compute(rowId, (rowId1, reentrantLockLockHolder) 
-> {
-            if (reentrantLockLockHolder == null) {
-                reentrantLockLockHolder = new LockHolder<>(new 
ReentrantLock());
-            }
-
-            reentrantLockLockHolder.incrementHolders();
-
-            return reentrantLockLockHolder;
-        });
-
-        lockHolder.getLock().lock();
-
-        try {
-            return supplier.get();
-        } finally {
-            lockHolder.getLock().unlock();
-
-            updateVersionChainLockByRowId.compute(rowId, (rowId1, 
reentrantLockLockHolder) -> {
-                assert reentrantLockLockHolder != null;
-
-                return reentrantLockLockHolder.decrementHolders() ? null : 
reentrantLockLockHolder;
-            });
-        }
+        return updateVersionChainLockByRowId.inLock(rowId, supplier);
     }
 
     @Override
@@ -971,33 +949,39 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableState();
 
-            GcRowVersion first = gcQueue.getFirst();
+            while (true) {
+                // TODO: IGNITE-18867 Get and delete in one call
+                GcRowVersion head = gcQueue.getFirst();
 
-            // Garbage collection queue is empty.
-            if (first == null) {
-                return null;
-            }
+                // Garbage collection queue is empty.
+                if (head == null) {
+                    return null;
+                }
 
-            HybridTimestamp rowTimestamp = first.getTimestamp();
+                HybridTimestamp rowTimestamp = head.getTimestamp();
 
-            // There are no versions in the garbage collection queue before 
watermark.
-            if (rowTimestamp.compareTo(lowWatermark) > 0) {
-                return null;
-            }
+                // There are no versions in the garbage collection queue 
before watermark.
+                if (rowTimestamp.compareTo(lowWatermark) > 0) {
+                    return null;
+                }
 
-            RowId rowId = first.getRowId();
+                RowId rowId = head.getRowId();
+
+                // If no one has processed the head of the gc queue in 
parallel, then we must release the lock after executing
+                // WriteClosure#execute in MvPartitionStorage#runConsistently 
so that the indexes can be deleted consistently.
+                updateVersionChainLockByRowId.acquireLock(rowId);
 
-            return inUpdateVersionChainLock(rowId, () -> {
                 // Someone processed the element in parallel.
-                // TODO: IGNITE-18843 Should try to get the RowVersion again
-                if (!gcQueue.remove(rowId, rowTimestamp, first.getLink())) {
-                    return null;
+                if (!gcQueue.remove(rowId, rowTimestamp, head.getLink())) {
+                    updateVersionChainLockByRowId.releaseLock(rowId);
+
+                    continue;
                 }
 
-                RowVersion removedRowVersion = removeWriteOnGc(rowId, 
rowTimestamp, first.getLink());
+                RowVersion removedRowVersion = removeWriteOnGc(rowId, 
rowTimestamp, head.getLink());
 
                 return new 
BinaryRowAndRowId(rowVersionToBinaryRow(removedRowVersion), rowId);
-            });
+            }
         });
     }
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 74f5fee5f9..181a0b0a70 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -145,6 +145,8 @@ public class PersistentPageMemoryMvPartitionStorage extends 
AbstractPageMemoryMv
             try {
                 return closure.execute();
             } finally {
+                updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+
                 checkpointTimeoutLock.checkpointReadUnlock();
             }
         });
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 78de1781d0..318e1afffb 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -99,7 +99,11 @@ public class VolatilePageMemoryMvPartitionStorage extends 
AbstractPageMemoryMvPa
         return busy(() -> {
             throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(), 
this::createStorageInfo);
 
-            return closure.execute();
+            try {
+                return closure.execute();
+            } finally {
+                updateVersionChainLockByRowId.releaseAllLockByCurrentThread();
+            }
         });
     }
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index 2fd34ea654..e5bb63e3a8 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -173,26 +173,51 @@ class GarbageCollector {
                 return null;
             }
 
-            ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
-            gcKeyBuffer.clear();
+            ByteBuffer gcKeyBuffer = readGcKey(gcIt);
 
-            gcIt.key(gcKeyBuffer);
+            GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
 
-            HybridTimestamp gcElementTimestamp = 
readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET);
+            while (true) {
+                if (gcRowVersion.getRowTimestamp().compareTo(lowWatermark) > 
0) {
+                    // No elements to garbage collect.
+                    return null;
+                }
 
-            if (gcElementTimestamp.compareTo(lowWatermark) > 0) {
-                // No elements to garbage collect.
-                return null;
-            }
+                // If no one has processed the head of the gc queue in 
parallel, then we must release the lock after write batch in
+                // WriteClosure#execute of MvPartitionStorage#runConsistently 
so that the indexes can be deleted consistently.
+                helper.lockByRowId.acquireLock(gcRowVersion.getRowId());
+
+                // We must refresh the iterator to try to read the head of the 
gc queue again and if someone deleted it in parallel,
+                // then read the new head of the queue.
+                refreshGcIterator(gcIt, gcKeyBuffer);
+
+                if (invalid(gcIt)) {
+                    // GC queue is empty.
+                    return null;
+                }
+
+                gcKeyBuffer = readGcKey(gcIt);
+
+                GcRowVersion oldGcRowVersion = gcRowVersion;
+
+                gcRowVersion = toGcRowVersion(gcKeyBuffer);
+
+                // Someone has processed the element in parallel: release the 
lock held for the old head and take a new head of the queue.
+                if (!gcRowVersion.equals(oldGcRowVersion)) {
+                    helper.lockByRowId.releaseLock(oldGcRowVersion.getRowId());
+
+                    continue;
+                }
 
-            RowId gcElementRowId = helper.getRowId(gcKeyBuffer, 
GC_KEY_ROW_ID_OFFSET);
+                break;
+            }
 
             // Delete element from the GC queue.
             batch.delete(gcQueueCf, gcKeyBuffer);
 
             try (RocksIterator it = db.newIterator(partCf, 
helper.upperBoundReadOpts)) {
                 // Process the element in data cf that triggered the addition 
to the GC queue.
-                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, gcElementRowId, gcElementTimestamp);
+                boolean proceed = checkHasNewerRowAndRemoveTombstone(it, 
batch, gcRowVersion);
 
                 if (!proceed) {
                     // No further processing required.
@@ -200,9 +225,8 @@ class GarbageCollector {
                 }
 
                 // Find the row that should be garbage collected.
-                ByteBuffer dataKey = getRowForGcKey(it, gcElementRowId);
+                ByteBuffer dataKey = getRowForGcKey(it, 
gcRowVersion.getRowId());
 
-                // TODO: IGNITE-18843 Should try to get the RowVersion again
                 if (dataKey == null) {
                     // No row for GC.
                     return null;
@@ -212,7 +236,7 @@ class GarbageCollector {
                 byte[] valueBytes = it.value();
 
                 var row = new 
ByteBufferRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
-                BinaryRowAndRowId retVal = new BinaryRowAndRowId(row, 
gcElementRowId);
+                BinaryRowAndRowId retVal = new BinaryRowAndRowId(row, 
gcRowVersion.getRowId());
 
                 // Delete the row from the data cf.
                 batch.delete(partCf, dataKey);
@@ -231,18 +255,21 @@ class GarbageCollector {
      *
      * @param it RocksDB data column family iterator.
      * @param batch Write batch.
-     * @param gcElementRowId Row id of the element from the GC queue/
+     * @param gcRowVersion Row version from the GC queue.
      * @return {@code true} if further processing by garbage collector is 
needed.
      */
-    private boolean checkHasNewerRowAndRemoveTombstone(RocksIterator it, 
WriteBatchWithIndex batch, RowId gcElementRowId,
-            HybridTimestamp gcElementTimestamp) throws RocksDBException {
+    private boolean checkHasNewerRowAndRemoveTombstone(
+            RocksIterator it,
+            WriteBatchWithIndex batch,
+            GcRowVersion gcRowVersion
+    ) throws RocksDBException {
         ByteBuffer dataKeyBuffer = MV_KEY_BUFFER.get();
         dataKeyBuffer.clear();
 
         ColumnFamilyHandle partCf = helper.partCf;
 
         // Set up the data key.
-        helper.putDataKey(dataKeyBuffer, gcElementRowId, gcElementTimestamp);
+        helper.putDataKey(dataKeyBuffer, gcRowVersion.getRowId(), 
gcRowVersion.getRowTimestamp());
 
         // Seek to the row id and timestamp from the GC queue.
         // Note that it doesn't mean that the element in this iterator has 
matching row id or even partition id.
@@ -256,7 +283,7 @@ class GarbageCollector {
 
             it.key(dataKeyBuffer);
 
-            if (!helper.getRowId(dataKeyBuffer, 
ROW_ID_OFFSET).equals(gcElementRowId)) {
+            if (!helper.getRowId(dataKeyBuffer, 
ROW_ID_OFFSET).equals(gcRowVersion.getRowId())) {
                 // There is no row for the GC queue element.
                 return false;
             }
@@ -318,4 +345,31 @@ class GarbageCollector {
     void deleteQueue(WriteBatch writeBatch) throws RocksDBException {
         writeBatch.deleteRange(gcQueueCf, helper.partitionStartPrefix(), 
helper.partitionEndPrefix());
     }
+
+    private ByteBuffer readGcKey(RocksIterator gcIt) {
+        ByteBuffer gcKeyBuffer = GC_KEY_BUFFER.get();
+        gcKeyBuffer.clear();
+
+        gcIt.key(gcKeyBuffer);
+
+        return gcKeyBuffer;
+    }
+
+    private GcRowVersion toGcRowVersion(ByteBuffer gcKeyBuffer) {
+        return new GcRowVersion(
+                helper.getRowId(gcKeyBuffer, GC_KEY_ROW_ID_OFFSET),
+                readTimestampNatural(gcKeyBuffer, GC_KEY_TS_OFFSET)
+        );
+    }
+
+    private void refreshGcIterator(RocksIterator gcIt, ByteBuffer gcKeyBuffer) 
throws RocksDBException {
+        gcIt.refresh();
+
+        gcIt.seekForPrev(gcKeyBuffer);
+
+        // Row version was removed from the gc queue by someone, back to the 
head of gc queue.
+        if (invalid(gcIt)) {
+            gcIt.seek(helper.partitionStartPrefix());
+        }
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GcRowVersion.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GcRowVersion.java
new file mode 100644
index 0000000000..4a5d865352
--- /dev/null
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GcRowVersion.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Row versions for garbage collection.
+ */
+final class GcRowVersion {
+    @IgniteToStringInclude
+    private final RowId rowId;
+
+    @IgniteToStringInclude
+    private final HybridTimestamp rowTimestamp;
+
+    GcRowVersion(RowId rowId, HybridTimestamp rowTimestamp) {
+        this.rowId = rowId;
+        this.rowTimestamp = rowTimestamp;
+    }
+
+    RowId getRowId() {
+        return rowId;
+    }
+
+    HybridTimestamp getRowTimestamp() {
+        return rowTimestamp;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        GcRowVersion that = (GcRowVersion) o;
+
+        return rowId.equals(that.rowId) && 
rowTimestamp.equals(that.rowTimestamp);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = rowId.hashCode();
+
+        result = 31 * result + rowTimestamp.hashCode();
+
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(GcRowVersion.class, this);
+    }
+}
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index db0130adf2..9551f5e186 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.util.ReentrantLockByRowId;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.Slice;
@@ -92,6 +93,8 @@ class PartitionDataHelper implements ManuallyCloseable {
     /** Read options for total order scans. */
     final ReadOptions scanReadOpts;
 
+    final ReentrantLockByRowId lockByRowId = new ReentrantLockByRowId();
+
     PartitionDataHelper(int partitionId, ColumnFamilyHandle partCf) {
         this.partitionId = partitionId;
         this.partCf = partCf;
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 151b165fc1..31278c1a90 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -223,21 +223,23 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
                     pendingAppliedTerm = lastAppliedTerm;
                     pendingGroupConfig = lastGroupConfig;
 
-                    V res = closure.execute();
-
                     try {
+                        V res = closure.execute();
+
                         if (writeBatch.count() > 0) {
                             db.write(writeOpts, writeBatch);
                         }
+
+                        lastAppliedIndex = pendingAppliedIndex;
+                        lastAppliedTerm = pendingAppliedTerm;
+                        lastGroupConfig = pendingGroupConfig;
+
+                        return res;
                     } catch (RocksDBException e) {
                         throw new StorageException("Unable to apply a write 
batch to RocksDB instance.", e);
+                    } finally {
+                        helper.lockByRowId.releaseAllLockByCurrentThread();
                     }
-
-                    lastAppliedIndex = pendingAppliedIndex;
-                    lastAppliedTerm = pendingAppliedTerm;
-                    lastGroupConfig = pendingGroupConfig;
-
-                    return res;
                 } finally {
                     threadLocalWriteBatch.set(null);
                 }


Reply via email to