This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3179cb58e3 IGNITE-18023 Implement GC API in page Memory based MV
partition storages (#1673)
3179cb58e3 is described below
commit 3179cb58e36f8b67c09b63fe3958ec46babd2bcb
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Feb 20 16:30:07 2023 +0300
IGNITE-18023 Implement GC API in page Memory based MV partition storages
(#1673)
---
.../pagememory/persistence/PartitionMeta.java | 37 ++++-
.../pagememory/persistence/io/PartitionMetaIo.java | 27 ++-
.../internal/pagememory/util/PageIdUtils.java | 9 +
.../persistence/PartitionMetaManagerTest.java | 3 +-
.../pagememory/persistence/PartitionMetaTest.java | 2 +-
.../persistence/checkpoint/CheckpointerTest.java | 2 +-
.../AbstractMvPartitionStorageConcurrencyTest.java | 12 +-
.../storage/AbstractMvPartitionStorageGcTest.java | 6 +
.../storage/AbstractMvTableStorageTest.java | 7 +-
.../PersistentPageMemoryTableStorage.java | 58 ++++++-
.../pagememory/VolatilePageMemoryTableStorage.java | 29 +++-
.../pagememory/mv/AbortWriteInvokeClosure.java | 14 +-
.../mv/AbstractPageMemoryMvPartitionStorage.java | 105 +++++++++++-
.../mv/AddWriteCommittedInvokeClosure.java | 55 ++++++-
.../pagememory/mv/AddWriteInvokeClosure.java | 5 +-
.../pagememory/mv/CommitWriteInvokeClosure.java | 65 ++++++--
.../storage/pagememory/mv/FindRowVersion.java | 143 ++++++++++++++++
.../storage/pagememory/mv/HybridTimestamps.java | 10 +-
.../storage/pagememory/mv/MvPageIoModule.java | 9 +-
.../mv/{VersionChainKey.java => MvPageTypes.java} | 47 +++---
.../mv/PersistentPageMemoryMvPartitionStorage.java | 13 +-
.../storage/pagememory/mv/ReadRowVersion.java | 13 +-
.../mv/RemoveWriteOnGcInvokeClosure.java | 183 +++++++++++++++++++++
.../internal/storage/pagememory/mv/RowVersion.java | 27 +--
.../storage/pagememory/mv/RowVersionFreeList.java | 36 +++-
.../storage/pagememory/mv/VersionChain.java | 24 ++-
.../storage/pagememory/mv/VersionChainKey.java | 4 +-
.../mv/VolatilePageMemoryMvPartitionStorage.java | 28 +++-
.../internal/storage/pagememory/mv/gc/GcQueue.java | 138 ++++++++++++++++
.../{VersionChainKey.java => gc/GcRowVersion.java} | 38 ++++-
.../io/GcInnerIo.java} | 36 ++--
.../internal/storage/pagememory/mv/gc/io/GcIo.java | 145 ++++++++++++++++
.../storage/pagememory/mv/gc/io/GcLeafIo.java | 65 ++++++++
.../io/GcMetaIo.java} | 19 +--
.../storage/pagememory/mv/io/BlobFragmentIo.java | 4 +-
.../storage/pagememory/mv/io/RowVersionDataIo.java | 21 ++-
.../pagememory/mv/io/VersionChainInnerIo.java | 5 +-
.../pagememory/mv/io/VersionChainLeafIo.java | 5 +-
.../pagememory/mv/io/VersionChainMetaIo.java | 5 +-
.../storage/pagememory/mv/BlobStorageTest.java | 3 +-
...ageMemoryMvPartitionStorageConcurrencyTest.java | 37 -----
...sistentPageMemoryMvPartitionStorageGcTest.java} | 41 +----
...ageMemoryMvPartitionStorageConcurrencyTest.java | 40 -----
...olatilePageMemoryMvPartitionStorageGcTest.java} | 44 +----
.../internal/storage/rocksdb/GarbageCollector.java | 1 +
45 files changed, 1284 insertions(+), 336 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
index 7de2046a92..85e9c8424e 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
@@ -60,6 +60,8 @@ public class PartitionMeta {
private volatile long indexTreeMetaPageId;
+ private volatile long gcQueueMetaPageId;
+
private volatile int pageCount;
private volatile PartitionMetaSnapshot metaSnapshot;
@@ -90,6 +92,7 @@ public class PartitionMeta {
long indexColumnsFreeListRootPageId,
long versionChainTreeRootPageId,
long indexTreeMetaPageId,
+ long gcQueueMetaPageId,
int pageCount
) {
this.lastAppliedIndex = lastAppliedIndex;
@@ -99,6 +102,7 @@ public class PartitionMeta {
this.indexColumnsFreeListRootPageId = indexColumnsFreeListRootPageId;
this.versionChainTreeRootPageId = versionChainTreeRootPageId;
this.indexTreeMetaPageId = indexTreeMetaPageId;
+ this.gcQueueMetaPageId = gcQueueMetaPageId;
this.pageCount = pageCount;
metaSnapshot = new PartitionMetaSnapshot(checkpointId, this);
@@ -121,6 +125,7 @@ public class PartitionMeta {
metaIo.getIndexColumnsFreeListRootPageId(pageAddr),
metaIo.getVersionChainTreeRootPageId(pageAddr),
metaIo.getIndexTreeMetaPageId(pageAddr),
+ metaIo.getGcQueueMetaPageId(pageAddr),
metaIo.getPageCount(pageAddr)
);
}
@@ -248,6 +253,25 @@ public class PartitionMeta {
this.indexTreeMetaPageId = indexTreeMetaPageId;
}
+ /**
+ * Returns garbage collection queue meta page id.
+ */
+ public long gcQueueMetaPageId() {
+ return gcQueueMetaPageId;
+ }
+
+ /**
+ * Sets a garbage collection queue meta page id.
+ *
+ * @param checkpointId Checkpoint id.
+ * @param gcQueueMetaPageId Garbage collection queue meta page id.
+ */
+ public void gcQueueMetaPageId(@Nullable UUID checkpointId, long
gcQueueMetaPageId) {
+ updateSnapshot(checkpointId);
+
+ this.gcQueueMetaPageId = gcQueueMetaPageId;
+ }
+
/**
* Returns count of pages in the partition.
*/
@@ -289,7 +313,6 @@ public class PartitionMeta {
}
}
- /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(PartitionMeta.class, this);
@@ -315,6 +338,8 @@ public class PartitionMeta {
private final long indexTreeMetaPageId;
+ private final long gcQueueMetaPageId;
+
private final int pageCount;
/**
@@ -332,6 +357,7 @@ public class PartitionMeta {
rowVersionFreeListRootPageId =
partitionMeta.rowVersionFreeListRootPageId;
indexColumnsFreeListRootPageId =
partitionMeta.indexColumnsFreeListRootPageId;
indexTreeMetaPageId = partitionMeta.indexTreeMetaPageId;
+ gcQueueMetaPageId = partitionMeta.gcQueueMetaPageId;
pageCount = partitionMeta.pageCount;
}
@@ -384,6 +410,13 @@ public class PartitionMeta {
return indexTreeMetaPageId;
}
+ /**
+ * Returns garbage collection queue meta page id.
+ */
+ public long gcQueueMetaPageId() {
+ return gcQueueMetaPageId;
+ }
+
/**
* Returns count of pages in the partition.
*/
@@ -405,10 +438,10 @@ public class PartitionMeta {
metaIo.setIndexColumnsFreeListRootPageId(pageAddr,
indexColumnsFreeListRootPageId);
metaIo.setRowVersionFreeListRootPageId(pageAddr,
rowVersionFreeListRootPageId);
metaIo.setIndexTreeMetaPageId(pageAddr, indexTreeMetaPageId);
+ metaIo.setGcQueueMetaPageId(pageAddr, gcQueueMetaPageId);
metaIo.setPageCount(pageAddr, pageCount);
}
- /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(PartitionMetaSnapshot.class, this);
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
index aef9099502..8f7436d5c8 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
@@ -45,7 +45,9 @@ public class PartitionMetaIo extends PageIo {
public static final int INDEX_TREE_META_PAGE_ID_OFF =
VERSION_CHAIN_TREE_ROOT_PAGE_ID_OFF + Long.BYTES;
- private static final int PAGE_COUNT_OFF = INDEX_TREE_META_PAGE_ID_OFF +
Long.BYTES;
+ private static final int GC_QUEUE_META_PAGE_ID_OFF =
INDEX_TREE_META_PAGE_ID_OFF + Long.BYTES;
+
+ private static final int PAGE_COUNT_OFF = GC_QUEUE_META_PAGE_ID_OFF +
Long.BYTES;
/** Page IO type. */
public static final short T_TABLE_PARTITION_META_IO = 7;
@@ -74,6 +76,7 @@ public class PartitionMetaIo extends PageIo {
setIndexColumnsFreeListRootPageId(pageAddr, 0);
setVersionChainTreeRootPageId(pageAddr, 0);
setIndexTreeMetaPageId(pageAddr, 0);
+ setGcQueueMetaPageId(pageAddr, 0);
setPageCount(pageAddr, 0);
}
@@ -224,6 +227,27 @@ public class PartitionMetaIo extends PageIo {
return getLong(pageAddr, INDEX_TREE_META_PAGE_ID_OFF);
}
+ /**
+ * Sets a garbage collection queue meta page id.
+ *
+ * @param pageAddr Page address.
+ * @param pageId Meta page id.
+ */
+ public void setGcQueueMetaPageId(long pageAddr, long pageId) {
+ assertPageType(pageAddr);
+
+ putLong(pageAddr, GC_QUEUE_META_PAGE_ID_OFF, pageId);
+ }
+
+ /**
+ * Returns a garbage collection queue meta page id.
+ *
+ * @param pageAddr Page address.
+ */
+ public long getGcQueueMetaPageId(long pageAddr) {
+ return getLong(pageAddr, GC_QUEUE_META_PAGE_ID_OFF);
+ }
+
/**
* Sets the count of pages.
*
@@ -256,6 +280,7 @@ public class PartitionMetaIo extends PageIo {
.app("indexColumnsFreeListRootPageId(=").appendHex(getIndexColumnsFreeListRootPageId(addr)).nl()
.app("versionChainTreeRootPageId=").appendHex(getVersionChainTreeRootPageId(addr)).nl()
.app("indexTreeMetaPageId=").appendHex(getIndexTreeMetaPageId(addr)).nl()
+
.app("gcQueueMetaPageId=").appendHex(getGcQueueMetaPageId(addr)).nl()
.app("pageCount=").app(getPageCount(addr)).nl()
.app(']');
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
index fdc8e85d41..126bd03fd5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
@@ -182,6 +182,15 @@ public final class PageIdUtils {
return (int) ((pageId >>> PAGE_IDX_SIZE) & PART_ID_MASK);
}
+ /**
+ * Extracts partition ID from the page link.
+ *
+ * @param link Page link.
+ */
+ public static int partitionIdFromLink(long link) {
+ return partitionId(pageId(link));
+ }
+
/**
* Extracts rotation ID from the page ID.
*
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
index 2c74e21921..3e30e9e465 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
@@ -138,7 +138,7 @@ public class PartitionMetaManagerTest {
try (FilePageStore filePageStore =
createFilePageStore(testFilePath)) {
manager.writeMetaToBuffer(
partId,
- new PartitionMeta(UUID.randomUUID(), 100, 10, 34, 900,
500, 300, 200, 4).metaSnapshot(null),
+ new PartitionMeta(UUID.randomUUID(), 100, 10, 34, 900,
500, 300, 200, 400, 4).metaSnapshot(null),
buffer.rewind()
);
@@ -160,6 +160,7 @@ public class PartitionMetaManagerTest {
assertEquals(500, meta.indexColumnsFreeListRootPageId());
assertEquals(300, meta.versionChainTreeRootPageId());
assertEquals(200, meta.indexTreeMetaPageId());
+ assertEquals(400, meta.gcQueueMetaPageId());
assertEquals(4, meta.pageCount());
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
index 5481a99b6a..b42cc3dd14 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
@@ -130,7 +130,7 @@ public class PartitionMetaTest {
void testSnapshot() {
UUID checkpointId = null;
- PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0,
0, 0);
+ PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0,
0, 0, 0);
checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0, 0, 0);
checkSnapshot(meta.metaSnapshot(checkpointId = UUID.randomUUID()), 0,
0, 0, 0, 0, 0);
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index fce7e1834a..edd3bc64d9 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -348,7 +348,7 @@ public class CheckpointerTest {
partitionMetaManager.addMeta(
new GroupPartitionId(0, 0),
- new PartitionMeta(null, 0, 0, 0, 0, 0, 0, 0, 3)
+ new PartitionMeta(null, 0, 0, 0, 0, 0, 0, 0, 0, 3)
);
FilePageStore filePageStore = mock(FilePageStore.class);
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index f96e3d42ef..69e031276c 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -88,7 +88,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- public void testRegularGcAndRead(AddAndCommit addAndCommit) {
+ void testRegularGcAndRead(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
HybridTimestamp firstCommitTs = addAndCommit(TABLE_ROW);
@@ -108,7 +108,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+ void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
HybridTimestamp firstCommitTs = addAndCommit.perform(this,
TABLE_ROW);
@@ -126,7 +126,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+ void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -147,7 +147,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+ void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -170,7 +170,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+ void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
@@ -189,7 +189,7 @@ public abstract class
AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
@ParameterizedTest
@EnumSource(AddAndCommit.class)
- public void testConcurrentGc(AddAndCommit addAndCommit) {
+ void testConcurrentGc(AddAndCommit addAndCommit) {
for (int i = 0; i < REPEATS; i++) {
addAndCommit.perform(this, TABLE_ROW);
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index 3d16b8f86c..c0c3abff25 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -80,6 +80,9 @@ public abstract class AbstractMvPartitionStorageGcTest
extends BaseMvPartitionSt
assertNull(read(ROW_ID, secondCommitTs));
// Check that tombstone is also deleted from the partition. It must be
empty at this point.
+ assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+
+ // Let's check that the storage is empty.
assertNull(storage.closestRowId(ROW_ID));
}
@@ -97,6 +100,9 @@ public abstract class AbstractMvPartitionStorageGcTest
extends BaseMvPartitionSt
assertNull(read(ROW_ID, lastCommitTs));
// Check that all tombstones are deleted from the partition. It must
be empty at this point.
+ assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+
+ // Let's check that the storage is empty.
assertNull(storage.closestRowId(ROW_ID));
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 9a0d8fd757..e644696f00 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -64,7 +64,6 @@ import
org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
import
org.apache.ignite.internal.schema.testutils.definition.index.IndexDefinition;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -873,11 +872,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThrows(StorageRebalanceException.class, () ->
storage.closestRowId(rowId));
assertThrows(StorageRebalanceException.class, storage::rowsCount);
- // TODO: IGNITE-18020 Add check
- // TODO: IGNITE-18023 Add check
- if (storage instanceof TestMvPartitionStorage) {
- assertThrows(StorageRebalanceException.class, () ->
storage.pollForVacuum(clock.now()));
- }
+ assertThrows(StorageRebalanceException.class, () ->
storage.pollForVacuum(clock.now()));
return null;
});
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 634e1c15b7..f732709438 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPart
import
org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
import org.apache.ignite.internal.storage.pagememory.mv.RowVersionFreeList;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.jetbrains.annotations.Nullable;
@@ -127,6 +128,8 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
IndexMetaTree indexMetaTree = createIndexMetaTree(tableView,
partitionId, rowVersionFreeList, pageMemory, meta);
+ GcQueue gcQueue = createGcQueue(tableView, partitionId,
rowVersionFreeList, pageMemory, meta);
+
return new PersistentPageMemoryMvPartitionStorage(
this,
partitionId,
@@ -134,7 +137,8 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
rowVersionFreeList,
indexColumnsFreeList,
versionChainTree,
- indexMetaTree
+ indexMetaTree,
+ gcQueue
);
});
}
@@ -378,6 +382,53 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
}
}
+ /**
+ * Returns new {@link GcQueue} instance for partition.
+ *
+ * @param tableView Table configuration.
+ * @param partitionId Partition ID.
+ * @param reuseList Reuse list.
+ * @param pageMemory Persistent page memory instance.
+ * @param meta Partition metadata.
+ * @throws StorageException If failed.
+ */
+ private GcQueue createGcQueue(
+ TableView tableView,
+ int partitionId,
+ ReuseList reuseList,
+ PersistentPageMemory pageMemory,
+ PartitionMeta meta
+ ) {
+ try {
+ boolean initNew = false;
+
+ if (meta.gcQueueMetaPageId() == 0) {
+ long rootPageId = pageMemory.allocatePage(tableView.tableId(),
partitionId, FLAG_AUX);
+
+ meta.gcQueueMetaPageId(lastCheckpointId(), rootPageId);
+
+ initNew = true;
+ }
+
+ return new GcQueue(
+ tableView.tableId(),
+ tableView.name(),
+ partitionId,
+ dataRegion.pageMemory(),
+ PageLockListenerNoOp.INSTANCE,
+ new AtomicLong(),
+ meta.gcQueueMetaPageId(),
+ reuseList,
+ initNew
+ );
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ String.format("Error creating GarbageCollectionTree
[tableName=%s, partitionId=%s]", tableView.name(), partitionId),
+ e
+ );
+ }
+ }
+
@Override
CompletableFuture<Void>
destroyMvPartitionStorage(AbstractPageMemoryMvPartitionStorage
mvPartitionStorage) {
// It is enough for us to close the partition storage and its indexes
(do not destroy). Prepare the data region, checkpointer, and
@@ -410,12 +461,15 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
IndexMetaTree indexMetaTree = createIndexMetaTree(tableView,
partitionId, rowVersionFreeList, pageMemory, meta);
+ GcQueue gcQueue = createGcQueue(tableView, partitionId,
rowVersionFreeList, pageMemory, meta);
+
((PersistentPageMemoryMvPartitionStorage)
mvPartitionStorage).updateDataStructures(
meta,
rowVersionFreeList,
indexColumnsFreeList,
versionChainTree,
- indexMetaTree
+ indexMetaTree,
+ gcQueue
);
return null;
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 584b54e29a..38bdffff31 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
import
org.apache.ignite.internal.storage.pagememory.mv.VolatilePageMemoryMvPartitionStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
@@ -72,11 +73,14 @@ public class VolatilePageMemoryTableStorage extends
AbstractPageMemoryTableStora
IndexMetaTree indexMetaTree = createIndexMetaTree(partitionId,
tableCfg.value());
+ GcQueue gcQueue = createGarbageCollectionTree(partitionId,
tableCfg.value());
+
return new VolatilePageMemoryMvPartitionStorage(
this,
partitionId,
versionChainTree,
indexMetaTree,
+ gcQueue,
destructionExecutor
);
}
@@ -103,6 +107,28 @@ public class VolatilePageMemoryTableStorage extends
AbstractPageMemoryTableStora
}
}
+ private GcQueue createGarbageCollectionTree(int partitionId, TableView
tableCfgView) {
+ int grpId = tableCfgView.tableId();
+
+ long metaPageId = dataRegion.pageMemory().allocatePage(grpId,
partitionId, FLAG_AUX);
+
+ try {
+ return new GcQueue(
+ grpId,
+ tableCfgView.name(),
+ partitionId,
+ dataRegion.pageMemory(),
+ PageLockListenerNoOp.INSTANCE,
+ new AtomicLong(),
+ metaPageId,
+ dataRegion.reuseList(),
+ true
+ );
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(e);
+ }
+ }
+
@Override
public boolean isVolatile() {
return true;
@@ -156,7 +182,8 @@ public class VolatilePageMemoryTableStorage extends
AbstractPageMemoryTableStora
volatilePartitionStorage.updateDataStructures(
createVersionChainTree(partitionId, tableView),
- createIndexMetaTree(partitionId, tableView)
+ createIndexMetaTree(partitionId, tableView),
+ createGarbageCollectionTree(partitionId, tableView)
);
return completedFuture(null);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
index ad176c65d6..1cb8845d4a 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
@@ -33,10 +33,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link InvokeClosure} for {@link
AbstractPageMemoryMvPartitionStorage#abortWrite(RowId)}.
*
- * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
- * which the version chain is located.
- *
- * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization.
*
* <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
*/
@@ -74,10 +71,9 @@ class AbortWriteInvokeClosure implements
InvokeClosure<VersionChain> {
toRemove = latestVersion;
if (latestVersion.hasNextLink()) {
- // Next can be safely replaced with any value (like 0), because
this field is only used when there
- // is some uncommitted value, but when we add an uncommitted
value, we 'fix' such placeholder value
- // (like 0) by replacing it with a valid value.
- newRow = VersionChain.createCommitted(rowId,
latestVersion.nextLink(), NULL_LINK);
+ RowVersion nextVersion =
storage.readRowVersion(latestVersion.nextLink(), DONT_LOAD_VALUE);
+
+ newRow = VersionChain.createCommitted(rowId,
latestVersion.nextLink(), nextVersion.nextLink());
operationType = OperationType.PUT;
} else {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 0c4394f552..5a800b9508 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
@@ -68,6 +69,9 @@ import
org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.SortedIndexTree;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
import org.apache.ignite.internal.storage.util.StorageState;
import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.Cursor;
@@ -96,6 +100,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
static final Predicate<HybridTimestamp> ALWAYS_LOAD_VALUE = timestamp ->
true;
+ static final Predicate<HybridTimestamp> DONT_LOAD_VALUE = timestamp ->
false;
+
protected final int partitionId;
protected final int groupId;
@@ -110,6 +116,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
protected volatile IndexMetaTree indexMetaTree;
+ protected volatile GcQueue gcQueue;
+
protected final DataPageReader rowVersionDataPageReader;
protected final ConcurrentMap<UUID, PageMemoryHashIndexStorage>
hashIndexes = new ConcurrentHashMap<>();
@@ -134,6 +142,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
* @param indexFreeList Free list fot {@link IndexColumns}.
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
+ * @param gcQueue Garbage collection queue.
*/
protected AbstractPageMemoryMvPartitionStorage(
int partitionId,
@@ -141,7 +150,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
RowVersionFreeList rowVersionFreeList,
IndexColumnsFreeList indexFreeList,
VersionChainTree versionChainTree,
- IndexMetaTree indexMetaTree
+ IndexMetaTree indexMetaTree,
+ GcQueue gcQueue
) {
this.partitionId = partitionId;
this.tableStorage = tableStorage;
@@ -152,6 +162,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
this.versionChainTree = versionChainTree;
this.indexMetaTree = indexMetaTree;
+ this.gcQueue = gcQueue;
+
PageMemory pageMemory = tableStorage.dataRegion().pageMemory();
groupId = tableStorage.configuration().value().tableId();
@@ -384,6 +396,24 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
return read.result();
}
+ @Nullable RowVersion findRowVersion(VersionChain versionChain,
RowVersionFilter filter, boolean loadValueBytes) {
+ assert versionChain.hasHeadLink();
+
+ FindRowVersion findRowVersion = new FindRowVersion(partitionId,
loadValueBytes);
+
+ try {
+ rowVersionDataPageReader.traverse(versionChain.headLink(),
findRowVersion, filter);
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Error when looking up row version in version chain:
[rowId={}, headLink={}, {}]",
+ e,
+ versionChain.rowId(), versionChain.headLink(),
createStorageInfo()
+ );
+ }
+
+ return findRowVersion.getResult();
+ }
+
@Nullable BinaryRow rowVersionToBinaryRow(RowVersion rowVersion) {
if (rowVersion.isTombstone()) {
return null;
@@ -575,7 +605,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
return inUpdateVersionChainLock(rowId, () -> {
try {
- versionChainTree.invoke(new VersionChainKey(rowId), null,
new CommitWriteInvokeClosure(timestamp, this));
+ CommitWriteInvokeClosure commitWrite = new
CommitWriteInvokeClosure(rowId, timestamp, this);
+
+ versionChainTree.invoke(new VersionChainKey(rowId), null,
commitWrite);
+
+ commitWrite.afterCompletion();
return null;
} catch (IgniteInternalCheckedException e) {
@@ -604,11 +638,12 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
return inUpdateVersionChainLock(rowId, () -> {
try {
- versionChainTree.invoke(
- new VersionChainKey(rowId),
- null,
- new AddWriteCommittedInvokeClosure(rowId, row,
commitTimestamp, this)
- );
+ AddWriteCommittedInvokeClosure addWriteCommitted = new
AddWriteCommittedInvokeClosure(rowId, row, commitTimestamp,
+ this);
+
+ versionChainTree.invoke(new VersionChainKey(rowId), null,
addWriteCommitted);
+
+ addWriteCommitted.afterCompletion();
return null;
} catch (IgniteInternalCheckedException e) {
@@ -744,6 +779,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
resources.add(versionChainTree::close);
resources.add(indexMetaTree::close);
+ resources.add(gcQueue::close);
hashIndexes.values().forEach(index -> resources.add(index::close));
sortedIndexes.values().forEach(index -> resources.add(index::close));
@@ -930,4 +966,59 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
});
}
}
+
+ @Override
+ public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
+
+ // Peek at the first element; it is removed from the queue only below, under the row's update lock.
+ GcRowVersion first = gcQueue.getFirst();
+
+ // Garbage collection queue is empty.
+ if (first == null) {
+ return null;
+ }
+
+ HybridTimestamp rowTimestamp = first.getTimestamp();
+
+ // There are no versions in the garbage collection queue before
watermark.
+ if (rowTimestamp.compareTo(lowWatermark) > 0) {
+ return null;
+ }
+
+ RowId rowId = first.getRowId();
+
+ // Updates of a version chain are synchronized externally by row ID, so the queue element is removed
+ // conditionally and the version chain is modified under the update lock of this row.
+ return inUpdateVersionChainLock(rowId, () -> {
+ // Someone processed the element in parallel.
+ // TODO: IGNITE-18843 Should try to get the RowVersion again
+ if (!gcQueue.remove(rowId, rowTimestamp, first.getLink())) {
+ return null;
+ }
+
+ RowVersion removedRowVersion = removeWriteOnGc(rowId,
rowTimestamp, first.getLink());
+
+ // rowVersionToBinaryRow() returns null for a tombstone, so the returned row may be null.
+ return new
BinaryRowAndRowId(rowVersionToBinaryRow(removedRowVersion), rowId);
+ });
+ });
+ }
+
+ /**
+ * Removes the row version identified by {@code rowLink} from the version chain of {@code rowId} using
+ * {@link RemoveWriteOnGcInvokeClosure}, then runs the closure's {@code afterCompletion()}.
+ *
+ * <p>Called from {@link #pollForVacuum(HybridTimestamp)} while holding the version chain update lock of
+ * {@code rowId}.
+ *
+ * @param rowId Row ID whose version chain is modified.
+ * @param rowTimestamp Timestamp of the row version as stored in the garbage collection queue.
+ * @param rowLink Link of the row version as stored in the garbage collection queue.
+ * @return Row version reported by the closure's {@code getResult()}; presumably the removed row version.
+ * @throws StorageException If the tree invocation fails.
+ */
+ private RowVersion removeWriteOnGc(RowId rowId, HybridTimestamp
rowTimestamp, long rowLink) {
+ RemoveWriteOnGcInvokeClosure removeWriteOnGc = new
RemoveWriteOnGcInvokeClosure(rowId, rowTimestamp, rowLink, this);
+
+ try {
+ versionChainTree.invoke(new VersionChainKey(rowId), null,
removeWriteOnGc);
+ } catch (IgniteInternalCheckedException e) {
+ // NOTE(review): presumably rethrows a StorageException raised inside the closure when it is the cause.
+ throwStorageExceptionIfItCause(e);
+
+ throw new StorageException(
+ "Error removing row version from version chain on garbage
collection: [rowId={}, rowTimestamp={}, {}]",
+ e,
+ rowId, rowTimestamp, createStorageInfo()
+ );
+ }
+
+ // Must run only after BplusTree#invoke has completed.
+ removeWriteOnGc.afterCompletion();
+
+ return removeWriteOnGc.getResult();
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
index f33493c6e3..3fddbc1249 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE;
import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.rowBytes;
import java.nio.ByteBuffer;
@@ -25,6 +26,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
@@ -35,10 +37,7 @@ import org.jetbrains.annotations.Nullable;
* Implementation of {@link InvokeClosure} for
* {@link AbstractPageMemoryMvPartitionStorage#addWriteCommitted(RowId,
BinaryRow, HybridTimestamp)}.
*
- * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
- * which the version chain is located.
- *
- * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization.
*
* <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
*/
@@ -51,8 +50,18 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
private final AbstractPageMemoryMvPartitionStorage storage;
+ private OperationType operationType;
+
private @Nullable VersionChain newRow;
+ /**
+ * Row version that will be added to the garbage collection queue when the
{@link #afterCompletion() closure completes}.
+ *
+ * <p>Row version must be committed. It will be a {@link
PageIdUtils#NULL_LINK} if the current and the previous row versions are
+ * tombstones or have only one row version in the version chain.
+ */
+ private long rowLinkForAddToGcQueue = NULL_LINK;
+
AddWriteCommittedInvokeClosure(
RowId rowId,
@Nullable BinaryRow row,
@@ -72,23 +81,42 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
throw new StorageException("Write intent exists: [rowId={}, {}]",
oldRow.rowId(), storage.createStorageInfo());
}
- long nextLink = oldRow == null ? NULL_LINK :
oldRow.newestCommittedLink();
+ if (oldRow == null) {
+ operationType = OperationType.PUT;
+
+ RowVersion newVersion = insertCommittedRowVersion(row,
commitTimestamp, NULL_LINK);
+
+ newRow = VersionChain.createCommitted(rowId, newVersion.link(),
newVersion.nextLink());
+ } else {
+ RowVersion current = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
+
+ // If the current and new version are tombstones, then there is no
need to add a new version.
+ if (current.isTombstone() && row == null) {
+ operationType = OperationType.NOOP;
+ } else {
+ operationType = OperationType.PUT;
- RowVersion newVersion = insertCommittedRowVersion(row,
commitTimestamp, nextLink);
+ RowVersion newVersion = insertCommittedRowVersion(row,
commitTimestamp, oldRow.headLink());
- newRow = VersionChain.createCommitted(rowId, newVersion.link(),
newVersion.nextLink());
+ newRow = VersionChain.createCommitted(rowId,
newVersion.link(), newVersion.nextLink());
+
+ rowLinkForAddToGcQueue = newVersion.link();
+ }
+ }
}
@Override
public @Nullable VersionChain newRow() {
+ // A new version chain is produced only for PUT; for NOOP (a tombstone written over a tombstone) it stays null.
- assert newRow != null;
+ assert operationType == OperationType.PUT ? newRow != null : newRow ==
null : "newRow=" + newRow + ", op=" + operationType;
return newRow;
}
@Override
public OperationType operationType() {
+ // Assigned while the closure is invoked on the old row; must be set before the tree queries it.
- return OperationType.PUT;
+ assert operationType != null;
+
+ return operationType;
}
private RowVersion insertCommittedRowVersion(@Nullable BinaryRow row,
HybridTimestamp commitTimestamp, long nextPartitionlessLink) {
@@ -100,4 +128,13 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
return rowVersion;
}
+
+ /**
+ * Method to call after {@link BplusTree#invoke(Object, Object,
InvokeClosure)} has completed.
+ *
+ * <p>If a link was recorded in {@code rowLinkForAddToGcQueue} (the new committed row version, recorded only when
+ * the version chain already existed and the write was not a tombstone over a tombstone), adds it to the garbage
+ * collection queue with the commit timestamp of this operation.
+ */
+ void afterCompletion() {
+ if (rowLinkForAddToGcQueue != NULL_LINK) {
+ storage.gcQueue.add(rowId, commitTimestamp,
rowLinkForAddToGcQueue);
+ }
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
index bf66c5a1f1..74f81301c5 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteInvokeClosure.java
@@ -37,10 +37,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link InvokeClosure} for {@link
AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID,
int)}.
*
- * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
- * which the version chain is located.
- *
- * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization.
*
* <p>Operation may throw {@link StorageException} and {@link
TxIdMismatchException} which will cause form
* {@link BplusTree#invoke(Object, Object, InvokeClosure)}.
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
index 01b1be8e63..3a9b6e28e5 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
@@ -17,10 +17,14 @@
package org.apache.ignite.internal.storage.pagememory.mv;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE;
+
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -29,14 +33,13 @@ import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link InvokeClosure} for {@link
AbstractPageMemoryMvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
*
- * <p>Synchronization between reading and updating the version chain occurs
due to the locks (read and write) of the page of the tree on
- * which the version chain is located.
- *
- * <p>Synchronization between update operations for the version chain must be
external (by {@link RowId row ID}).
+ * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization.
*
* <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
*/
class CommitWriteInvokeClosure implements InvokeClosure<VersionChain> {
+ private final RowId rowId;
+
private final HybridTimestamp timestamp;
private final AbstractPageMemoryMvPartitionStorage storage;
@@ -45,9 +48,20 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
private @Nullable VersionChain newRow;
- private @Nullable Long updateTimestampLink;
+ private long updateTimestampLink = NULL_LINK;
+
+ private @Nullable RowVersion toRemove;
+
+ /**
+ * Row version that will be added to the garbage collection queue when the
{@link #afterCompletion() closure completes}.
+ *
+ * <p>Row version must be committed. It will be a {@link
PageIdUtils#NULL_LINK} if the current and the previous row versions are
+ * tombstones or have only one row version in the version chain.
+ */
+ private long rowLinkForAddToGcQueue = NULL_LINK;
- CommitWriteInvokeClosure(HybridTimestamp timestamp,
AbstractPageMemoryMvPartitionStorage storage) {
+ CommitWriteInvokeClosure(RowId rowId, HybridTimestamp timestamp,
AbstractPageMemoryMvPartitionStorage storage) {
+ this.rowId = rowId;
this.timestamp = timestamp;
this.storage = storage;
}
@@ -61,11 +75,25 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
return;
}
- updateTimestampLink = oldRow.headLink();
-
operationType = OperationType.PUT;
- newRow = VersionChain.createCommitted(oldRow.rowId(),
oldRow.headLink(), oldRow.nextLink());
+ RowVersion current = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
+ RowVersion next = oldRow.hasNextLink() ?
storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;
+
+ // If the previous and current version are tombstones, then delete the
current version.
+ if (next != null && current.isTombstone() && next.isTombstone()) {
+ toRemove = current;
+
+ newRow = VersionChain.createCommitted(oldRow.rowId(), next.link(),
next.nextLink());
+ } else {
+ updateTimestampLink = oldRow.headLink();
+
+ newRow = VersionChain.createCommitted(oldRow.rowId(),
oldRow.headLink(), oldRow.nextLink());
+
+ if (oldRow.hasNextLink()) {
+ rowLinkForAddToGcQueue = oldRow.headLink();
+ }
+ }
}
@Override
@@ -84,10 +112,10 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
@Override
public void onUpdate() {
- assert operationType == OperationType.PUT ? updateTimestampLink !=
null : updateTimestampLink == null :
+ assert operationType == OperationType.PUT || updateTimestampLink ==
NULL_LINK :
"link=" + updateTimestampLink + ", op=" + operationType;
- if (updateTimestampLink != null) {
+ if (updateTimestampLink != NULL_LINK) {
try {
storage.rowVersionFreeList.updateTimestamp(updateTimestampLink, timestamp);
} catch (IgniteInternalCheckedException e) {
@@ -98,4 +126,19 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
}
}
}
+
+ /**
+ * Method to call after {@link BplusTree#invoke(Object, Object,
InvokeClosure)} has completed.
+ *
+ * <p>Removes the head row version that was unlinked from the chain because both it and the next version are
+ * tombstones (if any). Then, if a link was recorded in {@code rowLinkForAddToGcQueue}, adds the committed row
+ * version to the garbage collection queue with the commit timestamp.
+ */
+ void afterCompletion() {
+ // A row version can only have been unlinked as part of a PUT of the new version chain.
+ assert operationType == OperationType.PUT || toRemove == null :
"toRemove=" + toRemove + ", op=" + operationType;
+
+ if (toRemove != null) {
+ storage.removeRowVersion(toRemove);
+ }
+
+ if (rowLinkForAddToGcQueue != NULL_LINK) {
+ storage.gcQueue.add(rowId, timestamp, rowLinkForAddToGcQueue);
+ }
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
new file mode 100644
index 0000000000..b42e3e9417
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/FindRowVersion.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
+import org.apache.ignite.internal.pagememory.io.DataPagePayload;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search for a row version in version chains.
+ *
+ * <p>Used as a {@link PageMemoryTraversal} over the row versions of a version chain. The header of each row version is
+ * tested with a {@link RowVersionFilter}. Non-matching versions are skipped by following their next link, and the
+ * traversal stops at the first match, after reading its value fragments if value bytes were requested.
+ *
+ * <p>The search state is never reset, so presumably each instance serves a single traversal.
+ */
+class FindRowVersion implements PageMemoryTraversal<RowVersionFilter> {
+ private final int partitionId;
+
+ private final boolean loadValueBytes;
+
+ // Set once the filter accepts a row version; subsequent payloads are fragments of that version's value.
+ private boolean rowVersionFound;
+
+ private final ReadRowVersionValue readRowVersionValue = new
ReadRowVersionValue();
+
+ private long rowLink = NULL_LINK;
+
+ private @Nullable HybridTimestamp rowTimestamp;
+
+ private long rowNextLink = NULL_LINK;
+
+ // Filled only when value bytes are not loaded.
+ private int rowValueSize;
+
+ private @Nullable RowVersion result;
+
+ FindRowVersion(int partitionId, boolean loadValueBytes) {
+ this.partitionId = partitionId;
+ this.loadValueBytes = loadValueBytes;
+ }
+
+ @Override
+ public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, RowVersionFilter filter) {
+ // Continue reading the value fragments of the already found row version.
+ if (rowVersionFound) {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ }
+
+ long nextLink = readPartitionless(partitionId, pageAddr,
payload.offset() + RowVersion.NEXT_LINK_OFFSET);
+
+ // Not a match: move on to the next row version in the chain.
+ if (!filter.apply(link, pageAddr + payload.offset())) {
+ return nextLink;
+ }
+
+ rowVersionFound = true;
+
+ rowLink = link;
+ rowTimestamp = HybridTimestamps.readTimestamp(pageAddr,
payload.offset() + RowVersion.TIMESTAMP_OFFSET);
+ rowNextLink = nextLink;
+
+ if (loadValueBytes) {
+ return readRowVersionValue.consumePagePayload(link, pageAddr,
payload, null);
+ }
+
+ // Only the value size is needed, and it is read from the row version header, so stop here.
+ rowValueSize = PageUtils.getInt(pageAddr, payload.offset() +
RowVersion.VALUE_SIZE_OFFSET);
+
+ return STOP_TRAVERSAL;
+ }
+
+ @Override
+ public void finish() {
+ if (!rowVersionFound) {
+ return;
+ }
+
+ if (loadValueBytes) {
+ readRowVersionValue.finish();
+
+ byte[] valueBytes = readRowVersionValue.result();
+
+ ByteBuffer value =
ByteBuffer.wrap(valueBytes).order(ByteBufferRow.ORDER);
+
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, value);
+ } else {
+ result = new RowVersion(partitionId, rowLink, rowTimestamp,
rowNextLink, rowValueSize);
+ }
+ }
+
+ /**
+ * Returns the found version in the version chain, {@code null} if not
found.
+ */
+ @Nullable RowVersion getResult() {
+ return result;
+ }
+
+ /**
+ * Row version filter in the version chain.
+ */
+ @FunctionalInterface
+ interface RowVersionFilter {
+ /**
+ * Returns {@code true} if the version matches.
+ *
+ * @param rowVersionLink Row version link.
+ * @param rowVersionAddr Address of row version (including page
address + offset within it).
+ * @return {@code true} if the row version matches the filter.
+ */
+ boolean apply(long rowVersionLink, long rowVersionAddr);
+
+ /**
+ * Creates a filter that accepts row versions whose stored timestamp equals {@code timestamp}; a {@code null}
+ * argument matches row versions whose stored timestamp reads back as {@code null}.
+ */
+ static RowVersionFilter equalsByTimestamp(@Nullable HybridTimestamp
timestamp) {
+ return (rowVersionLink, rowVersionAddr) -> {
+ HybridTimestamp readTimestamp =
HybridTimestamps.readTimestamp(rowVersionAddr, RowVersion.TIMESTAMP_OFFSET);
+
+ return Objects.equals(timestamp, readTimestamp);
+ };
+ }
+
+ /**
+ * Creates a filter that accepts row versions whose next link equals {@code nextLink}. The stored partitionless
+ * link is decoded using the partition ID of the row version's own link.
+ */
+ static RowVersionFilter equalsByNextLink(long nextLink) {
+ return (rowVersionLink, rowVersionAddr) -> {
+ long readNextLink =
readPartitionless(partitionIdFromLink(rowVersionLink), rowVersionAddr,
RowVersion.NEXT_LINK_OFFSET);
+
+ return readNextLink == nextLink;
+ };
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
index e04848be7b..15e9b7b3b3 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/HybridTimestamps.java
@@ -31,14 +31,10 @@ import org.jetbrains.annotations.Nullable;
* Code to work with {@link HybridTimestamp}s.
*/
public class HybridTimestamps {
- /**
- * Physical time component to store for {@code null} hybrid timestamp
values.
- */
+ /** Physical time component to store for {@code null} hybrid timestamp
values. */
private static final long NULL_PHYSICAL_TIME = 0L;
- /**
- * Logical time component to store for {@code null} hybrid timestamp
values.
- */
+ /** Logical time component to store for {@code null} hybrid timestamp
values. */
private static final int NULL_LOGICAL_TIME = 0;
/**
@@ -47,7 +43,7 @@ public class HybridTimestamps {
* @param pageAddr Address where page data starts.
* @param offset Offset to the timestamp value relative to pageAddr.
*/
- static @Nullable HybridTimestamp readTimestamp(long pageAddr, int offset) {
+ public static @Nullable HybridTimestamp readTimestamp(long pageAddr, int
offset) {
long physical = getLong(pageAddr, offset);
int logical = getInt(pageAddr, offset + Long.BYTES);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageIoModule.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageIoModule.java
index 939c010ce8..dccc507abc 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageIoModule.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageIoModule.java
@@ -22,6 +22,9 @@ import java.util.Collection;
import java.util.List;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.io.PageIoModule;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcInnerIo;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcMetaIo;
import org.apache.ignite.internal.storage.pagememory.mv.io.BlobFragmentIo;
import org.apache.ignite.internal.storage.pagememory.mv.io.RowVersionDataIo;
import org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainInnerIo;
@@ -34,7 +37,6 @@ import
org.apache.ignite.internal.storage.pagememory.mv.io.VersionChainMetaIo;
*/
@AutoService(PageIoModule.class)
public class MvPageIoModule implements PageIoModule {
- /** {@inheritDoc} */
@Override
public Collection<IoVersions<?>> ioVersions() {
return List.of(
@@ -42,7 +44,10 @@ public class MvPageIoModule implements PageIoModule {
VersionChainInnerIo.VERSIONS,
VersionChainLeafIo.VERSIONS,
RowVersionDataIo.VERSIONS,
- BlobFragmentIo.VERSIONS
+ BlobFragmentIo.VERSIONS,
+ GcMetaIo.VERSIONS,
+ GcInnerIo.VERSIONS,
+ GcLeafIo.VERSIONS
);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageTypes.java
similarity index 52%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageTypes.java
index 577ad7efbd..8b7d0f4673 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/MvPageTypes.java
@@ -17,28 +17,31 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import org.apache.ignite.internal.storage.RowId;
-
/**
- * Search key for the {@link VersionChainTree}.
+ * Collection of all page types that relate to multi-versioned partition
storage.
*/
-public class VersionChainKey {
- /** Row id. */
- private final RowId rowId;
-
- /**
- * Constructor.
- *
- * @param rowId Search row id.
- */
- public VersionChainKey(RowId rowId) {
- this.rowId = rowId;
- }
-
- /**
- * Returns a row id.
- */
- public RowId rowId() {
- return rowId;
- }
+public interface MvPageTypes {
+ // NOTE(review): these values identify page IO types registered via MvPageIoModule. They presumably must stay unique
+ // across all registered page IO modules and stable for already persisted pages; confirm before changing any value.
+
+ /** Version chain tree meta page IO type. */
+ short T_VERSION_CHAIN_META_IO = 9;
+
+ /** Version chain tree inner page IO type. */
+ short T_VERSION_CHAIN_INNER_IO = 10;
+
+ /** Version chain tree leaf page IO type. */
+ short T_VERSION_CHAIN_LEAF_IO = 11;
+
+ /** Row version data page IO type. */
+ short T_ROW_VERSION_DATA_IO = 12;
+
+ /** Blob fragment page IO type. */
+ short T_BLOB_FRAGMENT_IO = 13;
+
+ /** Garbage collection queue meta page IO type. */
+ short T_GC_META_IO = 14;
+
+ /** Garbage collection queue inner page IO type. */
+ short T_GC_INNER_IO = 15;
+
+ /** Garbage collection queue leaf page IO type. */
+ short T_GC_LEAF_IO = 16;
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index cb697aac5c..1d01d14e68 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.hash.PageMemoryHashIn
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
@@ -86,6 +87,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
* @param indexFreeList Free list fot {@link IndexColumns}.
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
+ * @param gcQueue Garbage collection queue.
*/
public PersistentPageMemoryMvPartitionStorage(
PersistentPageMemoryTableStorage tableStorage,
@@ -94,9 +96,10 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
RowVersionFreeList rowVersionFreeList,
IndexColumnsFreeList indexFreeList,
VersionChainTree versionChainTree,
- IndexMetaTree indexMetaTree
+ IndexMetaTree indexMetaTree,
+ GcQueue gcQueue
) {
- super(partitionId, tableStorage, rowVersionFreeList, indexFreeList,
versionChainTree, indexMetaTree);
+ super(partitionId, tableStorage, rowVersionFreeList, indexFreeList,
versionChainTree, indexMetaTree, gcQueue);
checkpointManager = tableStorage.engine().checkpointManager();
checkpointTimeoutLock = checkpointManager.checkpointTimeoutLock();
@@ -371,6 +374,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
* @param indexFreeList Free list fot {@link IndexColumns}.
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
+ * @param gcQueue Garbage collection queue.
* @throws StorageException If failed.
*/
public void updateDataStructures(
@@ -378,7 +382,8 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
RowVersionFreeList rowVersionFreeList,
IndexColumnsFreeList indexFreeList,
VersionChainTree versionChainTree,
- IndexMetaTree indexMetaTree
+ IndexMetaTree indexMetaTree,
+ GcQueue gcQueue
) {
throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(),
this::createStorageInfo);
@@ -388,6 +393,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
this.indexFreeList = indexFreeList;
this.versionChainTree = versionChainTree;
this.indexMetaTree = indexMetaTree;
+ this.gcQueue = gcQueue;
this.blobStorage = new BlobStorage(
rowVersionFreeList,
@@ -419,6 +425,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
indexFreeList::close,
versionChainTree::close,
indexMetaTree::close,
+ gcQueue::close,
blobStorage::close
);
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
index e5f83b0650..ef44a3a70c 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/ReadRowVersion.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage.pagememory.mv;
+import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionIdFromLink;
import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.readPartitionless;
import java.nio.ByteBuffer;
@@ -24,7 +25,7 @@ import java.util.function.Predicate;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.datapage.PageMemoryTraversal;
import org.apache.ignite.internal.pagememory.io.DataPagePayload;
-import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.jetbrains.annotations.Nullable;
@@ -50,7 +51,6 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
this.partitionId = partitionId;
}
- /** {@inheritDoc} */
@Override
public long consumePagePayload(long link, long pageAddr, DataPagePayload
payload, Predicate<HybridTimestamp> loadValue) {
if (readingFirstSlot) {
@@ -69,7 +69,9 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
nextLink = readPartitionless(partitionId, pageAddr, payload.offset() +
RowVersion.NEXT_LINK_OFFSET);
if (!loadValue.test(timestamp)) {
- result = new RowVersion(partitionIdFromLink(link),
firstFragmentLink, timestamp, nextLink, null);
+ int valueSize = PageUtils.getInt(pageAddr, payload.offset() +
RowVersion.VALUE_SIZE_OFFSET);
+
+ result = new RowVersion(partitionIdFromLink(link),
firstFragmentLink, timestamp, nextLink, valueSize);
return STOP_TRAVERSAL;
}
@@ -77,11 +79,6 @@ class ReadRowVersion implements
PageMemoryTraversal<Predicate<HybridTimestamp>>
return readRowVersionValue.consumePagePayload(link, pageAddr, payload,
null);
}
- private int partitionIdFromLink(long link) {
- return PageIdUtils.partitionId(PageIdUtils.pageId(link));
- }
-
- /** {@inheritDoc} */
@Override
public void finish() {
if (result != null) {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RemoveWriteOnGcInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RemoveWriteOnGcInvokeClosure.java
new file mode 100644
index 0000000000..e81e38d70f
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RemoveWriteOnGcInvokeClosure.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter.equalsByNextLink;
+
+import java.util.List;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} for deleting a row version in
version chain on garbage collection in
+ * {@link AbstractPageMemoryMvPartitionStorage#pollForVacuum(HybridTimestamp)}.
+ *
+ * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization.
+ *
+ * <p>Operation may throw {@link StorageException} which will cause form
{@link BplusTree#invoke(Object, Object, InvokeClosure)}.
+ */
+public class RemoveWriteOnGcInvokeClosure implements
InvokeClosure<VersionChain> {
+ private final RowId rowId;
+
+ private final HybridTimestamp timestamp;
+
+ private final long link;
+
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ private OperationType operationType;
+
+ private @Nullable VersionChain newRow;
+
+ private List<RowVersion> toRemove;
+
+ private RowVersion result;
+
+ private @Nullable RowVersion toUpdate;
+
+ RemoveWriteOnGcInvokeClosure(RowId rowId, HybridTimestamp timestamp, long
link, AbstractPageMemoryMvPartitionStorage storage) {
+ this.rowId = rowId;
+ this.timestamp = timestamp;
+ this.link = link;
+ this.storage = storage;
+ }
+
+ @Override
+ public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ assert oldRow != null : "rowId=" + rowId + ", storage=" +
storage.createStorageInfo();
+ assert oldRow.hasNextLink() : oldRow;
+
+ RowVersion rowVersion = readRowVersionWithChecks(oldRow);
+ RowVersion nextRowVersion =
storage.readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
+
+ result = nextRowVersion;
+
+ // If the found version is a tombstone, then we must remove it as well.
+ if (rowVersion.isTombstone()) {
+ toRemove = List.of(nextRowVersion, rowVersion);
+
+ // If the found version is the head of the chain, then delete the
entire chain.
+ if (oldRow.headLink() == rowVersion.link()) {
+ operationType = OperationType.REMOVE;
+ } else if (oldRow.nextLink() == rowVersion.link()) {
+ operationType = OperationType.PUT;
+
+ // Find the version for which this version is
RowVersion#nextLink.
+ toUpdate = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
+
+ newRow = oldRow.withNextLink(nextRowVersion.nextLink());
+ } else {
+ operationType = OperationType.PUT;
+
+ newRow = oldRow;
+
+ // Find the version for which this version is
RowVersion#nextLink.
+ toUpdate = storage.findRowVersion(oldRow,
equalsByNextLink(rowVersion.link()), false);
+ }
+ } else {
+ operationType = OperationType.PUT;
+
+ toRemove = List.of(nextRowVersion);
+
+ toUpdate = rowVersion;
+
+ if (oldRow.headLink() == rowVersion.link()) {
+ newRow = oldRow.withNextLink(nextRowVersion.nextLink());
+ } else {
+ newRow = oldRow;
+ }
+ }
+ }
+
+ @Override
+ public @Nullable VersionChain newRow() {
+ assert operationType == OperationType.PUT ? newRow != null : newRow ==
null : "newRow=" + newRow + ", op=" + operationType;
+
+ return newRow;
+ }
+
+ @Override
+ public OperationType operationType() {
+ assert operationType != null;
+
+ return operationType;
+ }
+
+ @Override
+ public void onUpdate() {
+ if (toUpdate != null) {
+ try {
+ storage.rowVersionFreeList.updateNextLink(toUpdate.link(),
result.nextLink());
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Error updating the next link: [rowId={},
timestamp={}, rowLink={}, nextLink={}, {}]",
+ e,
+ newRow.rowId(), timestamp, toUpdate.link(),
result.nextLink(), storage.createStorageInfo()
+ );
+ }
+ }
+ }
+
+ private RowVersion readRowVersionWithChecks(VersionChain versionChain) {
+ RowVersion rowVersion = storage.readRowVersion(link,
ALWAYS_LOAD_VALUE);
+
+ if (rowVersion == null) {
+ throw new StorageException(
+ "Could not find row version in the version chain:
[rowId={}, timestamp={}, {}]",
+ versionChain.rowId(), timestamp,
storage.createStorageInfo()
+ );
+ }
+
+ if (!rowVersion.hasNextLink()) {
+ throw new StorageException(
+ "Missing next row version: [rowId={}, timestamp={}, {}]",
+ versionChain.rowId(), timestamp,
storage.createStorageInfo()
+ );
+ }
+
+ return rowVersion;
+ }
+
+ /**
+ * Method to call after {@link BplusTree#invoke(Object, Object,
InvokeClosure)} has completed.
+ */
+ void afterCompletion() {
+ toRemove.forEach(storage::removeRowVersion);
+
+ if (toUpdate != null && !result.hasNextLink()) {
+ storage.gcQueue.remove(rowId, toUpdate.timestamp(),
toUpdate.link());
+ }
+ }
+
+ /**
+ * Returns the version that was removed in the version chain on garbage
collection.
+ */
+ RowVersion getResult() {
+ assert result != null;
+
+ return result;
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
index 200ddee03f..a006e8d57c 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersion.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SI
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK;
import java.nio.ByteBuffer;
-import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.pagememory.Storable;
import org.apache.ignite.internal.pagememory.io.AbstractDataPageIo;
@@ -80,10 +79,23 @@ public final class RowVersion implements Storable {
this.timestamp = timestamp;
this.nextLink = nextLink;
- this.valueSize = value == null ? -1 : value.limit();
+ this.valueSize = value == null ? 0 : value.limit();
this.value = value;
}
/**
 * Creates a row version that carries only header data (timestamp, next link and value size) and no value payload.
 *
 * @param partitionId Partition ID.
 * @param link Link of this row version.
 * @param timestamp Row version timestamp, may be {@code null}.
 * @param nextLink Link of the next row version.
 * @param valueSize Size of the value stored on the page.
 */
public RowVersion(int partitionId, long link, @Nullable HybridTimestamp timestamp, long nextLink, int valueSize) {
    this.partitionId = partitionId;
    this.timestamp = timestamp;
    this.nextLink = nextLink;
    this.valueSize = valueSize;

    // The value itself is intentionally not kept by this instance.
    this.value = null;

    link(link);
}
+
public @Nullable HybridTimestamp timestamp() {
return timestamp;
}
@@ -99,8 +111,8 @@ public final class RowVersion implements Storable {
return valueSize;
}
- public ByteBuffer value() {
- return Objects.requireNonNull(value);
+ public @Nullable ByteBuffer value() {
+ return value;
}
public boolean hasNextLink() {
@@ -127,25 +139,21 @@ public final class RowVersion implements Storable {
return timestamp != null;
}
- /** {@inheritDoc} */
@Override
public final void link(long link) {
this.link = link;
}
- /** {@inheritDoc} */
@Override
public final long link() {
return link;
}
- /** {@inheritDoc} */
@Override
public final int partition() {
return partitionId;
}
- /** {@inheritDoc} */
@Override
public int size() {
assert value != null;
@@ -153,19 +161,16 @@ public final class RowVersion implements Storable {
return headerSize() + value.limit();
}
- /** {@inheritDoc} */
@Override
public int headerSize() {
return HYBRID_TIMESTAMP_SIZE + NEXT_LINK_STORE_SIZE_BYTES +
VALUE_SIZE_STORE_SIZE_BYTES;
}
- /** {@inheritDoc} */
@Override
public IoVersions<? extends AbstractDataPageIo<?>> ioVersions() {
return RowVersionDataIo.VERSIONS;
}
- /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(RowVersion.class, this);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
index 3bc3cac725..b2a8253cef 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RowVersionFreeList.java
@@ -43,6 +43,8 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
private final UpdateTimestampHandler updateTimestampHandler = new
UpdateTimestampHandler();
+ private final UpdateNextLinkHandler updateNextLinkHandler = new
UpdateNextLinkHandler();
+
/**
* Constructor.
*
@@ -109,6 +111,17 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
updateDataRow(link, updateTimestampHandler, newTimestamp, statHolder);
}
+ /**
+ * Updates row version's next link.
+ *
+ * @param link Row version link.
+ * @param nextLink Next row version link to set.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public void updateNextLink(long link, long nextLink) throws
IgniteInternalCheckedException {
+ updateDataRow(link, updateNextLinkHandler, nextLink, statHolder);
+ }
+
/**
* Removes a row by link.
*
@@ -120,7 +133,6 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
}
private class UpdateTimestampHandler implements
PageHandler<HybridTimestamp, Object> {
- /** {@inheritDoc} */
@Override
public Object run(
int groupId,
@@ -142,6 +154,28 @@ public class RowVersionFreeList extends
AbstractFreeList<RowVersion> {
}
}
+ private class UpdateNextLinkHandler implements PageHandler<Long, Object> {
+ @Override
+ public Object run(
+ int groupId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIo io,
+ Long nextLink,
+ int itemId,
+ IoStatisticsHolder statHolder
+ ) throws IgniteInternalCheckedException {
+ RowVersionDataIo dataIo = (RowVersionDataIo) io;
+
+ dataIo.updateNextLink(pageAddr, itemId, pageSize(), nextLink);
+
+ evictionTracker.touchPage(pageId);
+
+ return true;
+ }
+ }
+
/**
* Shortcut method for {@link #saveMetadata(IoStatisticsHolder)} with
statistics holder.
*
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
index b1b2a399e3..85466065c3 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChain.java
@@ -131,7 +131,29 @@ public class VersionChain extends VersionChainKey {
return newestCommittedLink() != NULL_LINK;
}
- /** {@inheritDoc} */
+ /**
+ * Returns {@code true} if there is a link to the next version.
+ */
+ public boolean hasNextLink() {
+ return nextLink != NULL_LINK;
+ }
+
+ /**
+ * Returns {@code true} if there is a link to the head version.
+ */
+ public boolean hasHeadLink() {
+ return headLink != NULL_LINK;
+ }
+
+ /**
+ * Creates a copy of the version chain with new next link.
+ *
+ * @param nextLink New next link.
+ */
+ public VersionChain withNextLink(long nextLink) {
+ return new VersionChain(rowId, transactionId, commitTableId,
commitPartitionId, headLink, nextLink);
+ }
+
@Override
public String toString() {
return S.toString(VersionChain.class, this);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
index 577ad7efbd..dce4cc5436 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
@@ -18,13 +18,15 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
/**
* Search key for the {@link VersionChainTree}.
*/
public class VersionChainKey {
/** Row id. */
- private final RowId rowId;
+ @IgniteToStringInclude
+ protected final RowId rowId;
/**
* Constructor.
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index 1d005b4ed9..41b058143b 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.storage.pagememory.index.hash.PageMemoryHashIn
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
@@ -71,12 +72,14 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
* @param destructionExecutor Executor used to destruct partitions.
+ * @param gcQueue Garbage collection queue.
*/
public VolatilePageMemoryMvPartitionStorage(
VolatilePageMemoryTableStorage tableStorage,
int partitionId,
VersionChainTree versionChainTree,
IndexMetaTree indexMetaTree,
+ GcQueue gcQueue,
GradualTaskExecutor destructionExecutor
) {
super(
@@ -85,7 +88,8 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
tableStorage.dataRegion().rowVersionFreeList(),
tableStorage.dataRegion().indexColumnsFreeList(),
versionChainTree,
- indexMetaTree
+ indexMetaTree,
+ gcQueue
);
this.destructionExecutor = destructionExecutor;
@@ -214,6 +218,7 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
private void destroyStructures(boolean removeIndexDescriptors) {
startMvDataDestruction();
startIndexMetaTreeDestruction();
+ startGarbageCollectionTreeDestruction();
hashIndexes.values().forEach(indexStorage ->
indexStorage.startDestructionOn(destructionExecutor));
sortedIndexes.values().forEach(indexStorage ->
indexStorage.startDestructionOn(destructionExecutor));
@@ -276,9 +281,23 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
}
}
+ private void startGarbageCollectionTreeDestruction() {
+ try {
+ destructionExecutor.execute(
+ gcQueue.startGradualDestruction(null, false)
+ ).whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Garbage collection tree destruction failed in
group={}, partition={}", ex, groupId, partitionId);
+ }
+ });
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Cannot destroy garbage collection tree
in group=" + groupId + ", partition=" + partitionId, e);
+ }
+ }
+
@Override
List<AutoCloseable> getResourcesToCloseOnCleanup() {
- return List.of(versionChainTree::close, indexMetaTree::close);
+ return List.of(versionChainTree::close, indexMetaTree::close,
gcQueue::close);
}
/**
@@ -286,16 +305,19 @@ public class VolatilePageMemoryMvPartitionStorage extends
AbstractPageMemoryMvPa
*
* @param versionChainTree Table tree for {@link VersionChain}.
* @param indexMetaTree Tree that contains SQL indexes' metadata.
+ * @param gcQueue Garbage collection queue.
* @throws StorageException If failed.
*/
public void updateDataStructures(
VersionChainTree versionChainTree,
- IndexMetaTree indexMetaTree
+ IndexMetaTree indexMetaTree,
+ GcQueue gcQueue
) {
throwExceptionIfStorageNotInCleanupOrRebalancedState(state.get(),
this::createStorageInfo);
this.versionChainTree = versionChainTree;
this.indexMetaTree = indexMetaTree;
+ this.gcQueue = gcQueue;
for (PageMemoryHashIndexStorage indexStorage : hashIndexes.values()) {
indexStorage.updateDataStructures(
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/GcQueue.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/GcQueue.java
new file mode 100644
index 0000000000..961ccde772
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/GcQueue.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.gc;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.reuse.ReuseList;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageLockListener;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcInnerIo;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcIo;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcLeafIo;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.io.GcMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link BplusTree} implementation for garbage collection of obsolete row
versions in version chains.
+ */
+public class GcQueue extends BplusTree<GcRowVersion, GcRowVersion> {
+ /**
+ * Constructor.
+ *
+ * @param grpId Group ID.
+ * @param grpName Group name.
+ * @param partId Partition id.
+ * @param pageMem Page memory.
+ * @param lockLsnr Page lock listener.
+ * @param globalRmvId Global remove ID.
+ * @param metaPageId Meta page ID.
+ * @param reuseList Reuse list.
+ * @param initNew {@code True} if new tree should be created.
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ public GcQueue(
+ int grpId,
+ String grpName,
+ int partId,
+ PageMemory pageMem,
+ PageLockListener lockLsnr,
+ AtomicLong globalRmvId,
+ long metaPageId,
+ @Nullable ReuseList reuseList,
+ boolean initNew
+ ) throws IgniteInternalCheckedException {
+ super(
+ "GarbageCollectionTree_" + grpId,
+ grpId,
+ grpName,
+ partId,
+ pageMem,
+ lockLsnr,
+ globalRmvId,
+ metaPageId,
+ reuseList
+ );
+
+ setIos(GcInnerIo.VERSIONS, GcLeafIo.VERSIONS, GcMetaIo.VERSIONS);
+
+ initTree(initNew);
+ }
+
+ @Override
+ protected int compare(BplusIo<GcRowVersion> io, long pageAddr, int idx,
GcRowVersion row) {
+ GcIo gcIo = (GcIo) io;
+
+ return gcIo.compare(pageAddr, idx, row);
+ }
+
+ @Override
+ public GcRowVersion getRow(BplusIo<GcRowVersion> io, long pageAddr, int
idx, Object x) {
+ GcIo gcIo = (GcIo) io;
+
+ return gcIo.getRow(pageAddr, idx, partId);
+ }
+
+ /**
+ * Adds a row version to the garbage collection queue.
+ */
+ public void add(RowId rowId, HybridTimestamp timestamp, long link) {
+ try {
+ put(new GcRowVersion(rowId, timestamp, link));
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Error occurred while adding row version to the garbage
collection queue: [rowId={}, timestamp={}, {}]",
+ e,
+ rowId, timestamp
+ );
+ }
+ }
+
+ /**
+ * Removes a row version from the garbage collection queue.
+ *
+ * @return {@code true} if the row version was removed from the garbage
collection queue.
+ */
+ public boolean remove(RowId rowId, HybridTimestamp timestamp, long link) {
+ try {
+ return remove(new GcRowVersion(rowId, timestamp, link)) != null;
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Error occurred while deleting row version form the
garbage collection queue: [rowId={}, timestamp={}, {}]",
+ e,
+ rowId, timestamp
+ );
+ }
+ }
+
+ /**
+ * Returns the first element from the garbage collection queue, {@code
null} if the queue is empty.
+ */
+ public @Nullable GcRowVersion getFirst() {
+ try {
+ return findFirst();
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException("Error occurred while getting the first
element from the garbage collection queue", e);
+ }
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/GcRowVersion.java
similarity index 55%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/GcRowVersion.java
index 577ad7efbd..517ec62269 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VersionChainKey.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/GcRowVersion.java
@@ -15,30 +15,52 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.mv;
+package org.apache.ignite.internal.storage.pagememory.mv.gc;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.storage.RowId;
/**
- * Search key for the {@link VersionChainTree}.
+ * Row version in the version chain that should be garbage collected.
*/
-public class VersionChainKey {
- /** Row id. */
+public class GcRowVersion {
private final RowId rowId;
+ private final HybridTimestamp timestamp;
+
+ private final long link;
+
/**
* Constructor.
*
- * @param rowId Search row id.
+ * @param rowId Row ID.
+ * @param timestamp Row timestamp.
+ * @param link Row version link.
*/
- public VersionChainKey(RowId rowId) {
+ public GcRowVersion(RowId rowId, HybridTimestamp timestamp, long link) {
this.rowId = rowId;
+ this.timestamp = timestamp;
+ this.link = link;
}
/**
- * Returns a row id.
+ * Returns row ID.
*/
- public RowId rowId() {
+ public RowId getRowId() {
return rowId;
}
+
+ /**
+ * Returns row timestamp.
+ */
+ public HybridTimestamp getTimestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Returns row version link.
+ */
+ public long getLink() {
+ return link;
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcInnerIo.java
similarity index 51%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcInnerIo.java
index edc32eadb2..7a4ccf284a 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcInnerIo.java
@@ -15,51 +15,45 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.mv.io;
+package org.apache.ignite.internal.storage.pagememory.mv.gc.io;
+
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_GC_INNER_IO;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
-import org.apache.ignite.internal.storage.pagememory.mv.VersionChainKey;
-import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
/**
- * IO routines for {@link VersionChainTree} inner pages.
- *
- * <p>Structure: link(long).
+ * IO routines for {@link GcQueue} inner pages.
*/
-public final class VersionChainInnerIo extends BplusInnerIo<VersionChainKey>
implements VersionChainIo {
- /** Page IO type. */
- public static final short T_VERSION_CHAIN_INNER_IO = 10;
-
+public class GcInnerIo extends BplusInnerIo<GcRowVersion> implements GcIo {
/** I/O versions. */
- public static final IoVersions<VersionChainInnerIo> VERSIONS = new
IoVersions<>(new VersionChainInnerIo(1));
+ public static final IoVersions<GcInnerIo> VERSIONS = new IoVersions<>(new
GcInnerIo(1));
/**
* Constructor.
*
* @param ver Page format version.
*/
- private VersionChainInnerIo(int ver) {
- super(T_VERSION_CHAIN_INNER_IO, ver, true, SIZE_IN_BYTES);
+ private GcInnerIo(int ver) {
+ super(T_GC_INNER_IO, ver, true, SIZE_IN_BYTES);
}
- /** {@inheritDoc} */
@Override
- public void store(long dstPageAddr, int dstIdx, BplusIo<VersionChainKey>
srcIo, long srcPageAddr, int srcIdx) {
- VersionChainIo.super.store(dstPageAddr, dstIdx, srcIo, srcPageAddr,
srcIdx);
+ public void store(long dstPageAddr, int dstIdx, BplusIo<GcRowVersion>
srcIo, long srcPageAddr, int srcIdx) {
+ GcIo.super.store(dstPageAddr, dstIdx, srcIo, srcPageAddr, srcIdx);
}
- /** {@inheritDoc} */
@Override
- public void storeByOffset(long pageAddr, int off, VersionChainKey row) {
- VersionChainIo.super.storeByOffset(pageAddr, off, row);
+ public void storeByOffset(long pageAddr, int off, GcRowVersion row) {
+ GcIo.super.storeByOffset(pageAddr, off, row);
}
- /** {@inheritDoc} */
@Override
- public VersionChainKey getLookupRow(BplusTree<VersionChainKey, ?> tree,
long pageAddr, int idx) {
+ public GcRowVersion getLookupRow(BplusTree<GcRowVersion, ?> tree, long
pageAddr, int idx) {
return getRow(pageAddr, idx, 0xFFFF);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcIo.java
new file mode 100644
index 0000000000..b0df0326f5
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcIo.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.gc.io;
+
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.HYBRID_TIMESTAMP_SIZE;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
+import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.pagememory.util.PartitionlessLinks;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.pagememory.mv.HybridTimestamps;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
+
+/**
+ * Interface for {@link GcQueue}-related IO.
+ *
+ * <p>Defines a following data layout:
+ * <ul>
+ * <li>Row ID (16 bytes);</li>
+ * <li>Row timestamp (12 bytes);</li>
+ * <li>Row link (6 bytes).</li>
+ * </ul>
+ */
+public interface GcIo {
+ /** Offset of rowId's most significant bits, 8 bytes. */
+ int ROW_ID_MSB_OFFSET = 0;
+
+ /** Offset of rowId's least significant bits, 8 bytes. */
+ int ROW_ID_LSB_OFFSET = ROW_ID_MSB_OFFSET + Long.BYTES;
+
+ /** Offset of row timestamp, 12 bytes. */
+ int ROW_TIMESTAMP_OFFSET = ROW_ID_LSB_OFFSET + Long.BYTES;
+
+ /** Offset of row link, 6 bytes. */
+ int ROW_LINK_OFFSET = ROW_TIMESTAMP_OFFSET + HYBRID_TIMESTAMP_SIZE;
+
+ /** Payload size in bytes. */
+ int SIZE_IN_BYTES = ROW_LINK_OFFSET + PARTITIONLESS_LINK_SIZE_BYTES;
+
+ /**
+ * Returns an offset of the element inside the page.
+ *
+ * @see BplusIo#offset(int)
+ */
+ int offset(int idx);
+
+ /**
+ * Stores a row version for garbage collection, copied from another page.
+ *
+ * @see BplusIo#store(long, int, BplusIo, long, int)
+ */
+ default void store(long dstPageAddr, int dstIdx, BplusIo<GcRowVersion>
srcIo, long srcPageAddr, int srcIdx) {
+ int dstOffset = offset(dstIdx);
+ int srcOffset = offset(srcIdx);
+
+ PageUtils.copyMemory(srcPageAddr, srcOffset, dstPageAddr, dstOffset,
SIZE_IN_BYTES);
+ }
+
+ /**
+ * Stores a row version for garbage collection chain in the page.
+ *
+ * @see BplusIo#storeByOffset(long, int, Object)
+ */
+ default void storeByOffset(long pageAddr, int off, GcRowVersion row) {
+ RowId rowId = row.getRowId();
+
+ putLong(pageAddr, off + ROW_ID_MSB_OFFSET,
rowId.mostSignificantBits());
+ putLong(pageAddr, off + ROW_ID_LSB_OFFSET,
rowId.leastSignificantBits());
+
+ HybridTimestamps.writeTimestampToMemory(pageAddr, off +
ROW_TIMESTAMP_OFFSET, row.getTimestamp());
+
+ PartitionlessLinks.writePartitionless(pageAddr + off +
ROW_LINK_OFFSET, row.getLink());
+ }
+
+ /**
+ * Compare the row version for garbage collection from the page with
passed row version, thus defining the order of element in the
+ * {@link GcQueue}.
+ *
+ * @param pageAddr Page address.
+ * @param idx Element's index.
+ * @param rowVersion Row version for garbage collection.
+ * @return Comparison result.
+ */
+ default int compare(long pageAddr, int idx, GcRowVersion rowVersion) {
+ int offset = offset(idx);
+
+ HybridTimestamp readTimestamp =
HybridTimestamps.readTimestamp(pageAddr, offset + ROW_TIMESTAMP_OFFSET);
+
+ int cmp = readTimestamp.compareTo(rowVersion.getTimestamp());
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ RowId rowId = rowVersion.getRowId();
+
+ cmp = Long.compare(getLong(pageAddr, offset + ROW_ID_MSB_OFFSET),
rowId.mostSignificantBits());
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return Long.compare(getLong(pageAddr, offset + ROW_ID_LSB_OFFSET),
rowId.leastSignificantBits());
+ }
+
+ /**
+ * Reads a row version for garbage collection from the page.
+ *
+ * @param pageAddr Page address.
+ * @param idx Element's index.
+ * @param partitionId Partition id to enrich read partitionless links.
+ */
+ default GcRowVersion getRow(long pageAddr, int idx, int partitionId) {
+ int offset = offset(idx);
+
+ long rowIdMsb = getLong(pageAddr, offset + ROW_ID_MSB_OFFSET);
+ long rowIdLsb = getLong(pageAddr, offset + ROW_ID_LSB_OFFSET);
+
+ return new GcRowVersion(
+ new RowId(partitionId, rowIdMsb, rowIdLsb),
+ HybridTimestamps.readTimestamp(pageAddr, offset +
ROW_TIMESTAMP_OFFSET),
+ PartitionlessLinks.readPartitionless(partitionId, pageAddr,
offset + ROW_LINK_OFFSET)
+ );
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcLeafIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcLeafIo.java
new file mode 100644
index 0000000000..a742657d6b
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcLeafIo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv.gc.io;
+
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_GC_LEAF_IO;
+
+import org.apache.ignite.internal.pagememory.io.IoVersions;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
+import org.apache.ignite.internal.pagememory.tree.io.BplusIo;
+import org.apache.ignite.internal.pagememory.tree.io.BplusLeafIo;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
+
+/**
+ * IO routines for {@link GcQueue} leaf pages.
+ */
+public class GcLeafIo extends BplusLeafIo<GcRowVersion> implements GcIo {
+ /** I/O versions. */
+ public static final IoVersions<GcLeafIo> VERSIONS = new IoVersions<>(new
GcLeafIo(1));
+
+ /**
+ * Constructor.
+ *
+ * @param ver Page format version.
+ */
+ private GcLeafIo(int ver) {
+ // Item size: SIZE_IN_BYTES (presumably the fixed element size declared in GcIo — confirm).
+ super(T_GC_LEAF_IO, ver, SIZE_IN_BYTES);
+ }
+
+ /**
+ * Stores element {@code srcIdx} of the source page at index {@code dstIdx} of the destination page.
+ *
+ * <p>The explicit {@code GcIo.super} call selects the {@link GcIo} default implementation.
+ */
+ @Override
+ public void store(long dstPageAddr, int dstIdx, BplusIo<GcRowVersion>
srcIo, long srcPageAddr, int srcIdx) {
+ GcIo.super.store(dstPageAddr, dstIdx, srcIo, srcPageAddr, srcIdx);
+ }
+
+ /**
+ * Writes {@code row} at offset {@code off} of the page, delegating to the {@link GcIo} default implementation.
+ */
+ @Override
+ public void storeByOffset(long pageAddr, int off, GcRowVersion row) {
+ GcIo.super.storeByOffset(pageAddr, off, row);
+ }
+
+ /**
+ * Reads the element at {@code idx}; the partition ID is taken from the ID of the page itself.
+ * The {@code tree} argument is not used.
+ */
+ @Override
+ public GcRowVersion getLookupRow(BplusTree<GcRowVersion, ?> tree, long
pageAddr, int idx) {
+ return getRow(pageAddr, idx, getPartitionId(pageAddr));
+ }
+
+ /** Returns the partition ID encoded in the ID of the page at {@code pageAddr}. */
+ private static int getPartitionId(long pageAddr) {
+ long pageId = getPageId(pageAddr);
+ return PageIdUtils.partitionId(pageId);
+ }
+}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcMetaIo.java
similarity index 66%
copy from
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
copy to
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcMetaIo.java
index 019fb7bc4b..c6a0463090 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/gc/io/GcMetaIo.java
@@ -15,28 +15,27 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory.mv.io;
+package org.apache.ignite.internal.storage.pagememory.mv.gc.io;
+
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_GC_META_IO;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
-import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
/**
- * IO routines for {@link VersionChainTree} meta pages.
+ * IO routines for {@link GcQueue} meta pages.
*/
-public class VersionChainMetaIo extends BplusMetaIo {
- /** Page IO type. */
- public static final short T_VERSION_CHAIN_META_IO = 9;
-
+public class GcMetaIo extends BplusMetaIo {
/** I/O versions. */
- public static final IoVersions<VersionChainMetaIo> VERSIONS = new
IoVersions<>(new VersionChainMetaIo(1));
+ public static final IoVersions<GcMetaIo> VERSIONS = new IoVersions<>(new
GcMetaIo(1));
/**
* Constructor.
*
* @param ver Page format version.
*/
- protected VersionChainMetaIo(int ver) {
- super(T_VERSION_CHAIN_META_IO, ver);
+ protected GcMetaIo(int ver) {
+ super(T_GC_META_IO, ver);
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/BlobFragmentIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/BlobFragmentIo.java
index 32456df999..c5d53d4c07 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/BlobFragmentIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/BlobFragmentIo.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.pagememory.util.PageUtils.getInt;
import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong;
import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong;
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_BLOB_FRAGMENT_IO;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.io.PageIo;
@@ -39,9 +40,6 @@ import org.apache.ignite.lang.IgniteStringBuilder;
* </ul>
*/
public class BlobFragmentIo extends PageIo {
- /** Page IO type. */
- public static final short T_BLOB_FRAGMENT_IO = 13;
-
private static final int NEXT_PAGE_ID_OFF = PageIo.COMMON_HEADER_END;
private static final int FRAGMENT_BYTES_OR_TOTAL_LENGTH_OFF =
NEXT_PAGE_ID_OFF + Long.BYTES;
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
index a65c335e18..9c5d8037d7 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/RowVersionDataIo.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.internal.pagememory.util.PageUtils.putByteBuffer
import static org.apache.ignite.internal.pagememory.util.PageUtils.putInt;
import static org.apache.ignite.internal.pagememory.util.PageUtils.putShort;
import static
org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionless;
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_ROW_VERSION_DATA_IO;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -36,9 +37,6 @@ import org.jetbrains.annotations.Nullable;
* Data pages IO for {@link RowVersion}.
*/
public class RowVersionDataIo extends AbstractDataPageIo<RowVersion> {
- /** Page IO type. */
- public static final short T_ROW_VERSION_DATA_IO = 12;
-
/** I/O versions. */
public static final IoVersions<RowVersionDataIo> VERSIONS = new
IoVersions<>(new RowVersionDataIo(1));
@@ -51,7 +49,6 @@ public class RowVersionDataIo extends
AbstractDataPageIo<RowVersion> {
super(T_ROW_VERSION_DATA_IO, ver);
}
- /** {@inheritDoc} */
@Override
protected void writeRowData(long pageAddr, int dataOff, int payloadSize,
RowVersion row, boolean newRow) {
assertPageType(pageAddr);
@@ -71,7 +68,6 @@ public class RowVersionDataIo extends
AbstractDataPageIo<RowVersion> {
putByteBuffer(addr, 0, row.value());
}
- /** {@inheritDoc} */
@Override
protected void writeFragmentData(RowVersion row, ByteBuffer pageBuf, int
rowOff, int payloadSize) {
assertPageType(pageBuf);
@@ -110,7 +106,20 @@ public class RowVersionDataIo extends
AbstractDataPageIo<RowVersion> {
HybridTimestamps.writeTimestampToMemory(pageAddr, payloadOffset +
RowVersion.TIMESTAMP_OFFSET, timestamp);
}
- /** {@inheritDoc} */
+ /**
+ * Updates the next link of a row version in place, leaving the rest of the row version untouched.
+ *
+ * @param pageAddr Page address.
+ * @param itemId Item ID of the slot where row version (or its first
fragment) is stored in this page.
+ * @param pageSize Size of the page.
+ * @param nextLink Next link to store.
+ */
+ public void updateNextLink(long pageAddr, int itemId, int pageSize, long
nextLink) {
+ // NOTE(review): unlike writeRowData/writeFragmentData, no assertPageType(...) check here — consider adding one.
+ int payloadOffset = getPayloadOffset(pageAddr, itemId, pageSize, 0);
+
+ // Only the NEXT_LINK_OFFSET field is overwritten; presumably stored without its partition ID
+ // (writePartitionless) — confirm against PartitionlessLinks.
+ writePartitionless(pageAddr + payloadOffset +
RowVersion.NEXT_LINK_OFFSET, nextLink);
+ }
+
@Override
protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
sb.app("RowVersionDataIo [\n");
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
index edc32eadb2..b40f3453d0 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainInnerIo.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.pagememory.mv.io;
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_VERSION_CHAIN_INNER_IO;
+
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.tree.io.BplusInnerIo;
@@ -30,9 +32,6 @@ import
org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
* <p>Structure: link(long).
*/
public final class VersionChainInnerIo extends BplusInnerIo<VersionChainKey>
implements VersionChainIo {
- /** Page IO type. */
- public static final short T_VERSION_CHAIN_INNER_IO = 10;
-
/** I/O versions. */
public static final IoVersions<VersionChainInnerIo> VERSIONS = new
IoVersions<>(new VersionChainInnerIo(1));
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
index d21972b67a..1d6bbfecce 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainLeafIo.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.pagememory.mv.io;
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_VERSION_CHAIN_LEAF_IO;
+
import java.util.function.Consumer;
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.tree.BplusTree;
@@ -33,9 +35,6 @@ import
org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
* <p>Structure: link(long).
*/
public final class VersionChainLeafIo extends BplusLeafIo<VersionChainKey>
implements VersionChainIo {
- /** Page IO type. */
- public static final short T_VERSION_CHAIN_LEAF_IO = 11;
-
/** I/O versions. */
public static final IoVersions<VersionChainLeafIo> VERSIONS = new
IoVersions<>(new VersionChainLeafIo(1));
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
index 019fb7bc4b..83a2d9ddd7 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/io/VersionChainMetaIo.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.pagememory.mv.io;
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_VERSION_CHAIN_META_IO;
+
import org.apache.ignite.internal.pagememory.io.IoVersions;
import org.apache.ignite.internal.pagememory.tree.io.BplusMetaIo;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
@@ -25,9 +27,6 @@ import
org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
* IO routines for {@link VersionChainTree} meta pages.
*/
public class VersionChainMetaIo extends BplusMetaIo {
- /** Page IO type. */
- public static final short T_VERSION_CHAIN_META_IO = 9;
-
/** I/O versions. */
public static final IoVersions<VersionChainMetaIo> VERSIONS = new
IoVersions<>(new VersionChainMetaIo(1));
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
index a1ee663501..7532c36178 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/BlobStorageTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.pagememory.mv;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.storage.pagememory.mv.MvPageTypes.T_BLOB_FRAGMENT_IO;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@@ -70,7 +71,7 @@ class BlobStorageTest {
void createStorage() {
PageIoRegistry pageIoRegistry = new PageIoRegistry() {
{
- ioVersions[BlobFragmentIo.T_BLOB_FRAGMENT_IO] =
BlobFragmentIo.VERSIONS;
+ ioVersions[T_BLOB_FRAGMENT_IO] = BlobFragmentIo.VERSIONS;
}
};
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
index bdc0544e0d..048a2bc569 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
@@ -26,7 +26,6 @@ import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorage
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
@@ -45,40 +44,4 @@ class PersistentPageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPa
return new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, null);
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testRegularGcAndRead(AddAndCommit addAndCommit) {
- super.testRegularGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAddWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndCommitWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAbortWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testConcurrentGc(AddAndCommit addAndCommit) {
- super.testConcurrentGc(addAndCommit);
- }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
similarity index 58%
copy from
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
copy to
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
index bdc0544e0d..5450ba9ba2 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageGcTest.java
@@ -20,17 +20,16 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import java.nio.file.Path;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
-class PersistentPageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
+class PersistentPageMemoryMvPartitionStorageGcTest extends
AbstractMvPartitionStorageGcTest {
@InjectConfiguration("mock.checkpoint.checkpointDelayMillis = 0")
private PersistentPageMemoryStorageEngineConfiguration engineConfig;
@@ -45,40 +44,4 @@ class PersistentPageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPa
return new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, null);
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testRegularGcAndRead(AddAndCommit addAndCommit) {
- super.testRegularGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAddWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndCommitWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAbortWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testConcurrentGc(AddAndCommit addAndCommit) {
- super.testConcurrentGc(addAndCommit);
- }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
index 4645851483..f213ba81f2 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
@@ -24,11 +24,7 @@ import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyT
import org.apache.ignite.internal.storage.engine.StorageEngine;
import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.extension.ExtendWith;
-@ExtendWith(WorkDirectoryExtension.class)
class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
@InjectConfiguration
private VolatilePageMemoryStorageEngineConfiguration engineConfig;
@@ -41,40 +37,4 @@ class VolatilePageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPart
return new VolatilePageMemoryStorageEngine("node", engineConfig,
ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testRegularGcAndRead(AddAndCommit addAndCommit) {
- super.testRegularGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAddWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndCommitWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAbortWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testConcurrentGc(AddAndCommit addAndCommit) {
- super.testConcurrentGc(addAndCommit);
- }
}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
similarity index 52%
copy from
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
copy to
modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
index 4645851483..918f7d50a8 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageGcTest.java
@@ -20,16 +20,12 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
-import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
+import org.apache.ignite.internal.storage.AbstractMvPartitionStorageGcTest;
import org.apache.ignite.internal.storage.engine.StorageEngine;
import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.extension.ExtendWith;
-@ExtendWith(WorkDirectoryExtension.class)
-class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
+class VolatilePageMemoryMvPartitionStorageGcTest extends
AbstractMvPartitionStorageGcTest {
@InjectConfiguration
private VolatilePageMemoryStorageEngineConfiguration engineConfig;
@@ -41,40 +37,4 @@ class VolatilePageMemoryMvPartitionStorageConcurrencyTest
extends AbstractMvPart
return new VolatilePageMemoryStorageEngine("node", engineConfig,
ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
}
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testRegularGcAndRead(AddAndCommit addAndCommit) {
- super.testRegularGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndRead(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAddWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndCommitWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
- super.testTombstoneGcAndAbortWrite(addAndCommit);
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
- @Override
- public void testConcurrentGc(AddAndCommit addAndCommit) {
- super.testConcurrentGc(addAndCommit);
- }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index 4f21782584..2fd34ea654 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -202,6 +202,7 @@ class GarbageCollector {
// Find the row that should be garbage collected.
ByteBuffer dataKey = getRowForGcKey(it, gcElementRowId);
+ // TODO: IGNITE-18843 Should try to get the RowVersion again
if (dataKey == null) {
// No row for GC.
return null;