This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6cf0f671f7 IGNITE-18019 GC methods in MvPartitionStorage & new tests &
old tests reorganization. (#1464)
6cf0f671f7 is described below
commit 6cf0f671f729da434a87bb96be5678dbcb8574a7
Author: Ivan Bessonov <[email protected]>
AuthorDate: Fri Dec 23 13:50:09 2022 +0300
IGNITE-18019 GC methods in MvPartitionStorage & new tests & old tests
reorganization. (#1464)
---
.../internal/testframework/IgniteTestUtils.java | 60 +++++
modules/storage-api/build.gradle | 5 +-
.../internal/storage/BinaryRowAndRowId.java} | 31 ++-
.../internal/storage/MvPartitionStorage.java | 13 +-
.../internal/storage/engine/StorageEngine.java | 5 +
... => TestMvPartitionStorageConcurrencyTest.java} | 15 +-
...Test.java => TestMvPartitionStorageGcTest.java} | 15 +-
.../storage/TestMvPartitionStorageTest.java | 10 +-
.../AbstractMvPartitionStorageConcurrencyTest.java | 241 +++++++++++++++++++++
.../storage/AbstractMvPartitionStorageGcTest.java | 126 +++++++++++
.../storage/AbstractMvPartitionStorageTest.java | 97 +--------
.../storage/AbstractMvTableStorageTest.java | 4 -
.../storage/BaseMvPartitionStorageTest.java | 180 +++++++++++++++
.../internal/storage/BaseMvStoragesTest.java | 15 ++
.../storage/impl/TestMvPartitionStorage.java | 108 ++++++++-
.../internal/storage/impl/TestStorageEngine.java | 5 +
.../PersistentPageMemoryStorageEngine.java | 5 +
.../VolatilePageMemoryStorageEngine.java | 5 +
.../mv/AbstractPageMemoryMvPartitionStorage.java | 2 +-
.../AbstractPageMemoryMvPartitionStorageTest.java | 4 +-
...ageMemoryMvPartitionStorageConcurrencyTest.java | 49 +++++
...PersistentPageMemoryMvPartitionStorageTest.java | 63 +-----
...ageMemoryMvPartitionStorageConcurrencyTest.java | 44 ++++
.../VolatilePageMemoryMvPartitionStorageTest.java | 43 +---
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 2 +-
.../storage/rocksdb/RocksDbStorageEngine.java | 5 +
.../RocksDbMvPartitionStorageConcurrencyTest.java | 44 ++++
.../rocksdb/RocksDbMvPartitionStorageTest.java | 53 +----
28 files changed, 968 insertions(+), 281 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 048252264b..efbff359ca 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -31,12 +31,17 @@ import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -766,4 +771,59 @@ public final class IgniteTestUtils {
public static <T> T await(CompletionStage<T> stage) {
return await(stage, TIMEOUT_SEC, TimeUnit.SECONDS);
}
+
+ /**
+ * {@link #runRace(long, RunnableX...)} with default timeout of 10 seconds.
+ */
+ public static void runRace(RunnableX... actions) {
+ runRace(TimeUnit.SECONDS.toMillis(10), actions);
+ }
+
+ /**
+ * Runs all actions, each in a separate thread, having a {@link
CyclicBarrier} before calling {@link RunnableX#run()}.
+ * Waits for threads completion or fails with the assertion if timeout
exceeded.
+ *
+ * @throws AssertionError In case of timeout or if any of the runnables
thrown an exception.
+ */
+ public static void runRace(long timeoutMillis, RunnableX... actions) {
+ int length = actions.length;
+
+ CyclicBarrier barrier = new CyclicBarrier(length);
+
+ Set<Throwable> throwables = ConcurrentHashMap.newKeySet();
+
+ Thread[] threads = IntStream.range(0, length).mapToObj(i -> new
Thread(() -> {
+ try {
+ barrier.await();
+
+ actions[i].run();
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ })).toArray(Thread[]::new);
+
+ Stream.of(threads).forEach(Thread::start);
+
+ try {
+ for (Thread thread : threads) {
+ thread.join(timeoutMillis);
+ }
+ } catch (InterruptedException e) {
+ for (Thread thread : threads) {
+ thread.interrupt();
+ }
+
+ fail("Race operations took too long.");
+ }
+
+ if (!throwables.isEmpty()) {
+ AssertionError assertionError = new AssertionError("One or several
threads have failed.");
+
+ for (Throwable throwable : throwables) {
+ assertionError.addSuppressed(throwable);
+ }
+
+ throw assertionError;
+ }
+ }
}
diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index 53085b851e..8cb950c654 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -46,11 +46,12 @@ dependencies {
testFixturesAnnotationProcessor libs.auto.service
testFixturesImplementation project(':ignite-core')
- testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation project(':ignite-configuration')
testFixturesImplementation project(':ignite-schema')
- testFixturesImplementation(testFixtures(project(':ignite-schema')))
testFixturesImplementation project(':ignite-api')
+ testFixturesImplementation(testFixtures(project(':ignite-core')))
+ testFixturesImplementation(testFixtures(project(':ignite-configuration')))
+ testFixturesImplementation(testFixtures(project(':ignite-schema')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.hamcrest.core
testFixturesImplementation libs.junit5.api
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/BinaryRowAndRowId.java
similarity index 54%
copy from
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
copy to
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/BinaryRowAndRowId.java
index 11dd05822a..2c4b10e3fa 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/BinaryRowAndRowId.java
@@ -17,16 +17,35 @@
package org.apache.ignite.internal.storage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
/**
- * MV partition storage test implementation for {@link TestMvPartitionStorage}
class.
+ * Wrapper that holds both {@link BinaryRow} and {@link RowId}. {@link
BinaryRow} is null for tombstones.
*/
-public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest
{
+public class BinaryRowAndRowId {
+ /** Binary row. */
+ private final @Nullable BinaryRow binaryRow;
+
+ /** Row id. */
+ private final RowId rowId;
+
/**
- * Creates new instance.
+ * Constructor.
+ *
+ * @param binaryRow Binary row.
+ * @param rowId Row id.
*/
- public TestMvPartitionStorageTest() {
- storage = new TestMvPartitionStorage(PARTITION_ID);
+ public BinaryRowAndRowId(@Nullable BinaryRow binaryRow, RowId rowId) {
+ this.binaryRow = binaryRow;
+ this.rowId = rowId;
+ }
+
+ public @Nullable BinaryRow binaryRow() {
+ return binaryRow;
+ }
+
+ public RowId rowId() {
+ return rowId;
}
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 62528dc6dc..59cca1ba64 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -177,7 +177,7 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
* @param commitTimestamp Timestamp to associate with committed value.
* @throws StorageException If failed to write data to the storage.
*/
- void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp
commitTimestamp) throws StorageException;
+ void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTimestamp) throws StorageException;
/**
* Scans all versions of a single row.
@@ -209,6 +209,17 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
*/
@Nullable RowId closestRowId(RowId lowerBound) throws StorageException;
+ /**
+ * Polls the oldest row in the partition, removing it at the same time.
+ *
+ * @param lowWatermark A time threshold for the row. Rows younger then the
watermark value will not be removed.
+ * @return A pair of binary row and row id, where a timestamp of the row
is less than or equal to {@code lowWatermark}.
+ * {@code null} if there's no such value.
+ */
+ default @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ throw new UnsupportedOperationException("pollForVacuum");
+ }
+
/**
* Returns rows count belongs to current storage.
*
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index c9b37849d5..ec0549f0cc 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -25,6 +25,11 @@ import org.apache.ignite.internal.storage.StorageException;
* General storage engine interface.
*/
public interface StorageEngine {
+ /**
+ * Returns a storage engine name.
+ */
+ String name();
+
/**
* Starts the engine.
*
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageConcurrencyTest.java
similarity index 69%
copy from
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
copy to
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageConcurrencyTest.java
index 11dd05822a..56f4b5dd66 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageConcurrencyTest.java
@@ -17,16 +17,15 @@
package org.apache.ignite.internal.storage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
/**
- * MV partition storage test implementation for {@link TestMvPartitionStorage}
class.
+ * Test implementation for {@link TestStorageEngine}.
*/
-public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest
{
- /**
- * Creates new instance.
- */
- public TestMvPartitionStorageTest() {
- storage = new TestMvPartitionStorage(PARTITION_ID);
+public class TestMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
+ @Override
+ protected StorageEngine createEngine() {
+ return new TestStorageEngine();
}
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageGcTest.java
similarity index 69%
copy from
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
copy to
modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageGcTest.java
index 11dd05822a..469e595055 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageGcTest.java
@@ -17,16 +17,15 @@
package org.apache.ignite.internal.storage;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
/**
- * MV partition storage test implementation for {@link TestMvPartitionStorage}
class.
+ * Test implementation for {@link TestStorageEngine}.
*/
-public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest
{
- /**
- * Creates new instance.
- */
- public TestMvPartitionStorageTest() {
- storage = new TestMvPartitionStorage(PARTITION_ID);
+public class TestMvPartitionStorageGcTest extends
AbstractMvPartitionStorageGcTest {
+ @Override
+ protected StorageEngine createEngine() {
+ return new TestStorageEngine();
}
}
diff --git
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
index 11dd05822a..73293ad81d 100644
---
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
+++
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/TestMvPartitionStorageTest.java
@@ -17,16 +17,16 @@
package org.apache.ignite.internal.storage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
/**
* MV partition storage test implementation for {@link TestMvPartitionStorage}
class.
*/
public class TestMvPartitionStorageTest extends AbstractMvPartitionStorageTest
{
- /**
- * Creates new instance.
- */
- public TestMvPartitionStorageTest() {
- storage = new TestMvPartitionStorage(PARTITION_ID);
+ @Override
+ protected StorageEngine createEngine() {
+ return new TestStorageEngine();
}
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
new file mode 100644
index 0000000000..836f1217b1
--- /dev/null
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Test to check for race conditions in MV partition storage.
+ */
+public abstract class AbstractMvPartitionStorageConcurrencyTest extends
BaseMvPartitionStorageTest {
+ /** To be used in a loop. {@link RepeatedTest} has a smaller failure rate
due to recreating the storage every time. */
+ private static final int REPEATS = 100;
+
+ @Test
+ void testAbortAndRead() {
+ for (int i = 0; i < REPEATS; i++) {
+ addWrite(ROW_ID, BINARY_ROW, TX_ID);
+
+ runRace(
+ () -> abortWrite(ROW_ID),
+ () -> read(ROW_ID, clock.now()),
+ () -> scanFirstEntry(clock.now())
+ );
+
+ assertNull(read(ROW_ID, clock.now()));
+ }
+ }
+
+ @Test
+ void testCommitAndRead() {
+ for (int i = 0; i < REPEATS; i++) {
+ addWrite(ROW_ID, BINARY_ROW, TX_ID);
+
+ runRace(
+ () -> commitWrite(ROW_ID, clock.now()),
+ () -> read(ROW_ID, clock.now()),
+ () -> scanFirstEntry(clock.now())
+ );
+
+ assertRowMatches(read(ROW_ID, clock.now()), BINARY_ROW);
+ }
+ }
+
+ @Test
+ void testUpdateAndRead() {
+ for (int i = 0; i < REPEATS; i++) {
+ addWrite(ROW_ID, BINARY_ROW, TX_ID);
+
+ runRace(
+ () -> addWrite(ROW_ID, BINARY_ROW2, TX_ID),
+ () -> read(ROW_ID, clock.now()),
+ () -> scanFirstEntry(clock.now())
+ );
+
+ assertRowMatches(read(ROW_ID, clock.now()), BINARY_ROW2);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddAndCommit.class)
+ void testRegularGcAndRead(AddAndCommit addAndCommit) {
+ //TODO https://issues.apache.org/jira/browse/IGNITE-18020
+ assumeTrue(engine instanceof TestStorageEngine);
+
+ for (int i = 0; i < REPEATS; i++) {
+ HybridTimestamp firstCommitTs = addAndCommit(BINARY_ROW);
+
+ addAndCommit.perform(this, BINARY_ROW2);
+
+ runRace(
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> read(ROW_ID, firstCommitTs),
+ () -> scanFirstEntry(firstCommitTs)
+ );
+
+ assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+
+ cleanup();
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddAndCommit.class)
+ void testTombstoneGcAndRead(AddAndCommit addAndCommit) {
+ //TODO https://issues.apache.org/jira/browse/IGNITE-18020
+ assumeTrue(engine instanceof TestStorageEngine);
+
+ for (int i = 0; i < REPEATS; i++) {
+ HybridTimestamp firstCommitTs = addAndCommit.perform(this,
BINARY_ROW);
+
+ addAndCommit.perform(this, null);
+
+ runRace(
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> read(ROW_ID, firstCommitTs),
+ () -> scanFirstEntry(firstCommitTs)
+ );
+
+ assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddAndCommit.class)
+ void testTombstoneGcAndAddWrite(AddAndCommit addAndCommit) {
+ //TODO https://issues.apache.org/jira/browse/IGNITE-18020
+ assumeTrue(engine instanceof TestStorageEngine);
+
+ for (int i = 0; i < REPEATS; i++) {
+ addAndCommit.perform(this, BINARY_ROW);
+
+ addAndCommit.perform(this, null);
+
+ runRace(
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> addWrite(ROW_ID, BINARY_ROW2, TX_ID)
+ );
+
+ assertRowMatches(read(ROW_ID, HybridTimestamp.MAX_VALUE),
BINARY_ROW2);
+
+ abortWrite(ROW_ID);
+
+ assertNull(storage.closestRowId(ROW_ID));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddAndCommit.class)
+ void testTombstoneGcAndCommitWrite(AddAndCommit addAndCommit) {
+ //TODO https://issues.apache.org/jira/browse/IGNITE-18020
+ assumeTrue(engine instanceof TestStorageEngine);
+
+ for (int i = 0; i < REPEATS; i++) {
+ addAndCommit.perform(this, BINARY_ROW);
+
+ addAndCommit.perform(this, null);
+
+ addWrite(ROW_ID, BINARY_ROW2, TX_ID);
+
+ runRace(
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> commitWrite(ROW_ID, clock.now())
+ );
+
+ assertRowMatches(read(ROW_ID, HybridTimestamp.MAX_VALUE),
BINARY_ROW2);
+
+ assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
+
+ cleanup();
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddAndCommit.class)
+ void testTombstoneGcAndAbortWrite(AddAndCommit addAndCommit) {
+ //TODO https://issues.apache.org/jira/browse/IGNITE-18020
+ assumeTrue(engine instanceof TestStorageEngine);
+
+ for (int i = 0; i < REPEATS; i++) {
+ addAndCommit.perform(this, BINARY_ROW);
+
+ addAndCommit.perform(this, null);
+
+ addWrite(ROW_ID, BINARY_ROW2, TX_ID);
+
+ runRace(
+ () -> pollForVacuum(HybridTimestamp.MAX_VALUE),
+ () -> abortWrite(ROW_ID)
+ );
+
+ assertNull(storage.closestRowId(ROW_ID));
+ }
+ }
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ private void scanFirstEntry(HybridTimestamp firstCommitTs) {
+ try (var cursor = scan(firstCommitTs)) {
+ cursor.hasNext();
+ }
+ }
+
+ /**
+ * Adds a tombstone and cleans a GC queue until nothing's there.
+ */
+ private void cleanup() {
+ addAndCommit(null);
+
+ BinaryRowAndRowId row;
+
+ do {
+ row = pollForVacuum(HybridTimestamp.MAX_VALUE);
+ } while (row != null);
+ }
+
+ private enum AddAndCommit {
+ ATOMIC {
+ @Override
+ HybridTimestamp perform(AbstractMvPartitionStorageConcurrencyTest
test, @Nullable BinaryRow binaryRow) {
+ HybridTimestamp ts = test.clock.now();
+
+ test.addWriteCommitted(ROW_ID, binaryRow, ts);
+
+ return ts;
+ }
+ },
+ NON_ATOMIC {
+ @Override
+ HybridTimestamp perform(AbstractMvPartitionStorageConcurrencyTest
test, @Nullable BinaryRow binaryRow) {
+ return test.addAndCommit(binaryRow);
+ }
+ };
+
+ abstract HybridTimestamp
perform(AbstractMvPartitionStorageConcurrencyTest test, @Nullable BinaryRow
binaryRow);
+ }
+}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
new file mode 100644
index 0000000000..c4157976b3
--- /dev/null
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract test for MV partition storage GC.
+ */
+public abstract class AbstractMvPartitionStorageGcTest extends
BaseMvPartitionStorageTest {
+ @Test
+ void testEmptyStorage() {
+ assertNull(storage.pollForVacuum(clock.now()));
+ }
+
+ @Test
+ void testSingleValueStorage() {
+ addAndCommit(BINARY_ROW);
+
+ assertNull(storage.pollForVacuum(clock.now()));
+ }
+
+ @Test
+ void testRegularPoll() {
+ HybridTimestamp firstCommitTs = addAndCommit(BINARY_ROW);
+
+ HybridTimestamp tsBetweenCommits = clock.now();
+
+ HybridTimestamp secondCommitTs = addAndCommit(BINARY_ROW2);
+
+ // Data is still visible for older timestamps.
+ assertNull(storage.pollForVacuum(firstCommitTs));
+
+ assertNull(storage.pollForVacuum(tsBetweenCommits));
+
+ // Once a low watermark value becomes equal to second commit
timestamp, previous value
+ // becomes completely inaccessible and should be purged.
+ BinaryRowAndRowId gcedRow = storage.pollForVacuum(secondCommitTs);
+
+ assertNotNull(gcedRow);
+
+ assertRowMatches(gcedRow.binaryRow(), BINARY_ROW);
+
+ // Read from the old timestamp should return null.
+ assertNull(read(ROW_ID, firstCommitTs));
+
+ // Read from the newer timestamp should return last value.
+ assertRowMatches(read(ROW_ID, secondCommitTs), BINARY_ROW2);
+ }
+
+ @Test
+ void testPollFromUnderTombstone() {
+ addAndCommit(BINARY_ROW);
+ HybridTimestamp secondCommitTs = addAndCommit(null);
+
+ BinaryRowAndRowId row = storage.pollForVacuum(secondCommitTs);
+
+ assertNotNull(row);
+ assertRowMatches(row.binaryRow(), BINARY_ROW);
+
+ assertNull(read(ROW_ID, secondCommitTs));
+
+ // Check that tombstone is also deleted from the partition. It must be
empty at this point.
+ assertNull(storage.closestRowId(ROW_ID));
+ }
+
+ @Test
+ void testDoubleTombstone() {
+ addAndCommit(BINARY_ROW);
+ addAndCommit(null);
+ HybridTimestamp lastCommitTs = addAndCommit(null);
+
+ BinaryRowAndRowId row = storage.pollForVacuum(lastCommitTs);
+
+ assertNotNull(row);
+ assertRowMatches(row.binaryRow(), BINARY_ROW);
+
+ assertNull(read(ROW_ID, lastCommitTs));
+
+ // Check that all tombstones are deleted from the partition. It must
be empty at this point.
+ assertNull(storage.closestRowId(ROW_ID));
+ }
+
+ @Test
+ void testManyOldVersions() {
+ addAndCommit(BINARY_ROW);
+
+ addAndCommit(BINARY_ROW2);
+
+ HybridTimestamp lowWatermark = addAndCommit(null);
+
+ // Poll the oldest row.
+ BinaryRowAndRowId row = pollForVacuum(lowWatermark);
+
+ assertNotNull(row);
+ assertRowMatches(row.binaryRow(), BINARY_ROW);
+
+ // Poll the next oldest row.
+ row = pollForVacuum(lowWatermark);
+
+ assertNotNull(row);
+ assertRowMatches(row.binaryRow(), BINARY_ROW2);
+
+ // Nothing else to poll.
+ assertNull(pollForVacuum(lowWatermark));
+ }
+}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 3d6d230835..ac315cb10e 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -42,12 +42,10 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -55,19 +53,9 @@ import org.junit.jupiter.params.provider.EnumSource;
/**
* Base test for MV partition storages.
*/
-public abstract class AbstractMvPartitionStorageTest extends
BaseMvStoragesTest {
- private static final UUID COMMIT_TABLE_ID = UUID.randomUUID();
-
- /** A partition id that should be used to create a partition instance. */
- protected static final int PARTITION_ID = 1;
-
- protected MvPartitionStorage storage;
-
+public abstract class AbstractMvPartitionStorageTest extends
BaseMvPartitionStorageTest {
protected final UUID txId = newTransactionId();
- /** Hybrid clock to generate timestamps. */
- protected final HybridClock clock = new HybridClockImpl();
-
protected final TestKey key = new TestKey(10, "foo");
private final TestValue value = new TestValue(20, "bar");
protected final BinaryRow binaryRow = binaryRow(key, value);
@@ -75,84 +63,6 @@ public abstract class AbstractMvPartitionStorageTest extends
BaseMvStoragesTest
private final BinaryRow binaryRow2 = binaryRow(key, value2);
private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22,
"bar3"));
- /**
- * Reads a row.
- */
- protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
- ReadResult readResult = storage.read(rowId, timestamp);
-
- return readResult.binaryRow();
- }
-
- /**
- * Scans partition.
- */
- protected PartitionTimestampCursor scan(HybridTimestamp timestamp) {
- return storage.scan(timestamp);
- }
-
- /**
- * Inserts a row inside of consistency closure.
- */
- protected RowId insert(BinaryRow binaryRow, UUID txId) {
- return insert(binaryRow, txId, null);
- }
-
- /**
- * Inserts a row inside of consistency closure.
- */
- protected RowId insert(BinaryRow binaryRow, UUID txId, @Nullable UUID
explicitRowId) {
- RowId rowId = explicitRowId == null ? new RowId(PARTITION_ID) : new
RowId(PARTITION_ID, explicitRowId);
-
- storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId,
UUID.randomUUID(), 0));
-
- return rowId;
- }
-
- /**
- * Adds/updates a write-intent inside of consistency closure.
- */
- protected BinaryRow addWrite(RowId rowId, BinaryRow binaryRow, UUID txId) {
- return storage.runConsistently(() -> storage.addWrite(rowId,
binaryRow, txId, COMMIT_TABLE_ID, PARTITION_ID));
- }
-
- /**
- * Commits write-intent inside of consistency closure.
- */
- protected void commitWrite(RowId rowId, HybridTimestamp tsExact) {
- storage.runConsistently(() -> {
- storage.commitWrite(rowId, tsExact);
-
- return null;
- });
- }
-
- /**
- * Writes a row to storage like if it was first added using {@link
MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}
- * and immediately committed with {@link
MvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
- */
- protected void addWriteCommitted(RowId rowId, BinaryRow row,
HybridTimestamp commitTimestamp) {
- storage.runConsistently(() -> {
- storage.addWriteCommitted(rowId, row, commitTimestamp);
-
- return null;
- });
- }
-
- /**
- * Aborts write-intent inside of consistency closure.
- */
- protected BinaryRow abortWrite(RowId rowId) {
- return storage.runConsistently(() -> storage.abortWrite(rowId));
- }
-
- /**
- * Creates a new transaction id.
- */
- private static UUID newTransactionId() {
- return UUID.randomUUID();
- }
-
/**
* Tests that reads from empty storage return empty results.
*/
@@ -448,11 +358,6 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvStoragesTest
assertRowMatches(foundRow, binaryRow);
}
- protected final void assertRowMatches(BinaryRow rowUnderQuestion,
BinaryRow expectedRow) {
- assertThat(rowUnderQuestion, is(notNullValue()));
- assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
- }
-
@Test
void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 471551cfed..a7789feb33 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -43,8 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -92,8 +90,6 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
private TableIndexView hashIdx;
- private final HybridClock clock = new HybridClockImpl();
-
/**
* Initializes the internal structures needed for tests.
*
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
new file mode 100644
index 0000000000..f59293ca1e
--- /dev/null
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import java.util.UUID;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base test for MV partition storages.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public abstract class BaseMvPartitionStorageTest extends BaseMvStoragesTest {
+ protected static final int PARTITION_ID = 1;
+
+ protected static final UUID COMMIT_TABLE_ID = UUID.randomUUID();
+
+ protected static final UUID TX_ID = newTransactionId();
+
+ protected static final RowId ROW_ID = new RowId(PARTITION_ID);
+
+ protected static final TestKey KEY = new TestKey(10, "foo");
+
+ protected static final BinaryRow BINARY_ROW = binaryRow(KEY, new
TestValue(20, "bar"));
+
+ protected static final BinaryRow BINARY_ROW2 = binaryRow(KEY, new
TestValue(30, "bar"));
+
+ protected @InjectConfiguration("mock.tables.foo = {}") TablesConfiguration
tablesCfg;
+
+ protected StorageEngine engine;
+
+ protected MvTableStorage table;
+
+ protected MvPartitionStorage storage;
+
+ /**
+ * Creates a new transaction id.
+ */
+ protected static UUID newTransactionId() {
+ return UUID.randomUUID();
+ }
+
+ protected abstract StorageEngine createEngine();
+
+ @BeforeEach
+ protected void setUp() {
+ TableConfiguration tableCfg = tablesCfg.tables().get("foo");
+
+ engine = createEngine();
+
+ engine.start();
+
+ tableCfg.dataStorage().change(ds -> ds.convert(engine.name())).join();
+
+ table = engine.createMvTable(tableCfg, tablesCfg);
+
+ table.start();
+
+ storage = table.getOrCreateMvPartition(PARTITION_ID);
+ }
+
+ @AfterEach
+ protected void tearDown() throws Exception {
+ IgniteUtils.closeAll(
+ storage == null ? null : storage::close,
+ table == null ? null : table::stop,
+ engine == null ? null : engine::stop
+ );
+ }
+
+ /**
+ * Reads a row.
+ */
+ protected BinaryRow read(RowId rowId, HybridTimestamp timestamp) {
+ ReadResult readResult = storage.read(rowId, timestamp);
+
+ return readResult.binaryRow();
+ }
+
+ /**
+ * Scans partition.
+ */
+ protected PartitionTimestampCursor scan(HybridTimestamp timestamp) {
+ return storage.scan(timestamp);
+ }
+
+ /**
+ * Inserts a row inside of consistency closure.
+ */
+ protected RowId insert(@Nullable BinaryRow binaryRow, UUID txId) {
+ return insert(binaryRow, txId, null);
+ }
+
+ /**
+ * Inserts a row inside of consistency closure.
+ */
+ protected RowId insert(@Nullable BinaryRow binaryRow, UUID txId, @Nullable
UUID explicitRowId) {
+ RowId rowId = explicitRowId == null ? new RowId(PARTITION_ID) : new
RowId(PARTITION_ID, explicitRowId);
+
+ storage.runConsistently(() -> storage.addWrite(rowId, binaryRow, txId,
UUID.randomUUID(), 0));
+
+ return rowId;
+ }
+
+ /**
+ * Adds/updates a write-intent inside of consistency closure.
+ */
+ protected BinaryRow addWrite(RowId rowId, @Nullable BinaryRow binaryRow,
UUID txId) {
+ return storage.runConsistently(() -> storage.addWrite(rowId,
binaryRow, txId, COMMIT_TABLE_ID, PARTITION_ID));
+ }
+
+ /**
+ * Commits write-intent inside of consistency closure.
+ */
+ protected void commitWrite(RowId rowId, HybridTimestamp tsExact) {
+ storage.runConsistently(() -> {
+ storage.commitWrite(rowId, tsExact);
+
+ return null;
+ });
+ }
+
+ /**
+ * Writes a row to storage like if it was first added using {@link
MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, UUID, int)}
+ * and immediately committed with {@link
MvPartitionStorage#commitWrite(RowId, HybridTimestamp)}.
+ */
+ protected void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTimestamp) {
+ storage.runConsistently(() -> {
+ storage.addWriteCommitted(rowId, row, commitTimestamp);
+
+ return null;
+ });
+ }
+
+ protected HybridTimestamp addAndCommit(@Nullable BinaryRow binaryRow) {
+ HybridTimestamp commitTs = clock.now();
+
+ addWrite(ROW_ID, binaryRow, TX_ID);
+ commitWrite(ROW_ID, commitTs);
+
+ return commitTs;
+ }
+
+ /**
+ * Aborts write-intent inside of consistency closure.
+ */
+ protected BinaryRow abortWrite(RowId rowId) {
+ return storage.runConsistently(() -> storage.abortWrite(rowId));
+ }
+
+ protected BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+ return storage.runConsistently(() ->
storage.pollForVacuum(lowWatermark));
+ }
+}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 0f17f6ce99..767cb1a017 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -17,10 +17,17 @@
package org.apache.ignite.internal.storage;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryConverter;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -62,6 +69,9 @@ public abstract class BaseMvStoragesTest {
/** Key {@link BinaryTuple} converter for tests. */
protected static BinaryConverter kBinaryConverter;
+ /** Hybrid clock to generate timestamps. */
+ protected final HybridClock clock = new HybridClockImpl();
+
@BeforeAll
static void beforeAll() {
marshallerFactory = new ReflectionMarshallerFactory();
@@ -158,6 +168,11 @@ public abstract class BaseMvStoragesTest {
}
}
+ protected final void assertRowMatches(BinaryRow rowUnderQuestion,
BinaryRow expectedRow) {
+ assertThat(rowUnderQuestion, is(notNullValue()));
+ assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
+ }
+
/**
* Test pojo key.
*/
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 9f1f429f52..a1473b77e8 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -17,16 +17,21 @@
package org.apache.ignite.internal.storage.impl;
+import static java.util.Comparator.comparing;
+
import java.util.Iterator;
+import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
@@ -36,6 +41,7 @@ import
org.apache.ignite.internal.storage.StorageClosedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
@@ -44,6 +50,11 @@ import org.jetbrains.annotations.Nullable;
public class TestMvPartitionStorage implements MvPartitionStorage {
private final ConcurrentNavigableMap<RowId, VersionChain> map = new
ConcurrentSkipListMap<>();
+ private final NavigableSet<IgniteBiTuple<VersionChain, RowId>> gcQueue =
new ConcurrentSkipListSet<>(
+ comparing((IgniteBiTuple<VersionChain, RowId> p) -> p.get1().ts)
+ .thenComparing(IgniteBiTuple::get2)
+ );
+
private volatile long lastAppliedIndex;
private volatile long lastAppliedTerm;
@@ -65,7 +76,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
final @Nullable UUID txId;
final @Nullable UUID commitTableId;
final int commitPartitionId;
- final @Nullable VersionChain next;
+ volatile @Nullable VersionChain next;
VersionChain(@Nullable BinaryRow row, @Nullable HybridTimestamp ts,
@Nullable UUID txId, @Nullable UUID commitTableId,
int commitPartitionId, @Nullable VersionChain next) {
@@ -149,8 +160,13 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row,
UUID txId, UUID commitTableId, int commitPartitionId)
- throws TxIdMismatchException {
+ public synchronized @Nullable BinaryRow addWrite(
+ RowId rowId,
+ @Nullable BinaryRow row,
+ UUID txId,
+ UUID commitTableId,
+ int commitPartitionId
+ ) throws TxIdMismatchException {
checkClosed();
BinaryRow[] res = {null};
@@ -173,7 +189,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public @Nullable BinaryRow abortWrite(RowId rowId) {
+ public synchronized @Nullable BinaryRow abortWrite(RowId rowId) {
checkClosed();
BinaryRow[] res = {null};
@@ -194,7 +210,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public void commitWrite(RowId rowId, HybridTimestamp timestamp) {
+ public synchronized void commitWrite(RowId rowId, HybridTimestamp
timestamp) {
checkClosed();
map.compute(rowId, (ignored, versionChain) -> {
@@ -204,12 +220,16 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
return versionChain;
}
- return VersionChain.forCommitted(timestamp, versionChain);
+ return resolveCommittedVersionChain(rowId,
VersionChain.forCommitted(timestamp, versionChain));
});
}
@Override
- public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp
commitTimestamp) throws StorageException {
+ public synchronized void addWriteCommitted(
+ RowId rowId,
+ @Nullable BinaryRow row,
+ HybridTimestamp commitTimestamp
+ ) throws StorageException {
checkClosed();
map.compute(rowId, (ignored, versionChain) -> {
@@ -217,10 +237,33 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
throw new StorageException("Write intent exists for " + rowId);
}
- return new VersionChain(row, commitTimestamp, null, null,
ReadResult.UNDEFINED_COMMIT_PARTITION_ID, versionChain);
+ return resolveCommittedVersionChain(rowId, new VersionChain(
+ row,
+ commitTimestamp,
+ null,
+ null,
+ ReadResult.UNDEFINED_COMMIT_PARTITION_ID,
+ versionChain
+ ));
});
}
+ @Nullable
+ private VersionChain resolveCommittedVersionChain(RowId rowId,
VersionChain committedVersionChain) {
+ if (committedVersionChain.next != null) {
+ // Avoid creating tombstones for tombstones.
+ if (committedVersionChain.row == null &&
committedVersionChain.next.row == null) {
+ return committedVersionChain.next;
+ }
+
+ // Calling it from the compute is fine. Concurrent writes of the
same row are impossible, and if we call the compute closure
+ // several times, the same tuple will be inserted into the GC
queue (timestamp and rowId don't change in this case).
+ gcQueue.add(new IgniteBiTuple<>(committedVersionChain, rowId));
+ }
+
+ return committedVersionChain;
+ }
+
@Override
public ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
checkClosed();
@@ -426,6 +469,51 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
return map.ceilingKey(lowerBound);
}
+ @Override
+ public synchronized @Nullable BinaryRowAndRowId
pollForVacuum(HybridTimestamp lowWatermark) {
+ Iterator<IgniteBiTuple<VersionChain, RowId>> it = gcQueue.iterator();
+
+ if (!it.hasNext()) {
+ return null;
+ }
+
+ IgniteBiTuple<VersionChain, RowId> next = it.next();
+ VersionChain dequeuedVersionChain = next.get1();
+
+ if (dequeuedVersionChain.ts.compareTo(lowWatermark) > 0) {
+ return null;
+ }
+
+ RowId rowId = next.get2();
+
+ VersionChain versionChainToRemove = dequeuedVersionChain.next;
+ assert versionChainToRemove.next == null;
+
+ dequeuedVersionChain.next = null;
+ it.remove();
+
+ // Tombstones must be deleted.
+ if (dequeuedVersionChain.row == null) {
+ map.compute(rowId, (ignored, head) -> {
+ if (head == dequeuedVersionChain) {
+ return null;
+ }
+
+ for (VersionChain cur = head; cur != null; cur = cur.next) {
+ if (cur.next == dequeuedVersionChain) {
+ cur.next = null;
+
+ gcQueue.remove(new IgniteBiTuple<>(cur, rowId));
+ }
+ }
+
+ return head;
+ });
+ }
+
+ return new BinaryRowAndRowId(versionChainToRemove.row, rowId);
+ }
+
@Override
public long rowsCount() {
checkClosed();
@@ -443,8 +531,10 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
/** Removes all entries from this storage. */
- public void clear() {
+ public synchronized void clear() {
map.clear();
+
+ gcQueue.clear();
}
private void checkClosed() {
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
index 510f69465e..268ff8d496 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestStorageEngine.java
@@ -31,6 +31,11 @@ public class TestStorageEngine implements StorageEngine {
/** Engine name. */
public static final String ENGINE_NAME = "test";
+ @Override
+ public String name() {
+ return ENGINE_NAME;
+ }
+
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 7433bf61df..79a5e81019 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -106,6 +106,11 @@ public class PersistentPageMemoryStorageEngine implements
StorageEngine {
return engineConfig;
}
+ @Override
+ public String name() {
+ return ENGINE_NAME;
+ }
+
@Override
public void start() throws StorageException {
int pageSize = engineConfig.pageSize().value();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index dfbce90cbc..ac47663e0a 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -63,6 +63,11 @@ public class VolatilePageMemoryStorageEngine implements
StorageEngine {
this.ioRegistry = ioRegistry;
}
+ @Override
+ public String name() {
+ return ENGINE_NAME;
+ }
+
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 3c0bb51e0f..86a2e278e1 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -674,7 +674,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
}
@Override
- public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp
commitTimestamp) throws StorageException {
+ public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTimestamp) throws StorageException {
assert rowId.partitionId() == partitionId : rowId;
if (!closeBusyLock.enterBusy()) {
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
index 7d6cf962b0..74476e3b1b 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorageTest.java
@@ -81,7 +81,7 @@ abstract class AbstractPageMemoryMvPartitionStorageTest
extends AbstractMvPartit
}
@Test
- void uncommittedMultiPageValuesWorkWithScans() throws Exception {
+ void uncommittedMultiPageValuesWorkWithScans() {
BinaryRow longRow = rowStoredInFragments();
insert(longRow, txId);
@@ -94,7 +94,7 @@ abstract class AbstractPageMemoryMvPartitionStorageTest
extends AbstractMvPartit
}
@Test
- void committedMultiPageValuesWorkWithScans() throws Exception {
+ void committedMultiPageValuesWorkWithScans() {
BinaryRow longRow = rowStoredInFragments();
RowId rowId = insert(longRow, txId);
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
new file mode 100644
index 0000000000..b0ba265703
--- /dev/null
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageConcurrencyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.nio.file.Path;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+@ExtendWith(WorkDirectoryExtension.class)
+class PersistentPageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
+ @InjectConfiguration("mock.checkpoint.checkpointDelayMillis = 0")
+ private PersistentPageMemoryStorageEngineConfiguration engineConfig;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createEngine() {
+ var ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, null);
+ }
+}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index de63196e9d..f75b17a8b9 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -28,74 +28,29 @@ import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
-import org.apache.ignite.internal.components.LongJvmPauseDetector;
-import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
-import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-@ExtendWith({ConfigurationExtension.class, WorkDirectoryExtension.class})
+@ExtendWith(WorkDirectoryExtension.class)
class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPartitionStorageTest {
- @WorkDirectory
- private Path workDir;
-
- @InjectConfiguration(value = "mock.checkpoint.checkpointDelayMillis = 0")
+ @InjectConfiguration("mock.checkpoint.checkpointDelayMillis = 0")
private PersistentPageMemoryStorageEngineConfiguration engineConfig;
- @InjectConfiguration(
- value = "mock.tables.foo.dataStorage.name = " +
PersistentPageMemoryStorageEngine.ENGINE_NAME
- )
- private TablesConfiguration tablesConfig;
-
- private LongJvmPauseDetector longJvmPauseDetector;
-
- private PersistentPageMemoryStorageEngine engine;
-
- private PersistentPageMemoryTableStorage table;
-
- private PersistentPageMemoryMvPartitionStorage pageMemStorage;
-
- @BeforeEach
- void setUp() {
- longJvmPauseDetector = new LongJvmPauseDetector("test",
Loggers.forClass(LongJvmPauseDetector.class));
-
- longJvmPauseDetector.start();
-
- engine = new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, longJvmPauseDetector);
-
- engine.start();
-
- table = engine.createMvTable(tablesConfig.tables().get("foo"),
tablesConfig);
-
- table.start();
-
- pageMemStorage = table.createMvPartitionStorage(PARTITION_ID);
- storage = pageMemStorage;
-
- ((PersistentPageMemoryMvPartitionStorage) storage).start();
- }
+ @WorkDirectory
+ private Path workDir;
- @AfterEach
- void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage::close,
- table == null ? null : table::stop,
- engine == null ? null : engine::stop,
- longJvmPauseDetector == null ? null :
longJvmPauseDetector::stop
- );
+ @Override
+ protected StorageEngine createEngine() {
+ return new PersistentPageMemoryStorageEngine("test", engineConfig,
ioRegistry, workDir, null);
}
@Override
@@ -113,7 +68,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
}
private void restartStorage() throws Exception {
- engine
+ ((PersistentPageMemoryStorageEngine) engine)
.checkpointManager()
.forceCheckpoint("before_stop_engine")
.futureFor(FINISHED)
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
new file mode 100644
index 0000000000..fff0392187
--- /dev/null
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageConcurrencyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-18023")
+@ExtendWith(WorkDirectoryExtension.class)
+class VolatilePageMemoryMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
+ @InjectConfiguration
+ private VolatilePageMemoryStorageEngineConfiguration engineConfig;
+
+ @Override
+ protected StorageEngine createEngine() {
+ var ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
+ }
+}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
index eeeff5e075..c04aa304c2 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorageTest.java
@@ -17,53 +17,18 @@
package org.apache.ignite.internal.storage.pagememory.mv;
-import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
-import
org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryTableStorage;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-@ExtendWith(ConfigurationExtension.class)
class VolatilePageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPartitionStorageTest {
@InjectConfiguration
private VolatilePageMemoryStorageEngineConfiguration engineConfig;
- private VolatilePageMemoryStorageEngine engine;
-
- private VolatilePageMemoryTableStorage table;
-
- @BeforeEach
- void setUp(
- @InjectConfiguration(
- value = "mock.tables.foo.dataStorage.name = " +
VolatilePageMemoryStorageEngine.ENGINE_NAME
- )
- TablesConfiguration tablesConfig
- ) {
- engine = new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
-
- engine.start();
-
- table = engine.createMvTable(tablesConfig.tables().get("foo"),
tablesConfig);
-
- table.start();
-
- storage = table.createMvPartitionStorage(PARTITION_ID);
-
- ((VolatilePageMemoryMvPartitionStorage) storage).start();
- }
-
- @AfterEach
- void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage::close,
- table == null ? null : table::stop,
- engine == null ? null : engine::stop
- );
+ @Override
+ protected StorageEngine createEngine() {
+ return new VolatilePageMemoryStorageEngine(engineConfig, ioRegistry);
}
@Override
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 4372cdc89e..3dd513a86a 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -536,7 +536,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public void addWriteCommitted(RowId rowId, BinaryRow row, HybridTimestamp
commitTimestamp) throws StorageException {
+ public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTimestamp) throws StorageException {
@SuppressWarnings("resource") WriteBatchWithIndex writeBatch =
requireWriteBatch();
ByteBuffer keyBuf = prepareHeapKeyBuf(rowId);
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 87971a8a7d..8775ad674b 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -106,6 +106,11 @@ public class RocksDbStorageEngine implements StorageEngine
{
return scheduledPool;
}
+ @Override
+ public String name() {
+ return ENGINE_NAME;
+ }
+
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
new file mode 100644
index 0000000000..8333f51cc9
--- /dev/null
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageConcurrencyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb;
+
+import java.nio.file.Path;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import
org.apache.ignite.internal.storage.AbstractMvPartitionStorageConcurrencyTest;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Storage test implementation for {@link RocksDbMvPartitionStorage}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbMvPartitionStorageConcurrencyTest extends
AbstractMvPartitionStorageConcurrencyTest {
+ @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size =
16777216, writeBufferSize = 16777216}}")
+ private RocksDbStorageEngineConfiguration engineConfig;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createEngine() {
+ return new RocksDbStorageEngine(engineConfig, workDir);
+ }
+}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index b11b42cce8..e8d7f03f40 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -17,66 +17,29 @@
package org.apache.ignite.internal.storage.rocksdb;
-import static
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
import java.nio.file.Path;
-import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest;
-import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
import
org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Storage test implementation for {@link RocksDbMvPartitionStorage}.
*/
@ExtendWith(WorkDirectoryExtension.class)
-@ExtendWith(ConfigurationExtension.class)
public class RocksDbMvPartitionStorageTest extends
AbstractMvPartitionStorageTest {
- private RocksDbStorageEngine engine;
-
- private RocksDbTableStorage table;
-
- @BeforeEach
- public void setUp(
- @WorkDirectory Path workDir,
- @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion
{size = 16536, writeBufferSize = 16536}}")
- RocksDbStorageEngineConfiguration engineConfig,
- @InjectConfiguration(
- value = "mock.tables.foo.dataStorage.name = " +
RocksDbStorageEngine.ENGINE_NAME
- ) TablesConfiguration tablesCfg
- ) throws Exception {
- TableConfiguration tableCfg = tablesCfg.tables().get("foo");
-
- assertThat(((RocksDbDataStorageView)
tableCfg.dataStorage().value()).dataRegion(),
equalTo(DEFAULT_DATA_REGION_NAME));
-
- engine = new RocksDbStorageEngine(engineConfig, workDir);
+ @InjectConfiguration("mock {flushDelayMillis = 0, defaultRegion {size =
16777216, writeBufferSize = 16777216}}")
+ private RocksDbStorageEngineConfiguration engineConfig;
- engine.start();
+ @WorkDirectory
+ private Path workDir;
- table = engine.createMvTable(tableCfg, tablesCfg);
-
- table.start();
-
- storage = table.getOrCreateMvPartition(PARTITION_ID);
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- IgniteUtils.closeAll(
- storage::close,
- table == null ? null : table::stop,
- engine == null ? null : engine::stop
- );
+ @Override
+ protected StorageEngine createEngine() {
+ return new RocksDbStorageEngine(engineConfig, workDir);
}
@Override