This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 62e8494fda IGNITE-22050 Fix incorrect partId in reused pages (#3789)
62e8494fda is described below
commit 62e8494fda57e2276849691cb10dcf2785bc7ad0
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon May 20 14:35:05 2024 +0300
IGNITE-22050 Fix incorrect partId in reused pages (#3789)
---
.../pagememory/datastructure/DataStructure.java | 18 ++++++++
.../internal/pagememory/freelist/PagesList.java | 2 +
.../table/distributed/gc/GcStorageHandler.java | 3 +-
.../ignite/internal/table/distributed/gc/MvGc.java | 10 ++++-
.../gc/AbstractGcUpdateHandlerTest.java | 52 ++++++++++++++++++++++
5 files changed, 81 insertions(+), 4 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
index a3c8970cc0..5007a62081 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/datastructure/DataStructure.java
@@ -166,6 +166,10 @@ public abstract class DataStructure implements
ManuallyCloseable {
// Recycled. "pollFreePage" result should be reinitialized to move
rotatedId to itemId.
if (pageId != 0) {
+ // Replace the partition ID, because the reused page might
have come from a different data structure if the reuse
+ // list is shared between them.
+ pageId = replacePartitionId(pageId);
+
pageId = reuseList.initRecycledPage(pageId, defaultPageFlag,
null);
}
}
@@ -183,6 +187,20 @@ public abstract class DataStructure implements
ManuallyCloseable {
return pageId;
}
+ /**
+ * Replaces the "partition ID" part of the given page ID with the
partition ID that this data structure is responsible for.
+ *
+ * @param pageId Original page ID.
+ * @return Page ID with replaced partition ID.
+ */
+ private long replacePartitionId(long pageId) {
+ long partitionIdZeroMask = ~(PageIdUtils.PART_ID_MASK <<
PageIdUtils.PAGE_IDX_SIZE);
+
+ long partitionIdMask = ((long) partId) << PageIdUtils.PAGE_IDX_SIZE;
+
+ return pageId & partitionIdZeroMask | partitionIdMask;
+ }
+
/**
* Allocates a new page.
*
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
index 0979df874f..1393ff0f9a 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/freelist/PagesList.java
@@ -1377,6 +1377,8 @@ public abstract class PagesList extends DataStructure {
try {
long pageAddr = pageMem.writeLock(grpId, pageId, page);
+ assert pageAddr != 0;
+
try {
return initReusedPage(pageId, pageAddr, partitionId(pageId),
flag, initIo);
} finally {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
index 157a3b84d0..f2d8f74822 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.table.distributed.gc;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
* Container for handling storage by the garbage collector.
@@ -28,7 +27,7 @@ class GcStorageHandler {
/**
* Handler of multi-versioned partition storage and its indexes for
garbage collection.
*
- * @see GcUpdateHandler#vacuumBatch(HybridTimestamp, int)
+ * @see GcUpdateHandler#vacuumBatch
*/
final GcUpdateHandler gcUpdateHandler;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index 48ab7afefb..a52b8df005 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LO
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.util.concurrent.CompletableFuture;
@@ -226,10 +227,13 @@ public class MvGc implements ManuallyCloseable {
.thenApplyAsync(unused ->
gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize(), true),
executor)
.whenComplete((isGarbageLeft, throwable) -> {
if (throwable != null) {
- if (throwable instanceof TrackerClosedException
- || throwable.getCause() instanceof
TrackerClosedException) {
+ if (unwrapCause(throwable) instanceof
TrackerClosedException) {
+ LOG.debug("TrackerClosedException caught",
throwable);
+
currentGcFuture.complete(null);
} else {
+ LOG.error("Error when running GC",
throwable);
+
currentGcFuture.completeExceptionally(throwable);
}
@@ -245,6 +249,8 @@ public class MvGc implements ManuallyCloseable {
}
});
} catch (Throwable t) {
+ LOG.error("Error when running GC", t);
+
currentGcFuture.completeExceptionally(t);
}
});
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
index 5f795adaaf..a4f8741a3a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.gc;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,6 +33,7 @@ import static org.mockito.Mockito.verify;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -198,6 +200,56 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
}
}
+ /**
+ * Tests a particular scenario when some data is inserted into multiple
partition storages, then removed by the GC and then inserted
+ * again.
+ */
+ @Test
+ void testVacuumThenInsert() {
+ int numPartitions = 3;
+
+ int numRows = 1000;
+
+ IndexUpdateHandler indexUpdateHandler = createIndexUpdateHandler();
+
+ List<TestPartitionDataStorage> partitionStorages = IntStream.range(0,
numPartitions)
+ .mapToObj(partId -> new TestPartitionDataStorage(TABLE_ID,
partId, getOrCreateMvPartition(tableStorage, partId)))
+ .collect(toList());
+
+ List<GcUpdateHandler> gcUpdateHandlers = partitionStorages.stream()
+ .map(partitionStorage ->
createGcUpdateHandler(partitionStorage, indexUpdateHandler))
+ .collect(toList());
+
+ BinaryRow row = binaryRow(new TestKey(0, "key"), new TestValue(0,
"value"));
+
+ HybridTimestamp timestamp = clock.now();
+
+ for (int i = 0; i < numPartitions; i++) {
+ TestPartitionDataStorage storage = partitionStorages.get(i);
+
+ for (int j = 0; j < numRows; j++) {
+ var rowId = new RowId(i);
+
+ addWriteCommitted(storage, rowId, row, timestamp);
+ addWriteCommitted(storage, rowId, null, timestamp);
+ }
+ }
+
+ for (GcUpdateHandler gcUpdateHandler : gcUpdateHandlers) {
+ gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE,
Integer.MAX_VALUE, true);
+ }
+
+ for (int i = 0; i < numPartitions; i++) {
+ TestPartitionDataStorage storage = partitionStorages.get(i);
+
+ for (int j = 0; j < numRows; j++) {
+ var rowId = new RowId(i);
+
+ addWriteCommitted(storage, rowId, row, timestamp);
+ }
+ }
+ }
+
private TestPartitionDataStorage createPartitionDataStorage() {
return new TestPartitionDataStorage(TABLE_ID, PARTITION_ID,
getOrCreateMvPartition(tableStorage, PARTITION_ID));
}