This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4550e073d9 IGNITE-21438 Add thread assertions to MV partition and
index storages (#3149)
4550e073d9 is described below
commit 4550e073d96a30c0adce0785440734f271d1c10e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Feb 7 13:36:04 2024 +0400
IGNITE-21438 Add thread assertions to MV partition and index storages
(#3149)
---
.../ignite/internal/thread/IgniteThread.java | 25 +++-
.../ignite/internal/thread/ThreadAttributes.java | 40 +++++
.../ignite/internal/thread/ThreadOperation.java | 30 ++++
.../network/netty/NamedNioEventLoopGroup.java | 14 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 33 ++++-
modules/storage-api/build.gradle | 1 +
.../internal/storage/ThreadAssertingCursor.java | 56 +++++++
.../storage/ThreadAssertingMvPartitionStorage.java | 164 +++++++++++++++++++++
.../ThreadAssertingPartitionTimestampCursor.java | 48 ++++++
.../engine/ThreadAssertingMvTableStorage.java | 162 ++++++++++++++++++++
.../engine/ThreadAssertingStorageEngine.java | 62 ++++++++
.../index/ThreadAssertingHashIndexStorage.java | 51 +++++++
.../storage/index/ThreadAssertingIndexStorage.java | 76 ++++++++++
.../storage/index/ThreadAssertingPeekCursor.java | 47 ++++++
.../index/ThreadAssertingSortedIndexStorage.java | 52 +++++++
.../ignite/internal/worker/ThreadAssertions.java | 75 ++++++++++
16 files changed, 924 insertions(+), 12 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
index 53f2fbf5f3..1801aef277 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.thread;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.worker.IgniteWorker;
@@ -24,18 +29,20 @@ import org.apache.ignite.internal.util.worker.IgniteWorker;
/**
* This class adds some necessary plumbing on top of the {@link Thread} class.
Specifically, it adds:
* <ul>
- * <li>Consistent naming of threads</li>;
- * <li>Name of the ignite node this thread belongs to</li>.
+ * <li>Consistent naming of threads;</li>
+ * <li>Name of the ignite node this thread belongs to.</li>
* </ul>
* <b>Note</b>: this class is intended for internal use only.
*/
-public class IgniteThread extends Thread {
+public class IgniteThread extends Thread implements ThreadAttributes {
/** Number of all ignite threads in the system. */
private static final AtomicLong THREAD_COUNTER = new AtomicLong();
/** The name of the Ignite instance this thread belongs to. */
protected final String igniteInstanceName;
+ private final Set<ThreadOperation> allowedOperations;
+
/**
* Creates thread with given worker.
*
@@ -61,11 +68,16 @@ public class IgniteThread extends Thread {
* @param nodeName Name of the Ignite instance this thread is created
for.
* @param threadName Name of thread.
* @param r Runnable to execute.
+ * @param allowedOperations Operations that this thread is allowed to execute.
*/
- public IgniteThread(String nodeName, String threadName, Runnable r) {
+ public IgniteThread(String nodeName, String threadName, Runnable r,
ThreadOperation... allowedOperations) {
super(r, createName(THREAD_COUNTER.incrementAndGet(), threadName,
nodeName));
this.igniteInstanceName = nodeName;
+
+ Set<ThreadOperation> operations =
EnumSet.noneOf(ThreadOperation.class);
+ Collections.addAll(operations, allowedOperations);
+ this.allowedOperations = unmodifiableSet(operations);
}
/**
@@ -113,4 +125,9 @@ public class IgniteThread extends Thread {
public String toString() {
return S.toString(IgniteThread.class, this, "name", getName());
}
+
+ @Override
+ public Set<ThreadOperation> allowedOperations() {
+ return allowedOperations;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java
new file mode 100644
index 0000000000..230095c38b
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadAttributes.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.Set;
+
+/**
+ * Exposes attributes of a thread, such as the set of operations it is allowed to execute.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ThreadAttributes {
+ /**
+ * Returns all operations that this thread is allowed to execute.
+ */
+ Set<ThreadOperation> allowedOperations();
+
+ /**
+ * Returns {@code true} if the given operation is allowed by this thread.
+ *
+ * @param operation Operation to check.
+ */
+ default boolean allows(ThreadOperation operation) {
+ return allowedOperations().contains(operation);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
new file mode 100644
index 0000000000..62586d4397
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+/**
+ * Operation that a thread might be allowed or denied to execute.
+ */
+public enum ThreadOperation {
+ /** Storage read. */
+ STORAGE_READ,
+ /** Storage write. */
+ STORAGE_WRITE,
+ /** Make a blocking wait (involving taking a lock, waiting on a
condition variable, or waiting for time to pass). */
+ WAIT
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
index d0853bfda9..2ed0a4691f 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
@@ -17,11 +17,16 @@
package org.apache.ignite.internal.network.netty;
+import static java.util.Collections.emptySet;
+
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocalThread;
+import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
/**
* Named netty event loop.
@@ -59,7 +64,9 @@ public class NamedNioEventLoopGroup extends NioEventLoopGroup
{
/**
* Marker class for network threads. Basically is just a {@link
FastThreadLocalThread}.
*/
- public static class NetworkThread extends FastThreadLocalThread {
+ public static class NetworkThread extends FastThreadLocalThread implements
ThreadAttributes {
+ private static final Set<ThreadOperation> ALLOWED_OPERATIONS =
emptySet();
+
/**
* Constructor.
*
@@ -70,5 +77,10 @@ public class NamedNioEventLoopGroup extends
NioEventLoopGroup {
public NetworkThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
}
+
+ @Override
+ public Set<ThreadOperation> allowedOperations() {
+ return ALLOWED_OPERATIONS;
+ }
}
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index e245959970..ed784318cc 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -27,7 +27,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -157,6 +160,8 @@ import
org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -180,6 +185,7 @@ import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.VaultService;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.internal.worker.CriticalWorkerWatchdog;
+import org.apache.ignite.internal.worker.ThreadAssertions;
import
org.apache.ignite.internal.worker.configuration.CriticalWorkersConfiguration;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -553,14 +559,13 @@ public class IgniteImpl implements Ignite {
GcConfiguration gcConfig =
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
- dataStorageMgr = new DataStorageManager(
- dataStorageModules.createStorageEngines(
- name,
- nodeConfigRegistry,
- storagePath,
- longJvmPauseDetector
- )
+ Map<String, StorageEngine> storageEngines =
dataStorageModules.createStorageEngines(
+ name,
+ nodeConfigRegistry,
+ storagePath,
+ longJvmPauseDetector
);
+ dataStorageMgr = new
DataStorageManager(applyThreadAssertionsIfNeeded(storageEngines));
volatileLogStorageFactoryCreator = new
VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
@@ -745,6 +750,20 @@ public class IgniteImpl implements Ignite {
restComponent = createRestComponent(name);
}
+ private static Map<String, StorageEngine>
applyThreadAssertionsIfNeeded(Map<String, StorageEngine> storageEngines) {
+ if (!ThreadAssertions.enabled()) {
+ return storageEngines;
+ }
+
+ Map<String, StorageEngine> decoratedEngines = new HashMap<>();
+
+ for (Entry<String, StorageEngine> entry : storageEngines.entrySet()) {
+ decoratedEngines.put(entry.getKey(), new
ThreadAssertingStorageEngine(entry.getValue()));
+ }
+
+ return Map.copyOf(decoratedEngines);
+ }
+
private static SameValueLongSupplier
delayDurationMsSupplier(SchemaSynchronizationConfiguration schemaSyncConfig) {
return new SameValueLongSupplier(() ->
schemaSyncConfig.delayDuration().value());
}
diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index f64dd596df..5c4a1bc0a2 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -30,6 +30,7 @@ dependencies {
implementation project(':ignite-configuration')
implementation project(":ignite-core")
implementation project(":ignite-catalog")
+ implementation project(":ignite-workers")
implementation libs.jetbrains.annotations
implementation libs.auto.service.annotations
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java
new file mode 100644
index 0000000000..4f85219b25
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingCursor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link Cursor} that performs thread assertions when doing read operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingCursor<T> implements Cursor<T> {
+ private final Cursor<T> cursor;
+
+ /** Constructor. */
+ public ThreadAssertingCursor(Cursor<T> cursor) {
+ this.cursor = cursor;
+ }
+
+ @Override
+ public void close() {
+ cursor.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ assertThreadAllowsToRead();
+
+ return cursor.hasNext();
+ }
+
+ @Override
+ public T next() {
+ assertThreadAllowsToRead();
+
+ return cursor.next();
+ }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
new file mode 100644
index 0000000000..5fddecd805
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.gc.GcEntry;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvPartitionStorage} that performs thread assertions when doing
read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingMvPartitionStorage implements MvPartitionStorage {
+ private final MvPartitionStorage partitionStorage;
+
+ /** Constructor. */
+ public ThreadAssertingMvPartitionStorage(MvPartitionStorage
partitionStorage) {
+ this.partitionStorage = partitionStorage;
+ }
+
+ @Override
+ public <V> V runConsistently(WriteClosure<V> closure) throws
StorageException {
+ return partitionStorage.runConsistently(closure);
+ }
+
+ @Override
+ public CompletableFuture<Void> flush() {
+ assertThreadAllowsToWrite();
+
+ return partitionStorage.flush();
+ }
+
+ @Override
+ public long lastAppliedIndex() {
+ return partitionStorage.lastAppliedIndex();
+ }
+
+ @Override
+ public long lastAppliedTerm() {
+ return partitionStorage.lastAppliedTerm();
+ }
+
+ @Override
+ public void lastApplied(long lastAppliedIndex, long lastAppliedTerm)
throws StorageException {
+ partitionStorage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+ }
+
+ @Override
+ public byte @Nullable [] committedGroupConfiguration() {
+ return partitionStorage.committedGroupConfiguration();
+ }
+
+ @Override
+ public void committedGroupConfiguration(byte[] config) {
+ partitionStorage.committedGroupConfiguration(config);
+ }
+
+ @Override
+ public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
+ assertThreadAllowsToRead();
+
+ return partitionStorage.read(rowId, timestamp);
+ }
+
+ @Override
+ public @Nullable BinaryRow addWrite(RowId rowId, @Nullable BinaryRow row,
UUID txId, int commitTableId, int commitPartitionId)
+ throws TxIdMismatchException, StorageException {
+ assertThreadAllowsToWrite();
+
+ return partitionStorage.addWrite(rowId, row, txId, commitTableId,
commitPartitionId);
+ }
+
+ @Override
+ public @Nullable BinaryRow abortWrite(RowId rowId) throws StorageException
{
+ assertThreadAllowsToWrite();
+
+ return partitionStorage.abortWrite(rowId);
+ }
+
+ @Override
+ public void commitWrite(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
+ assertThreadAllowsToWrite();
+
+ partitionStorage.commitWrite(rowId, timestamp);
+ }
+
+ @Override
+ public void addWriteCommitted(RowId rowId, @Nullable BinaryRow row,
HybridTimestamp commitTimestamp) throws StorageException {
+ assertThreadAllowsToWrite();
+
+ partitionStorage.addWriteCommitted(rowId, row, commitTimestamp);
+ }
+
+ @Override
+ public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
+ assertThreadAllowsToRead();
+
+ return new
ThreadAssertingCursor<>(partitionStorage.scanVersions(rowId));
+ }
+
+ @Override
+ public PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException {
+ assertThreadAllowsToRead();
+
+ return new
ThreadAssertingPartitionTimestampCursor(partitionStorage.scan(timestamp));
+ }
+
+ @Override
+ public @Nullable RowId closestRowId(RowId lowerBound) throws
StorageException {
+ assertThreadAllowsToRead();
+
+ return partitionStorage.closestRowId(lowerBound);
+ }
+
+ @Override
+ public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+ assertThreadAllowsToRead();
+
+ return partitionStorage.peek(lowWatermark);
+ }
+
+ @Override
+ public @Nullable BinaryRow vacuum(GcEntry entry) {
+ assertThreadAllowsToWrite();
+
+ return partitionStorage.vacuum(entry);
+ }
+
+ @Override
+ public long rowsCount() throws StorageException {
+ assertThreadAllowsToRead();
+
+ return partitionStorage.rowsCount();
+ }
+
+ @Override
+ public void close() {
+ partitionStorage.close();
+ }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java
new file mode 100644
index 0000000000..17a8f32c86
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingPartitionTimestampCursor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PartitionTimestampCursor} that performs thread assertions when doing
read operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingPartitionTimestampCursor extends
ThreadAssertingCursor<ReadResult> implements PartitionTimestampCursor {
+ private final PartitionTimestampCursor cursor;
+
+ /** Constructor. */
+ public ThreadAssertingPartitionTimestampCursor(PartitionTimestampCursor
cursor) {
+ super(cursor);
+
+ this.cursor = cursor;
+ }
+
+ @Override
+ public @Nullable BinaryRow committed(HybridTimestamp timestamp) {
+ assertThreadAllowsToRead();
+
+ return cursor.committed(timestamp);
+ }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
new file mode 100644
index 0000000000..97950786df
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingMvTableStorage.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import static
org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.ThreadAssertingMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.ThreadAssertingHashIndexStorage;
+import
org.apache.ignite.internal.storage.index.ThreadAssertingSortedIndexStorage;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link MvTableStorage} that performs thread assertions when doing
read/write operations and wraps substorages it
+ * creates to guarantee the same behavior for them.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingMvTableStorage implements MvTableStorage {
+ private final MvTableStorage tableStorage;
+
+ /** Constructor. */
+ public ThreadAssertingMvTableStorage(MvTableStorage tableStorage) {
+ this.tableStorage = tableStorage;
+ }
+
+ @Override
+ public void close() throws Exception {
+ tableStorage.close();
+ }
+
+ @Override
+ public CompletableFuture<MvPartitionStorage> createMvPartition(int
partitionId) {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.createMvPartition(partitionId)
+ .thenApply(ThreadAssertingMvPartitionStorage::new);
+ }
+
+ @Override
+ public @Nullable MvPartitionStorage getMvPartition(int partitionId) {
+ return tableStorage.getMvPartition(partitionId);
+ }
+
+ @Override
+ public CompletableFuture<Void> destroyPartition(int partitionId) throws
StorageException {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.destroyPartition(partitionId);
+ }
+
+ @Override
+ public SortedIndexStorage getOrCreateSortedIndex(int partitionId,
StorageSortedIndexDescriptor indexDescriptor) {
+ assertThreadAllowsToWrite();
+
+ SortedIndexStorage indexStorage =
tableStorage.getOrCreateSortedIndex(partitionId, indexDescriptor);
+ return new ThreadAssertingSortedIndexStorage(indexStorage);
+ }
+
+ @Override
+ public HashIndexStorage getOrCreateHashIndex(int partitionId,
StorageHashIndexDescriptor indexDescriptor) {
+ assertThreadAllowsToWrite();
+
+ HashIndexStorage indexStorage =
tableStorage.getOrCreateHashIndex(partitionId, indexDescriptor);
+ return new ThreadAssertingHashIndexStorage(indexStorage);
+ }
+
+ @Override
+ public CompletableFuture<Void> destroyIndex(int indexId) {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.destroyIndex(indexId);
+ }
+
+ @Override
+ public boolean isVolatile() {
+ return tableStorage.isVolatile();
+ }
+
+ @Override
+ public void start() throws StorageException {
+ tableStorage.start();
+ }
+
+ @Override
+ public void stop() throws StorageException {
+ tableStorage.stop();
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.destroy();
+ }
+
+ @Override
+ public CompletableFuture<Void> startRebalancePartition(int partitionId) {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.startRebalancePartition(partitionId);
+ }
+
+ @Override
+ public CompletableFuture<Void> abortRebalancePartition(int partitionId) {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.abortRebalancePartition(partitionId);
+ }
+
+ @Override
+ public CompletableFuture<Void> finishRebalancePartition(
+ int partitionId,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ byte[] groupConfig
+ ) {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.finishRebalancePartition(partitionId,
lastAppliedIndex, lastAppliedTerm, groupConfig);
+ }
+
+ @Override
+ public CompletableFuture<Void> clearPartition(int partitionId) {
+ assertThreadAllowsToWrite();
+
+ return tableStorage.clearPartition(partitionId);
+ }
+
+ @Override
+ public @Nullable IndexStorage getIndex(int partitionId, int indexId) {
+ return tableStorage.getIndex(partitionId, indexId);
+ }
+
+ @Override
+ public StorageTableDescriptor getTableDescriptor() {
+ return tableStorage.getTableDescriptor();
+ }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
new file mode 100644
index 0000000000..d9f02b1773
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/ThreadAssertingStorageEngine.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.engine;
+
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link StorageEngine} that wraps storages it creates to perform thread
assertions on read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingStorageEngine implements StorageEngine {
+ private final StorageEngine storageEngine;
+
+ /** Constructor. */
+ public ThreadAssertingStorageEngine(StorageEngine storageEngine) {
+ this.storageEngine = storageEngine;
+ }
+
+ @Override
+ public String name() {
+ return storageEngine.name();
+ }
+
+ @Override
+ public void start() throws StorageException {
+ storageEngine.start();
+ }
+
+ @Override
+ public void stop() throws StorageException {
+ storageEngine.stop();
+ }
+
+ @Override
+ public boolean isVolatile() {
+ return storageEngine.isVolatile();
+ }
+
+ @Override
+ public MvTableStorage createMvTable(StorageTableDescriptor
tableDescriptor, StorageIndexDescriptorSupplier indexDescriptorSupplier) {
+ MvTableStorage tableStorage =
storageEngine.createMvTable(tableDescriptor, indexDescriptorSupplier);
+ return new ThreadAssertingMvTableStorage(tableStorage);
+ }
+}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
new file mode 100644
index 0000000000..5e1ca534c1
--- /dev/null
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingHashIndexStorage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+
+/**
+ * {@link HashIndexStorage} that performs thread assertions when doing read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingHashIndexStorage extends ThreadAssertingIndexStorage implements HashIndexStorage {
+    private final HashIndexStorage indexStorage; // same object that is passed to super, kept with its hash-specific type
+
+    /** Wraps the given hash index storage. */
+    public ThreadAssertingHashIndexStorage(HashIndexStorage indexStorage) {
+        super(indexStorage);
+
+        this.indexStorage = indexStorage;
+    }
+
+    @Override
+    public StorageHashIndexDescriptor indexDescriptor() {
+        return indexStorage.indexDescriptor(); // plain delegation, no thread assertion
+    }
+
+    @Override
+    public void destroy() throws StorageException {
+        assertThreadAllowsToWrite();
+
+        indexStorage.destroy();
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
new file mode 100644
index 0000000000..10560742f3
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingIndexStorage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToWrite;
+
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.ThreadAssertingCursor;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link IndexStorage} that performs thread assertions when doing read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+abstract class ThreadAssertingIndexStorage implements IndexStorage {
+    private final IndexStorage indexStorage; // the wrapped storage that does the actual work
+
+    /** Wraps the given index storage. */
+    ThreadAssertingIndexStorage(IndexStorage indexStorage) {
+        this.indexStorage = indexStorage;
+    }
+
+    @Override
+    public Cursor<RowId> get(BinaryTuple key) throws StorageException {
+        assertThreadAllowsToRead();
+
+        return new ThreadAssertingCursor<>(indexStorage.get(key)); // the returned cursor is wrapped as well
+    }
+
+    @Override
+    public void put(IndexRow row) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        indexStorage.put(row);
+    }
+
+    @Override
+    public void remove(IndexRow row) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        indexStorage.remove(row);
+    }
+
+    @Override
+    public @Nullable RowId getNextRowIdToBuild() throws StorageException {
+        return indexStorage.getNextRowIdToBuild(); // NOTE(review): no read assertion, unlike get() - intentional?
+    }
+
+    @Override
+    public void setNextRowIdToBuild(@Nullable RowId rowId) throws StorageException {
+        assertThreadAllowsToWrite();
+
+        indexStorage.setNextRowIdToBuild(rowId);
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
new file mode 100644
index 0000000000..8e084b642f
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingPeekCursor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.storage.ThreadAssertingCursor;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PeekCursor} that performs thread assertions when doing read operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingPeekCursor<T> extends ThreadAssertingCursor<T> implements PeekCursor<T> {
+    private final PeekCursor<T> cursor; // same object that is passed to super, kept typed as PeekCursor for peek()
+
+    /** Wraps the given peek cursor. */
+    public ThreadAssertingPeekCursor(PeekCursor<T> cursor) {
+        super(cursor);
+
+        this.cursor = cursor;
+    }
+
+    @Override
+    public @Nullable T peek() {
+        assertThreadAllowsToRead();
+
+        return cursor.peek();
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
new file mode 100644
index 0000000000..0b61260cfc
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.internal.worker.ThreadAssertions.assertThreadAllowsToRead;
+
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.worker.ThreadAssertions;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SortedIndexStorage} that performs thread assertions when doing read/write operations.
+ *
+ * @see ThreadAssertions
+ */
+public class ThreadAssertingSortedIndexStorage extends ThreadAssertingIndexStorage implements SortedIndexStorage {
+    private final SortedIndexStorage indexStorage; // same object that is passed to super, kept with its sorted-specific type
+
+    /** Wraps the given sorted index storage. */
+    public ThreadAssertingSortedIndexStorage(SortedIndexStorage indexStorage) {
+        super(indexStorage);
+
+        this.indexStorage = indexStorage;
+    }
+
+    @Override
+    public StorageSortedIndexDescriptor indexDescriptor() {
+        return indexStorage.indexDescriptor(); // plain delegation, no thread assertion
+    }
+
+    @Override
+    public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+        assertThreadAllowsToRead();
+
+        return new ThreadAssertingPeekCursor<>(indexStorage.scan(lowerBound, upperBound, flags));
+    }
+}
diff --git a/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java b/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
new file mode 100644
index 0000000000..64b54c2c35
--- /dev/null
+++ b/modules/workers/src/main/java/org/apache/ignite/internal/worker/ThreadAssertions.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.worker;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
+
+/**
+ * Tools to assert that the current thread is allowed to perform a requested operation.
+ */
+public class ThreadAssertions {
+    public static final String ENABLED_PROPERTY = "ignite.thread.assertions.enabled"; // system property read into ENABLED
+
+    private static final IgniteLogger LOG = Loggers.forClass(ThreadAssertions.class);
+
+    private static final boolean ENABLED = Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "true"));
+
+    /**
+     * Returns {@code true} if thread assertions are enabled (the default when {@link #ENABLED_PROPERTY} is not set).
+     */
+    public static boolean enabled() {
+        return ENABLED;
+    }
+
+    /**
+     * Checks that the current thread may perform {@link ThreadOperation#STORAGE_WRITE} operations; a violation is only logged.
+     */
+    public static void assertThreadAllowsToWrite() {
+        assertThreadAllowsTo(ThreadOperation.STORAGE_WRITE);
+    }
+
+    /**
+     * Checks that the current thread may perform {@link ThreadOperation#STORAGE_READ} operations; a violation is only logged.
+     */
+    public static void assertThreadAllowsToRead() {
+        assertThreadAllowsTo(ThreadOperation.STORAGE_READ);
+    }
+
+    private static void assertThreadAllowsTo(ThreadOperation requestedOperation) {
+        Thread currentThread = Thread.currentThread();
+
+        // TODO: IGNITE-21439 - actually throw AssertionError if the operation is not allowed.
+
+        if (!(currentThread instanceof ThreadAttributes)) {
+            LOG.warn("Thread {} does not have allowed operations", trackerException(), currentThread);
+
+            return;
+        }
+
+        if (!((ThreadAttributes) currentThread).allows(requestedOperation)) {
+            LOG.warn("Thread {} is not allowed to {}", trackerException(), currentThread, requestedOperation);
+        }
+    }
+
+    private static Exception trackerException() {
+        return new Exception("Tracker"); // never thrown; created only to be passed to LOG.warn with its stack trace
+    }
+}