This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ba9c1f88d1 IGNITE-20419 Fix partition meta loss after idle cluster restart. (#2594)
ba9c1f88d1 is described below
commit ba9c1f88d13a79d069e1ab6798b8932e8367a01d
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Sep 18 13:10:08 2023 +0300
IGNITE-20419 Fix partition meta loss after idle cluster restart. (#2594)
---
.../persistence/PersistentPageMemory.java | 1 +
.../persistence/checkpoint/CheckpointManager.java | 7 ++
.../checkpoint/CheckpointPagesWriter.java | 5 +
.../persistence/checkpoint/CheckpointWorkflow.java | 45 ++++++++
.../persistence/checkpoint/Checkpointer.java | 8 ++
.../internal/pagememory/util/PageIdUtils.java | 6 +-
.../checkpoint/CheckpointWorkflowTest.java | 90 +++++++++++++++-
.../internal/storage/engine/StorageEngine.java | 5 +
.../storage/AbstractMvTableStorageTest.java | 19 ++--
.../storage/engine/AbstractStorageEngineTest.java | 119 +++++++++++++++++++++
.../internal/storage/impl/TestStorageEngine.java | 5 +
.../PersistentPageMemoryStorageEngine.java | 5 +
.../VolatilePageMemoryStorageEngine.java | 5 +
.../mv/PersistentPageMemoryMvPartitionStorage.java | 58 ++++++----
.../PersistentPageMemoryStorageEngineTest.java | 58 ++++++++++
.../storage/rocksdb/RocksDbStorageEngine.java | 5 +
.../rocksdb/engine/RocksDbStorageEngineTest.java | 51 +++++++++
17 files changed, 456 insertions(+), 36 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
index 4f11ab1454..e160645826 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java
@@ -1230,6 +1230,7 @@ public class PersistentPageMemory implements PageMemory {
if (dirty) {
assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
+ assert pageIndex(pageId.pageId()) != 0 : "Partition meta should
only be updated via the instance of PartitionMeta.";
if (!wasDirty || forceAdd) {
Segment seg = segment(pageId.groupId(), pageId.pageId());
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index f2dac1a081..74bfbad288 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -236,6 +236,13 @@ public class CheckpointManager {
return checkpointer.lastCheckpointProgress();
}
+ /**
+ * Marks partition as dirty, forcing partition's meta-page to be written
on disk during next checkpoint.
+ */
+ public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId,
int partitionId) {
+ checkpointer.markPartitionAsDirty(dataRegion, groupId, partitionId);
+ }
+
/**
* Returns {@link true} if it is safe for all {@link DataRegion data
regions} to update their {@link PageMemory}.
*
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index f133eb4aa0..093cdba516 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -229,6 +229,11 @@ public class CheckpointPagesWriter implements Runnable {
pm -> createPageStoreWriter(pm, pageIdsToRetry)
);
+ if (fullId.pageIdx() == 0) {
+ // Skip meta-pages, they are written by
"writePartitionMeta".
+ continue;
+ }
+
// Should also be done for partitions that will be destroyed
to remove their pages from the data region.
pageMemory.checkpointWritePage(fullId, tmpWriteBuf,
pageStoreWriter, tracker);
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
index d5f0a71afc..129532caf5 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+import static java.util.concurrent.ConcurrentHashMap.newKeySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.DIRTY_PAGE_COMPARATOR;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
@@ -36,7 +38,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
@@ -48,8 +54,10 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
@@ -97,6 +105,13 @@ class CheckpointWorkflow {
*/
private final @Nullable ThreadPoolExecutor callbackListenerThreadPool;
+ /**
+ * Contains meta-page IDs for all partitions, that were explicitly marked
dirty by {@link #markPartitionAsDirty(DataRegion, int, int)}.
+ * Not required to be volatile, read/write is protected by a {@link
#checkpointReadWriteLock}. Publication of the initial value should
+ * be guaranteed by external user. {@link CheckpointManager}, in
particular.
+ */
+ private Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap = new
ConcurrentHashMap<>();
+
/**
* Constructor.
*
@@ -161,6 +176,15 @@ class CheckpointWorkflow {
}
}
+ /**
+ * Marks partition as dirty, forcing partition's meta-page to be written
on disk during next checkpoint.
+ */
+ public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId,
int partitionId) {
+ Set<FullPageId> dirtyMetaPageIds =
dirtyPartitionsMap.computeIfAbsent(dataRegion, unused -> newKeySet());
+
+ dirtyMetaPageIds.add(new FullPageId(partitionMetaPageId(partitionId),
groupId));
+ }
+
/**
* First stage of checkpoint which collects demanded information (dirty
pages mostly).
*
@@ -343,14 +367,35 @@ class CheckpointWorkflow {
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
CompletableFuture<?> allowToReplace
) {
+ Map<DataRegion<?>, Set<FullPageId>> dirtyPartitionsMap =
this.dirtyPartitionsMap;
+
+ this.dirtyPartitionsMap = new ConcurrentHashMap<>();
+
Collection<DataRegionDirtyPages<Collection<FullPageId>>>
dataRegionsDirtyPages = new ArrayList<>(dataRegions.size());
+ // First, we iterate all regions that have dirty pages.
for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
Collection<FullPageId> dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
+ Set<FullPageId> dirtyMetaPageIds =
dirtyPartitionsMap.remove(dataRegion);
+
+ if (dirtyMetaPageIds != null) {
+ // Merge these two collections. There should be no
intersections.
+ dirtyPages = CollectionUtils.concat(dirtyMetaPageIds,
dirtyPages);
+ }
+
dataRegionsDirtyPages.add(new
DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages));
}
+ // Then we iterate regions that don't have dirty pages, but somehow
have dirty partitions.
+ for (Entry<DataRegion<?>, Set<FullPageId>> entry :
dirtyPartitionsMap.entrySet()) {
+ PageMemory pageMemory = entry.getKey().pageMemory();
+
+ assert pageMemory instanceof PersistentPageMemory;
+
+ dataRegionsDirtyPages.add(new
DataRegionDirtyPages<>((PersistentPageMemory) pageMemory, entry.getValue()));
+ }
+
return new DataRegionsDirtyPages(dataRegionsDirtyPages);
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index fe750b6271..786b29aee8 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -45,6 +45,7 @@ import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import
org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
@@ -263,6 +264,13 @@ public class Checkpointer extends IgniteWorker {
return current;
}
+ /**
+ * Marks partition as dirty, forcing partition's meta-page to be written
on disk during next checkpoint.
+ */
+ void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int
partitionId) {
+ checkpointWorkflow.markPartitionAsDirty(dataRegion, groupId,
partitionId);
+ }
+
/**
* Executes a checkpoint.
*
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
index 6e290a0226..20338302fb 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageIdUtils.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.pagememory.util;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_AUX;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageIdAllocator;
import org.apache.ignite.internal.util.HexStringUtils;
+import org.intellij.lang.annotations.MagicConstant;
/**
* Utility class for page ID parts manipulation.
@@ -115,11 +117,11 @@ public final class PageIdUtils {
* Creates page ID from its components.
*
* @param partitionId Partition ID.
- * @param flag Flag: {@link PageIdAllocator#FLAG_DATA} of {@link
PageIdAllocator#FLAG_AUX}.
+ * @param flag Flag: {@link PageIdAllocator#FLAG_DATA} or {@link
PageIdAllocator#FLAG_AUX}.
* @param pageIdx Page index, monotonically growing number within each
partition.
* @return Page ID constructed from the given pageIdx and partition ID,
see {@link FullPageId}.
*/
- public static long pageId(int partitionId, byte flag, int pageIdx) {
+ public static long pageId(int partitionId, @MagicConstant(intValues =
{FLAG_DATA, FLAG_AUX}) byte flag, int pageIdx) {
long pageId = flag & FLAG_MASK;
pageId = (pageId << PART_ID_SIZE) | (partitionId & PART_ID_MASK);
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
index 0a648dd03a..b9a176652b 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -20,6 +20,8 @@ package
org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static
org.apache.ignite.internal.pagememory.persistence.PartitionMeta.partitionMetaPageId;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.EMPTY;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
@@ -32,12 +34,14 @@ import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.Check
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -67,7 +71,6 @@ import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
-import org.apache.ignite.internal.pagememory.util.PageIdUtils;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
@@ -496,6 +499,89 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
);
}
+ /**
+ * Tests that dirty partition with no dirty pages will be checkpointed.
+ */
+ @Test
+ void testDirtyPartitionWithoutDirtyPages() throws Exception {
+ PersistentPageMemory pageMemory = mock(PersistentPageMemory.class);
+
+ DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
+
+ workflow = new CheckpointWorkflow(
+ "test",
+ newReadWriteLock(log),
+ List.of(dataRegion),
+ 1
+ );
+
+ workflow.start();
+
+ int groupId = 10;
+ int partitionId = 20;
+
+ FullPageId metaPageId = new
FullPageId(partitionMetaPageId(partitionId), groupId);
+ workflow.markPartitionAsDirty(dataRegion, groupId, partitionId);
+
+ Checkpoint checkpoint = workflow.markCheckpointBegin(
+ coarseCurrentTimeMillis(),
+ mock(CheckpointProgressImpl.class),
+ mock(CheckpointMetricsTracker.class),
+ () -> {},
+ () -> {}
+ );
+
+ assertEquals(1, checkpoint.dirtyPagesSize);
+
+ CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.nextPartitionView(null);
+
+ assertNotNull(dirtyPagesView);
+ assertThat(toListDirtyPageIds(dirtyPagesView),
is(List.of(metaPageId)));
+ }
+
+ /**
+ * Tests that dirty partition with dirty pages will be checkpointed.
+ */
+ @Test
+ void testDirtyPartitionWithDirtyPages() throws Exception {
+ PersistentPageMemory pageMemory = mock(PersistentPageMemory.class);
+
+ DataRegion<PersistentPageMemory> dataRegion = () -> pageMemory;
+
+ workflow = new CheckpointWorkflow(
+ "test",
+ newReadWriteLock(log),
+ List.of(dataRegion),
+ 1
+ );
+
+ workflow.start();
+
+ int groupId = 10;
+ int partitionId = 20;
+
+ FullPageId metaPageId = new
FullPageId(partitionMetaPageId(partitionId), groupId);
+ FullPageId dataPageId = new FullPageId(pageId(partitionId, FLAG_DATA,
1), groupId);
+
+ workflow.markPartitionAsDirty(dataRegion, groupId, partitionId);
+
when(pageMemory.beginCheckpoint(any())).thenReturn(List.of(dataPageId));
+
+ Checkpoint checkpoint = workflow.markCheckpointBegin(
+ coarseCurrentTimeMillis(),
+ mock(CheckpointProgressImpl.class),
+ mock(CheckpointMetricsTracker.class),
+ () -> {},
+ () -> {}
+ );
+
+ assertEquals(2, checkpoint.dirtyPagesSize);
+
+ CheckpointDirtyPagesView dirtyPagesView =
checkpoint.dirtyPages.nextPartitionView(null);
+
+ assertNotNull(dirtyPagesView);
+ assertThat(toListDirtyPageIds(dirtyPagesView), is(List.of(metaPageId,
dataPageId)));
+ }
+
@Test
void testAwaitPendingTasksOfListenerCallback() {
workflow = new CheckpointWorkflow(
@@ -592,7 +678,7 @@ public class CheckpointWorkflowTest extends
BaseIgniteAbstractTest {
}
private static FullPageId of(int grpId, int partId, int pageIdx) {
- return new FullPageId(PageIdUtils.pageId(partId, (byte) 0, pageIdx),
grpId);
+ return new FullPageId(pageId(partId, (byte) 0, pageIdx), grpId);
}
/**
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index a1e4252f24..908d62a9c1 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -43,6 +43,11 @@ public interface StorageEngine {
*/
void stop() throws StorageException;
+ /**
+ * Whether the data is lost upon engine restart or not.
+ */
+ boolean isVolatile();
+
/**
* Creates new table storage.
*
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 765c4f7e9f..3728e9f41d 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -434,7 +434,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage,
rowsOnRebalance);
- checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS);
assertNull(mvPartitionStorage.committedGroupConfiguration());
// Let's finish rebalancing.
@@ -459,7 +459,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsBeforeRebalanceStart);
checkForPresenceRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsOnRebalance);
- checkLastApplied(mvPartitionStorage, 10, 10, 20);
+ checkLastApplied(mvPartitionStorage, 10, 20);
checkRaftGroupConfigs(raftGroupConfig,
mvPartitionStorage.committedGroupConfiguration());
}
@@ -493,7 +493,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage,
rowsOnRebalance);
- checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS);
// Let's abort rebalancing.
@@ -506,7 +506,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsBeforeRebalanceStart);
checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rowsOnRebalance);
- checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ checkLastApplied(mvPartitionStorage, 0, 0);
assertNull(mvPartitionStorage.committedGroupConfiguration());
}
@@ -609,11 +609,11 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
// Let's check the repositories: they should be empty.
checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rows);
- checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ checkLastApplied(mvPartitionStorage, 0, 0);
} else {
checkForPresenceRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rows);
- checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS);
}
}
@@ -631,7 +631,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
// Let's check the cleanup for an empty partition.
assertThat(tableStorage.clearPartition(PARTITION_ID),
willCompleteSuccessfully());
- checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ checkLastApplied(mvPartitionStorage, 0, 0);
assertNull(mvPartitionStorage.committedGroupConfiguration());
// Let's fill the storages and clean them.
@@ -655,7 +655,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
// Let's clear the storages and check them out.
assertThat(tableStorage.clearPartition(PARTITION_ID),
willCompleteSuccessfully());
- checkLastApplied(mvPartitionStorage, 0, 0, 0);
+ checkLastApplied(mvPartitionStorage, 0, 0);
assertNull(mvPartitionStorage.committedGroupConfiguration());
checkForMissingRows(mvPartitionStorage, hashIndexStorage,
sortedIndexStorage, rows);
@@ -912,7 +912,7 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
}
private void
checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) {
- checkLastApplied(storage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+ checkLastApplied(storage, REBALANCE_IN_PROGRESS,
REBALANCE_IN_PROGRESS);
assertNull(storage.committedGroupConfiguration());
@@ -1056,7 +1056,6 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
private static void checkLastApplied(
MvPartitionStorage storage,
long expLastAppliedIndex,
- long expPersistentIndex,
long expLastAppliedTerm
) {
assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
new file mode 100644
index 0000000000..ce4249927e
--- /dev/null
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/engine/AbstractStorageEngineTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_DATA_REGION;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.AbstractMvTableStorageTest;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests basic functionality of storage engines. Allows for more complex scenarios than {@link AbstractMvTableStorageTest}, because it
+ * doesn't limit the usage of the engine with a single table.
+ */
+public abstract class AbstractStorageEngineTest extends BaseMvStoragesTest {
+ /** Engine instance. */
+ private StorageEngine storageEngine;
+
+ @BeforeEach
+ void createEngineBeforeTest() {
+ storageEngine = createEngine();
+
+ storageEngine.start();
+ }
+
+ @AfterEach
+ void stopEngineAfterTest() {
+ if (storageEngine != null) {
+ storageEngine.stop();
+ }
+ }
+
+ /**
+ * Creates a new storage engine instance. For persistent engines, the instances within a single test method should point to the same
+ * directory.
+ */
+ protected abstract StorageEngine createEngine();
+
+ /**
+ * Tests that explicitly flushed data remains persistent on the device, when the engine is restarted.
+ */
+ @Test
+ void testRestartAfterFlush() throws Exception {
+ assumeFalse(storageEngine.isVolatile());
+
+ StorageTableDescriptor tableDescriptor = new StorageTableDescriptor(1, 1, DEFAULT_DATA_REGION);
+ StorageIndexDescriptorSupplier indexSupplier = mock(StorageIndexDescriptorSupplier.class);
+
+ MvTableStorage mvTableStorage = storageEngine.createMvTable(tableDescriptor, indexSupplier);
+
+ mvTableStorage.start();
+ try (AutoCloseable ignored0 = mvTableStorage::stop) {
+ CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = mvTableStorage.createMvPartition(0);
+
+ assertThat(mvPartitionStorageFuture, willCompleteSuccessfully());
+ MvPartitionStorage mvPartitionStorage = mvPartitionStorageFuture.join();
+
+ try (AutoCloseable ignored1 = mvPartitionStorage::close) {
+ // Flush. Persist the table itself, not the update.
+ assertThat(mvPartitionStorage.flush(), willCompleteSuccessfully());
+
+ mvPartitionStorage.runConsistently(locker -> {
+ // Update of basic storage data.
+ mvPartitionStorage.lastApplied(10, 20);
+
+ return null;
+ });
+
+ // Flush.
+ assertThat(mvPartitionStorage.flush(), willCompleteSuccessfully());
+ }
+ }
+
+ // Restart.
+ stopEngineAfterTest();
+ createEngineBeforeTest();
+
+ mvTableStorage = storageEngine.createMvTable(tableDescriptor, indexSupplier);
+
+ mvTableStorage.start();
+ try (AutoCloseable ignored0 = mvTableStorage::stop) {
+ CompletableFuture<MvPartitionStorage> mvPartitionStorageFuture = mvTableStorage.createMvPartition(0);
+
+ assertThat(mvPartitionStorageFuture, willCompleteSuccessfully());
+ MvPartitionStorage mvPartitionStorage = mvPartitionStorageFuture.join();
+
+ try (AutoCloseable ignored1 = mvPartitionStorage::close) {
+ // Check that data has been persisted.
+ assertEquals(10, mvPartitionStorage.lastAppliedIndex());
+ assertEquals(20, mvPartitionStorage.lastAppliedTerm());
+ }
+ }
+ }
+}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
index dc94832078..c983c8bbf3 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
@@ -47,6 +47,11 @@ public class TestStorageEngine implements StorageEngine {
// No-op.
}
+ @Override
+ public boolean isVolatile() {
+ return true;
+ }
+
@Override
public TestMvTableStorage createMvTable(
StorageTableDescriptor tableDescriptor,
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 48f644bef2..43c19d2f27 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -184,6 +184,11 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
}
}
+ @Override
+ public boolean isVolatile() {
+ return false;
+ }
+
@Override
public PersistentPageMemoryTableStorage createMvTable(
StorageTableDescriptor tableDescriptor,
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index 98d13fb97b..0cd413e343 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -125,6 +125,11 @@ public class VolatilePageMemoryStorageEngine implements
StorageEngine {
}
}
+ @Override
+ public boolean isVolatile() {
+ return true;
+ }
+
@Override
public VolatilePageMemoryTableStorage createMvTable(
StorageTableDescriptor tableDescriptor,
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index a59e59cb73..a34bb3060d 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -212,13 +212,30 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
}
private void lastAppliedBusy(long lastAppliedIndex, long lastAppliedTerm)
throws StorageException {
+ updateMeta((lastCheckpointId, meta) ->
meta.lastApplied(lastCheckpointId, lastAppliedIndex, lastAppliedTerm)); // updateMeta also marks the partition as dirty.
+ }
+
+ /**
+ * Closure applied to the partition meta by {@link #updateMeta(MetaUpdateClosure)}.
+ */
+ @FunctionalInterface
+ private interface MetaUpdateClosure {
+ void update(UUID lastCheckpointId, PartitionMeta meta); // lastCheckpointId is null if lastCheckpointProgress() returned null.
+ }
+
+ /**
+ * Updates partition meta and marks the partition as dirty; hides the boilerplate in
one place.
+ */
+ private void updateMeta(MetaUpdateClosure closure) {
assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
CheckpointProgress lastCheckpoint =
checkpointManager.lastCheckpointProgress();
UUID lastCheckpointId = lastCheckpoint == null ? null :
lastCheckpoint.id();
- meta.lastApplied(lastCheckpointId, lastAppliedIndex, lastAppliedTerm);
+ closure.update(lastCheckpointId, meta);
+
+ checkpointManager.markPartitionAsDirty(tableStorage.dataRegion(),
tableStorage.getTableId(), partitionId); // IGNITE-20419 (meta loss after idle restart): every meta update now marks the partition dirty.
}
@Override
@@ -262,30 +279,27 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
}
private void committedGroupConfigurationBusy(byte[] groupConfigBytes) {
- assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
-
- CheckpointProgress lastCheckpoint =
checkpointManager.lastCheckpointProgress();
- UUID lastCheckpointId = lastCheckpoint == null ? null :
lastCheckpoint.id();
+ updateMeta((lastCheckpointId, meta) -> { // updateMeta marks the partition dirty after this closure returns normally.
+ replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
- replicationProtocolGroupConfigReadWriteLock.writeLock().lock();
-
- try {
- if (meta.lastReplicationProtocolGroupConfigFirstPageId() ==
BlobStorage.NO_PAGE_ID) {
- long configPageId = blobStorage.addBlob(groupConfigBytes);
+ try {
+ if (meta.lastReplicationProtocolGroupConfigFirstPageId() ==
BlobStorage.NO_PAGE_ID) { // No config blob yet: create one and store its first page ID in meta.
+ long configPageId = blobStorage.addBlob(groupConfigBytes);
-
meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId,
configPageId);
- } else {
-
blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(),
groupConfigBytes);
+
meta.lastReplicationProtocolGroupConfigFirstPageId(lastCheckpointId,
configPageId);
+ } else { // Blob already exists: update its contents.
+
blobStorage.updateBlob(meta.lastReplicationProtocolGroupConfigFirstPageId(),
groupConfigBytes);
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throw new StorageException(
+ "Cannot save committed group configuration:
[tableId={}, partitionId={}]",
+ e,
+ tableStorage.getTableId(), partitionId
+ );
+ } finally {
+
replicationProtocolGroupConfigReadWriteLock.writeLock().unlock();
}
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException(
- "Cannot save committed group configuration: [tableId={},
partitionId={}]",
- e,
- tableStorage.getTableId(), partitionId
- );
- } finally {
- replicationProtocolGroupConfigReadWriteLock.writeLock().unlock();
- }
+ });
}
@Override
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
new file mode 100644
index 0000000000..6a41918159
--- /dev/null
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/engine/PersistentPageMemoryStorageEngineTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.engine;
+
+import java.nio.file.Path;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Implementation of the {@link AbstractStorageEngineTest} for the {@link
PersistentPageMemoryStorageEngine#ENGINE_NAME} engine.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class PersistentPageMemoryStorageEngineTest extends
AbstractStorageEngineTest {
+ @InjectConfiguration("mock {checkpoint.checkpointDelayMillis = 0,
defaultRegion.size = 1048576}") // Presumably: no checkpoint delay and a 1 MiB (2^20) default region -- confirm units.
+ private PersistentPageMemoryStorageEngineConfiguration engineConfiguration;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createEngine() {
+ var ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new PersistentPageMemoryStorageEngine(
+ "test",
+ engineConfiguration,
+ ioRegistry,
+ workDir,
+ null // NOTE(review): last constructor argument is left null in this test -- confirm it is optional.
+ );
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 16d30ae7bb..7aa9127437 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -167,6 +167,11 @@ public class RocksDbStorageEngine implements StorageEngine
{
IgniteUtils.shutdownAndAwaitTermination(scheduledPool, 10,
TimeUnit.SECONDS);
}
+ @Override
+ public boolean isVolatile() {
+ return false; // Not volatile, unlike VolatilePageMemoryStorageEngine (which returns true).
+ }
+
@Override
public RocksDbTableStorage createMvTable(
StorageTableDescriptor tableDescriptor,
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
new file mode 100644
index 0000000000..4faa412a08
--- /dev/null
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/engine/RocksDbStorageEngineTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.engine;
+
+import java.nio.file.Path;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.engine.AbstractStorageEngineTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Implementation of the {@link AbstractStorageEngineTest} for the {@link
RocksDbStorageEngine#ENGINE_NAME} engine.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbStorageEngineTest extends AbstractStorageEngineTest {
+ @InjectConfiguration("mock.flushDelayMillis = 0") // Presumably removes the flush delay so tests do not wait -- confirm.
+ private RocksDbStorageEngineConfiguration engineConfiguration;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createEngine() {
+ return new RocksDbStorageEngine(
+ "test",
+ engineConfiguration,
+ workDir
+ );
+ }
+}