This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cef030bd85 IGNITE-19169 Deadlock detected while calling MvPartitionStorage#pollForVacuum (#1874)
cef030bd85 is described below
commit cef030bd858bde1e5af552bf471e3f682565535e
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Apr 3 11:00:22 2023 +0300
IGNITE-19169 Deadlock detected while calling MvPartitionStorage#pollForVacuum (#1874)
---
.../internal/components/LongJvmPauseDetector.java | 8 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../internal/storage/rocksdb/GarbageCollector.java | 2 +-
modules/table/build.gradle | 1 +
.../table/distributed/StorageUpdateHandler.java | 26 ++--
.../AbstractMvStorageUpdateHandlerTest.java | 143 +++++++++++++++++++++
...istentPageMemoryMvStorageUpdateHandlerTest.java | 55 ++++++++
.../RocksDbMvStorageUpdateHandlerTest.java | 41 ++++++
.../TestMvStorageUpdateHandlerTest.java | 28 ++++
...latilePageMemoryMvStorageUpdateHandlerTest.java | 39 ++++++
10 files changed, 324 insertions(+), 21 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java b/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java
index 042e210582..64e69c357c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tostring.S;
@@ -39,6 +40,8 @@ import org.jetbrains.annotations.Nullable;
* accordingly.
*/
public class LongJvmPauseDetector implements IgniteComponent {
+ private final IgniteLogger log = Loggers.forClass(LongJvmPauseDetector.class);
+
/** Ignite JVM pause detector threshold default value. */
public static final int DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD = 500;
@@ -81,11 +84,8 @@ public class LongJvmPauseDetector implements IgniteComponent {
private final String nodeName;
- private final IgniteLogger log;
-
- public LongJvmPauseDetector(String nodeName, IgniteLogger log) {
+ public LongJvmPauseDetector(String nodeName) {
this.nodeName = nodeName;
- this.log = log;
}
/** {@inheritDoc} */
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 76087f301e..8bbf30d12d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -284,7 +284,7 @@ public class IgniteImpl implements Ignite {
IgniteImpl(String name, Path configPath, Path workDir, @Nullable ClassLoader serviceProviderClassLoader) {
this.name = name;
- longJvmPauseDetector = new LongJvmPauseDetector(name, Loggers.forClass(LongJvmPauseDetector.class));
+ longJvmPauseDetector = new LongJvmPauseDetector(name);
lifecycleManager = new LifecycleManager(name);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index de50ca07ab..ae04ae797b 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -204,7 +204,7 @@ class GarbageCollector {
// Someone has processed the element in parallel, so we need to take a new head of the queue.
if (!gcRowVersion.equals(oldGcRowVersion)) {
- helper.lockByRowId.releaseLock(gcRowVersion.getRowId());
+ helper.lockByRowId.releaseLock(oldGcRowVersion.getRowId());
continue;
}
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index f570553349..7913cd8afd 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -53,6 +53,7 @@ dependencies {
testImplementation project(':ignite-raft')
testImplementation project(':ignite-schema')
testImplementation project(':ignite-page-memory')
+ testImplementation project(':ignite-storage-rocksdb')
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-configuration')))
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index de76d70584..88dee09f03 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -120,8 +120,6 @@ public class StorageUpdateHandler {
@Nullable HybridTimestamp lowWatermark
) {
storage.runConsistently(() -> {
- executeBatchGc(lowWatermark);
-
BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : null;
RowId rowId = new RowId(partitionId, rowUuid);
UUID commitTblId = commitPartitionId.tableId();
@@ -134,14 +132,16 @@ public class StorageUpdateHandler {
tryRemovePreviousWritesIndex(rowId, oldRow);
}
+ addToIndexes(row, rowId);
+
if (onReplication != null) {
onReplication.accept(rowId);
}
- addToIndexes(row, rowId);
-
return null;
});
+
+ executeBatchGc(lowWatermark);
}
/**
@@ -179,8 +179,6 @@ public class StorageUpdateHandler {
@Nullable HybridTimestamp lowWatermark
) {
storage.runConsistently(() -> {
- executeBatchGc(lowWatermark);
-
UUID commitTblId = commitPartitionId.tableId();
int commitPartId = commitPartitionId.partitionId();
@@ -209,6 +207,8 @@ public class StorageUpdateHandler {
return null;
});
+
+ executeBatchGc(lowWatermark);
}
private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
@@ -356,16 +356,12 @@ public class StorageUpdateHandler {
* @param lowWatermark Low watermark for the vacuum.
* @param count Count of entries to GC.
*/
- private void vacuumBatch(HybridTimestamp lowWatermark, int count) {
- storage.runConsistently(() -> {
- for (int i = 0; i < count; i++) {
- if (!internalVacuum(lowWatermark)) {
- break;
- }
+ void vacuumBatch(HybridTimestamp lowWatermark, int count) {
+ for (int i = 0; i < count; i++) {
+ if (!storage.runConsistently(() -> internalVacuum(lowWatermark))) {
+ break;
}
-
- return null;
- });
+ }
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractMvStorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractMvStorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..ce596b0d68
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/AbstractMvStorageUpdateHandlerTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Abstract class for testing {@link StorageUpdateHandler} using different implementations of {@link MvPartitionStorage}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+abstract class AbstractMvStorageUpdateHandlerTest extends BaseMvStoragesTest {
+ /** To be used in a loop. {@link RepeatedTest} has a smaller failure rate due to recreating the storage every time. */
+ private static final int REPEATS = 100;
+
+ private static final int PARTITION_ID = 0;
+
+ private StorageEngine storageEngine;
+
+ private MvTableStorage tableStorage;
+
+ private MvPartitionStorage partitionStorage;
+
+ private TestPartitionDataStorage partitionDataStorage;
+
+ private StorageUpdateHandler storageUpdateHandler;
+
+ @BeforeEach
+ void setUp(
+ @InjectConfiguration("mock.tables.foo{}") TablesConfiguration tablesConfig,
+ @InjectConfiguration DistributionZoneConfiguration distributionZoneConfig
+ ) {
+ storageEngine = createStorageEngine();
+
+ storageEngine.start();
+
+ TableConfiguration tableConfig = tablesConfig.tables().get("foo");
+
+ assertThat(
+ tableConfig.dataStorage().change(dataStorageChange -> dataStorageChange.convert(storageEngine.name())),
+ willCompleteSuccessfully()
+ );
+
+ tableStorage = storageEngine.createMvTable(tableConfig, tablesConfig, distributionZoneConfig);
+
+ tableStorage.start();
+
+ partitionStorage = getOrCreateMvPartition(tableStorage, PARTITION_ID);
+
+ partitionDataStorage = new TestPartitionDataStorage(partitionStorage);
+
+ storageUpdateHandler = new StorageUpdateHandler(PARTITION_ID, partitionDataStorage, Map::of, tableConfig.dataStorage());
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAllManually(
+ partitionStorage,
+ tableStorage,
+ storageEngine == null ? null : storageEngine::stop
+ );
+ }
+
+ protected abstract StorageEngine createStorageEngine();
+
+ @Test
+ void testConcurrentVacuumBatch() {
+ RowId rowId0 = new RowId(PARTITION_ID);
+ RowId rowId1 = new RowId(PARTITION_ID);
+
+ BinaryRow row0 = binaryRow(new TestKey(0, "key0"), new TestValue(0, "value0"));
+ BinaryRow row1 = binaryRow(new TestKey(0, "key0"), new TestValue(0, "value0"));
+
+ for (int i = 0; i < REPEATS; i++) {
+ addWriteCommitted(partitionDataStorage, rowId0, row0, clock.now());
+ addWriteCommitted(partitionDataStorage, rowId1, row1, clock.now());
+
+ addWriteCommitted(partitionDataStorage, rowId0, row0, clock.now());
+ addWriteCommitted(partitionDataStorage, rowId1, row1, clock.now());
+
+ addWriteCommitted(partitionDataStorage, rowId0, null, clock.now());
+ addWriteCommitted(partitionDataStorage, rowId1, null, clock.now());
+
+ runRace(
+ () -> storageUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 2),
+ () -> storageUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 2)
+ );
+
+ assertNull(partitionDataStorage.getStorage().closestRowId(RowId.lowestRowId(PARTITION_ID)));
+ }
+ }
+
+ private static void addWriteCommitted(PartitionDataStorage storage, RowId rowId, @Nullable BinaryRow row, HybridTimestamp timestamp) {
+ storage.runConsistently(() -> {
+ storage.addWrite(rowId, row, UUID.randomUUID(), UUID.randomUUID(), PARTITION_ID);
+
+ storage.commitWrite(rowId, timestamp);
+
+ return null;
+ });
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PersistentPageMemoryMvStorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PersistentPageMemoryMvStorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..c12736fb2d
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PersistentPageMemoryMvStorageUpdateHandlerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(WorkDirectoryExtension.class)
+class PersistentPageMemoryMvStorageUpdateHandlerTest extends AbstractMvStorageUpdateHandlerTest {
+ @InjectConfiguration
+ private PersistentPageMemoryStorageEngineConfiguration storageEngineConfig;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createStorageEngine() {
+ PageIoRegistry ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ String nodeName = "test";
+
+ return new PersistentPageMemoryStorageEngine(
+ nodeName,
+ storageEngineConfig,
+ ioRegistry,
+ workDir,
+ new LongJvmPauseDetector(nodeName)
+ );
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/RocksDbMvStorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/RocksDbMvStorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..974828f748
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/RocksDbMvStorageUpdateHandlerTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(WorkDirectoryExtension.class)
+class RocksDbMvStorageUpdateHandlerTest extends AbstractMvStorageUpdateHandlerTest {
+ @InjectConfiguration
+ private RocksDbStorageEngineConfiguration storageEngineConfig;
+
+ @WorkDirectory
+ private Path workDir;
+
+ @Override
+ protected StorageEngine createStorageEngine() {
+ return new RocksDbStorageEngine(storageEngineConfig, workDir);
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TestMvStorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TestMvStorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..d01d78d7fc
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TestMvStorageUpdateHandlerTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.impl.TestStorageEngine;
+
+class TestMvStorageUpdateHandlerTest extends AbstractMvStorageUpdateHandlerTest {
+ @Override
+ protected StorageEngine createStorageEngine() {
+ return new TestStorageEngine();
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/VolatilePageMemoryMvStorageUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/VolatilePageMemoryMvStorageUpdateHandlerTest.java
new file mode 100644
index 0000000000..ab821eb99c
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/VolatilePageMemoryMvStorageUpdateHandlerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.evict.PageEvictionTrackerNoOp;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.pagememory.VolatilePageMemoryStorageEngine;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineConfiguration;
+
+class VolatilePageMemoryMvStorageUpdateHandlerTest extends AbstractMvStorageUpdateHandlerTest {
+ @InjectConfiguration
+ private VolatilePageMemoryStorageEngineConfiguration storageEngineConfig;
+
+ @Override
+ protected StorageEngine createStorageEngine() {
+ PageIoRegistry ioRegistry = new PageIoRegistry();
+
+ ioRegistry.loadFromServiceLoader();
+
+ return new VolatilePageMemoryStorageEngine("test", storageEngineConfig, ioRegistry, PageEvictionTrackerNoOp.INSTANCE);
+ }
+}