This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c947f30  IGNITE-15601 Implement stop for calcite module in 3.0 (#395)
c947f30 is described below

commit c947f301c358deb47468820d0435bd177d3a5bad
Author: Taras Ledkov <[email protected]>
AuthorDate: Wed Oct 20 13:39:50 2021 +0300

    IGNITE-15601 Implement stop for calcite module in 3.0 (#395)
---
 modules/calcite/pom.xml                            |  12 ++
 .../processors/query/calcite/QueryProcessor.java   |   3 +
 .../query/calcite/SqlQueryProcessor.java           |  75 +++++---
 .../calcite/exec/ClosableIteratorsHolder.java      |  50 ++---
 .../query/calcite/exec/ExchangeService.java        |   2 +-
 .../query/calcite/exec/ExchangeServiceImpl.java    |  21 ++-
 .../query/calcite/exec/ExecutionService.java       |   2 +-
 .../query/calcite/exec/ExecutionServiceImpl.java   |  27 ++-
 .../LifecycleAware.java}                           |  26 +--
 .../query/calcite/exec/MailboxRegistry.java        |   2 +-
 .../query/calcite/exec/MailboxRegistryImpl.java    |  14 ++
 .../query/calcite/exec/QueryTaskExecutor.java      |   2 +-
 .../query/calcite/exec/QueryTaskExecutorImpl.java  |  38 +++-
 .../query/calcite/exec/rel/RootNode.java           |   9 +-
 .../query/calcite/message/MessageService.java      |   3 +-
 .../query/calcite/message/MessageServiceImpl.java  |  11 +-
 .../query/calcite/prepare/DummyPlanCache.java      |  10 +
 .../query/calcite/prepare/QueryPlanCache.java      |   4 +-
 .../query/calcite/StopCalciteModuleTest.java       | 208 +++++++++++++++++++++
 .../calcite/exec/rel/AbstractExecutionTest.java    |  15 +-
 .../ignite/internal/thread/IgniteThread.java       | 105 +++++++++++
 .../ignite/internal/thread/NamedThreadFactory.java |   2 +-
 .../apache/ignite/lang/NodeStoppingException.java  |   2 +-
 .../internal/metastorage/MetaStorageManager.java   |  54 +++---
 24 files changed, 567 insertions(+), 130 deletions(-)

diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index af9b7a7..1251065 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -119,6 +119,18 @@
 
         <!-- Test dependencies -->
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
             <scope>test</scope>
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
index 1e72e97..abd3e30 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.List;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.lang.IgniteException;
 
 /**
  * QueryProcessor interface.
@@ -31,6 +32,8 @@ public interface QueryProcessor extends IgniteComponent {
      * @param qry Sql query.
      * @param params Query parameters.
      * @return List of sql cursors.
+     *
+     * @throws IgniteException in case of an error.
      * */
     List<SqlCursor<List<?>>> query(String schemaName, String qry, Object... 
params);
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
index f2699cd..bacd5bb 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
@@ -16,8 +16,12 @@
  */
 package org.apache.ignite.internal.processors.query.calcite;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
+import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.manager.EventListener;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionService;
@@ -31,17 +35,15 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolderIm
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterService;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 public class SqlQueryProcessor implements QueryProcessor {
-    /** Default Ignite thread keep alive time. */
-    public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
-
     private volatile ExecutionService executionSrvc;
 
     private volatile MessageService msgSrvc;
@@ -52,6 +54,12 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     private final TableManager tableManager;
 
+    /** Busy lock for stop synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Event listeners to close. */
+    private final List<Pair<TableEvent, EventListener>> evtLsnrs = new 
ArrayList<>();
+
     public SqlQueryProcessor(
         ClusterService clusterSrvc,
         TableManager tableManager
@@ -62,17 +70,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
     /** {@inheritDoc} */
     @Override public void start() {
-        String nodeName = clusterSrvc.localConfiguration().getName();
-
-        taskExecutor = new QueryTaskExecutorImpl(
-            new StripedThreadPoolExecutor(
-                4,
-                NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
-                null,
-                true,
-                DFLT_THREAD_KEEP_ALIVE_TIME
-            )
-        );
+        taskExecutor = new 
QueryTaskExecutorImpl(clusterSrvc.localConfiguration().getName());
 
         msgSrvc = new MessageServiceImpl(
             clusterSrvc.topologyService(),
@@ -91,20 +89,51 @@ public class SqlQueryProcessor implements QueryProcessor {
             ArrayRowHandler.INSTANCE
         );
 
-        tableManager.listen(TableEvent.CREATE, new 
TableCreatedListener(schemaHolder));
-        tableManager.listen(TableEvent.ALTER, new 
TableUpdatedListener(schemaHolder));
-        tableManager.listen(TableEvent.DROP, new 
TableDroppedListener(schemaHolder));
+        registerTableListener(TableEvent.CREATE, new 
TableCreatedListener(schemaHolder));
+        registerTableListener(TableEvent.ALTER, new 
TableUpdatedListener(schemaHolder));
+        registerTableListener(TableEvent.DROP, new 
TableDroppedListener(schemaHolder));
+
+        taskExecutor.start();
+        msgSrvc.start();
+        executionSrvc.start();
     }
 
+    /** */
+    private void registerTableListener(TableEvent evt, 
AbstractTableEventListener lsnr) {
+        evtLsnrs.add(Pair.of(evt, lsnr));
+
+        tableManager.listen(evt, lsnr);
+    }
 
     /** {@inheritDoc} */
-    @Override public void stop() throws NodeStoppingException {
-        // TODO: IGNITE-15161 Implement component's stop.
+    @SuppressWarnings("unchecked")
+    @Override public void stop() throws Exception {
+        busyLock.block();
+
+        List<AutoCloseable> toClose = new 
ArrayList<AutoCloseable>(Arrays.asList(
+            executionSrvc::stop,
+            msgSrvc::stop,
+            taskExecutor::stop
+        ));
+
+        toClose.addAll(evtLsnrs.stream()
+            .map((p) -> (AutoCloseable)() -> 
tableManager.removeListener(p.left, p.right))
+            .collect(Collectors.toList()));
+
+        IgniteUtils.closeAll(toClose);
     }
 
     /** {@inheritDoc} */
     @Override public List<SqlCursor<List<?>>> query(String schemaName, String 
qry, Object... params) {
-        return executionSrvc.executeQuery(schemaName, qry, params);
+        if (!busyLock.enterBusy())
+            throw new IgniteException(new NodeStoppingException());
+
+        try {
+            return executionSrvc.executeQuery(schemaName, qry, params);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     private abstract static class AbstractTableEventListener implements 
EventListener<TableEventParameters> {
@@ -118,7 +147,7 @@ public class SqlQueryProcessor implements QueryProcessor {
 
         /** {@inheritDoc} */
         @Override public void remove(@NotNull Throwable exception) {
-            throw new IllegalStateException();
+            // No-op.
         }
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
index 56bb0e4..fb0d9f2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
@@ -26,12 +26,16 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.lang.IgniteLogger;
 
 /**
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class ClosableIteratorsHolder {
+public class ClosableIteratorsHolder implements LifecycleAware {
+    /** */
+    private final String nodeName;
+
     /** */
     private final ReferenceQueue refQueue;
 
@@ -45,16 +49,24 @@ public class ClosableIteratorsHolder {
     private volatile boolean stopped;
 
     /** */
-    private Thread cleanWorker;
+    private volatile IgniteThread cleanWorker;
 
     /** */
-    public ClosableIteratorsHolder(IgniteLogger log) {
+    public ClosableIteratorsHolder(String nodeName, IgniteLogger log) {
+        this.nodeName = nodeName;
         this.log = log;
 
         refQueue = new ReferenceQueue<>();
         refMap = new ConcurrentHashMap<>();
     }
 
+    /** {@inheritDoc} */
+    @Override public void start() {
+        cleanWorker = new IgniteThread(nodeName, 
"calciteIteratorsCleanWorker", () -> cleanUp(true));
+        cleanWorker.setDaemon(true);
+        cleanWorker.start();
+    }
+
     /**
      * @param src Closeable iterator.
      * @return Weak closable iterator wrapper.
@@ -66,24 +78,6 @@ public class ClosableIteratorsHolder {
     }
 
     /** */
-    public void init() {
-        cleanWorker = new Thread(null, () -> cleanUp(true), 
"calciteIteratorsCleanWorker");
-        cleanWorker.setDaemon(true);
-        cleanWorker.start();
-    }
-
-    /** */
-    public void tearDown() {
-        stopped = true;
-        refMap.clear();
-
-        Thread t = cleanWorker;
-
-        if (t != null)
-            t.interrupt();
-    }
-
-    /** */
     private void cleanUp(boolean blocking) {
         for (Reference<?> ref = nextRef(blocking); !stopped && ref != null; 
ref = nextRef(blocking))
             Commons.close(refMap.remove(ref), log);
@@ -107,6 +101,20 @@ public class ClosableIteratorsHolder {
         return new CloseableReference(referent, resource);
     }
 
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        stopped = true;
+
+        refMap.values().forEach(o -> Commons.close(o, log));
+
+        refMap.clear();
+
+        IgniteThread t = cleanWorker;
+
+        if (t != null)
+            t.interrupt();
+    }
+
     /** */
     private final class DelegatingIterator<T> implements Iterator<T>, 
AutoCloseable {
         /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 112c6c4..ed99a1d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -25,7 +25,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
 /**
  *
  */
-public interface ExchangeService {
+public interface ExchangeService extends LifecycleAware {
     /**
      * Sends a batch of data to remote node.
      * @param nodeId Target node ID.
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index d5c0afc..bed5e6b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -57,6 +57,7 @@ public class ExchangeServiceImpl implements ExchangeService {
     /** */
     private final MessageService msgSrvc;
 
+    /** */
     public ExchangeServiceImpl(
         QueryTaskExecutor taskExecutor,
         MailboxRegistry mailboxRegistry,
@@ -65,8 +66,14 @@ public class ExchangeServiceImpl implements ExchangeService {
         this.taskExecutor = taskExecutor;
         this.mailboxRegistry = mailboxRegistry;
         this.msgSrvc = msgSrvc;
+    }
 
-        init();
+    /** {@inheritDoc} */
+    @Override public void start() {
+        msgSrvc.register((n, m) -> onMessage(n, (InboxCloseMessage) m), 
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
+        msgSrvc.register((n, m) -> onMessage(n, (OutboxCloseMessage) m), 
SqlQueryMessageGroup.OUTBOX_CLOSE_MESSAGE);
+        msgSrvc.register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage) 
m), SqlQueryMessageGroup.QUERY_BATCH_ACK);
+        msgSrvc.register((n, m) -> onMessage(n, (QueryBatchMessage) m), 
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
     }
 
     /** {@inheritDoc} */
@@ -135,13 +142,6 @@ public class ExchangeServiceImpl implements 
ExchangeService {
         );
     }
 
-    private void init() {
-        msgSrvc.register((n, m) -> onMessage(n, (InboxCloseMessage) m), 
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
-        msgSrvc.register((n, m) -> onMessage(n, (OutboxCloseMessage) m), 
SqlQueryMessageGroup.OUTBOX_CLOSE_MESSAGE);
-        msgSrvc.register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage) 
m), SqlQueryMessageGroup.QUERY_BATCH_ACK);
-        msgSrvc.register((n, m) -> onMessage(n, (QueryBatchMessage) m), 
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
-    }
-
     /** {@inheritDoc} */
     @Override public boolean alive(String nodeId) {
         return msgSrvc.alive(nodeId);
@@ -259,4 +259,9 @@ public class ExchangeServiceImpl implements ExchangeService 
{
             null,
             ImmutableMap.of());
     }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index f550437..a1a7ed0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -25,7 +25,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.SqlCursor;
 /**
  *
  */
-public interface ExecutionService {
+public interface ExecutionService extends LifecycleAware {
     /**
      * Executes a query.
      *
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index c99eff6..f2f2f6d 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -117,6 +117,9 @@ public class ExecutionServiceImpl<Row> implements 
ExecutionService {
     private static final SqlQueryMessagesFactory FACTORY = new 
SqlQueryMessagesFactory();
 
     /** */
+    private final TopologyService topSrvc;
+
+    /** */
     private final MessageService msgSrvc;
 
     /** */
@@ -163,6 +166,7 @@ public class ExecutionServiceImpl<Row> implements 
ExecutionService {
         QueryTaskExecutor taskExecutor,
         RowHandler<Row> handler
     ) {
+        this.topSrvc = topSrvc;
         this.handler = handler;
         this.msgSrvc = msgSrvc;
         this.schemaHolder = schemaHolder;
@@ -172,24 +176,25 @@ public class ExecutionServiceImpl<Row> implements 
ExecutionService {
         qryPlanCache = planCache;
         running = new ConcurrentHashMap<>();
         ddlConverter = new DdlSqlToCommandConverter();
-        iteratorsHolder = new ClosableIteratorsHolder(LOG);
+        iteratorsHolder = new 
ClosableIteratorsHolder(topSrvc.localMember().name(), LOG);
         mailboxRegistry = new MailboxRegistryImpl(topSrvc);
         exchangeSrvc = new ExchangeServiceImpl(taskExecutor, mailboxRegistry, 
msgSrvc);
         mappingSrvc = new MappingServiceImpl(topSrvc);
         // TODO: fix this
         affSrvc = cacheId -> Objects::hashCode;
+    }
 
-        topSrvc.addEventHandler(new NodeLeaveHandler(this::onNodeLeft));
+    /** {@inheritDoc} */
+    @Override public void start() {
+        iteratorsHolder.start();
+        mailboxRegistry.start();
+        exchangeSrvc.start();
 
-        init();
-    }
+        topSrvc.addEventHandler(new NodeLeaveHandler(this::onNodeLeft));
 
-    private void init() {
         msgSrvc.register((n, m) -> onMessage(n, (QueryStartRequest) m), 
SqlQueryMessageGroup.QUERY_START_REQUEST);
         msgSrvc.register((n, m) -> onMessage(n, (QueryStartResponse) m), 
SqlQueryMessageGroup.QUERY_START_RESPONSE);
         msgSrvc.register((n, m) -> onMessage(n, (ErrorMessage) m), 
SqlQueryMessageGroup.ERROR_MESSAGE);
-
-        iteratorsHolder.init();
     }
 
     /** {@inheritDoc} */
@@ -664,6 +669,12 @@ public class ExecutionServiceImpl<Row> implements 
ExecutionService {
         running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(node.id()));
     }
 
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws Exception {
+        IgniteUtils.closeAll(qryPlanCache::stop, iteratorsHolder::stop, 
mailboxRegistry::stop, exchangeSrvc::stop);
+    }
+
     /** */
     private enum QueryState {
         /** */
@@ -785,7 +796,7 @@ public class ExecutionServiceImpl<Row> implements 
ExecutionService {
             }
 
             if (state0 == QueryState.CLOSED) {
-                // 2) unregister runing query
+                // 2) unregister running query
                 running.remove(ctx.queryId());
 
                 IgniteInternalException wrpEx = null;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LifecycleAware.java
similarity index 67%
copy from 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
copy to 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LifecycleAware.java
index f07658e..4913b70 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LifecycleAware.java
@@ -14,18 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.util.List;
+package org.apache.ignite.internal.processors.query.calcite.exec;
 
-public class DummyPlanCache implements QueryPlanCache {
-    /** {@inheritDoc} */
-    @Override public List<QueryPlan> queryPlan(PlanningContext ctx, CacheKey 
key, QueryPlanFactory factory) {
-        return factory.create(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void clear() {
+/**
+ * Lifecycle interface for SQL query engine services.
+ */
+public interface LifecycleAware {
+    /**
+     * Initialize the service.
+     */
+    void start();
 
-    }
+    /**
+     * Stop the service.
+     *
+     * @throws Exception on any error during stopping.
+     */
+    void stop() throws Exception;
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index cdc6b54..4a32cf6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public interface MailboxRegistry {
+public interface MailboxRegistry extends LifecycleAware {
     /**
      * Tries to register and inbox node and returns it if success or returns 
previously registered inbox otherwise.
      *
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index e9c46a9..02a8b42 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -42,15 +42,23 @@ public class MailboxRegistryImpl implements MailboxRegistry 
{
     private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true;
 
     /** */
+    private final TopologyService topSrvc;
+
+    /** */
     private final Map<MailboxKey, Outbox<?>> locals;
 
     /** */
     private final Map<MailboxKey, Inbox<?>> remotes;
 
     public MailboxRegistryImpl(TopologyService topSrvc) {
+        this.topSrvc = topSrvc;
+
         locals = new ConcurrentHashMap<>();
         remotes = new ConcurrentHashMap<>();
+    }
 
+    /** {@inheritDoc} */
+    @Override public void start() {
         topSrvc.addEventHandler(new NodeLeaveHandler(this::onNodeLeft));
     }
 
@@ -126,6 +134,12 @@ public class MailboxRegistryImpl implements 
MailboxRegistry {
         return S.toString(MailboxRegistryImpl.class, this);
     }
 
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        locals.clear();
+        remotes.clear();
+    }
+
     /** */
     private static class MailboxKey {
         /** */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
index b4a5fb2..0dda0ed 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
@@ -23,7 +23,7 @@ import java.util.concurrent.CompletableFuture;
 /**
  *
  */
-public interface QueryTaskExecutor {
+public interface QueryTaskExecutor extends LifecycleAware {
     /**
      * Executes a query task in a thread, responsible for particular query 
fragment.
      *
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
index 81849d4..27f337a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
@@ -19,26 +19,45 @@ package 
org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteLogger;
 
 /** */
 public class QueryTaskExecutorImpl implements QueryTaskExecutor, 
Thread.UncaughtExceptionHandler {
+    /** Default Ignite thread keep alive time. */
+    public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
+
     /** */
     private static final IgniteLogger LOG = 
IgniteLogger.forClass(QueryTaskExecutorImpl.class);
 
     /** */
-    private final StripedThreadPoolExecutor stripedThreadPoolExecutor;
+    private final String nodeName;
+
+    /** */
+    private volatile StripedThreadPoolExecutor stripedThreadPoolExecutor;
 
     /** */
     private Thread.UncaughtExceptionHandler eHnd;
 
     /**
-     * @param stripedThreadPoolExecutor Executor.
+     * @param nodeName Node name.
      */
-    public QueryTaskExecutorImpl(StripedThreadPoolExecutor 
stripedThreadPoolExecutor) {
-        this.stripedThreadPoolExecutor = stripedThreadPoolExecutor;
+    public QueryTaskExecutorImpl(String nodeName) {
+        this.nodeName = nodeName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        this.stripedThreadPoolExecutor = new StripedThreadPoolExecutor(
+            4,
+            NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
+            null,
+            true,
+            DFLT_THREAD_KEEP_ALIVE_TIME
+        );
     }
 
     /**
@@ -75,11 +94,6 @@ public class QueryTaskExecutorImpl implements 
QueryTaskExecutor, Thread.Uncaught
         return stripedThreadPoolExecutor.submit(qryTask, hash(qryId, 
fragmentId));
     }
 
-    /** Releases resources. */
-    public void tearDown() {
-        stripedThreadPoolExecutor.shutdownNow();
-    }
-
     /** {@inheritDoc} */
     @Override public void uncaughtException(Thread t, Throwable e) {
         if (eHnd != null)
@@ -91,4 +105,10 @@ public class QueryTaskExecutorImpl implements 
QueryTaskExecutor, Thread.Uncaught
         // inlined Objects.hash(...)
         return IgniteUtils.safeAbs(31 * (31 + (qryId != null ? 
qryId.hashCode() : 0)) + Long.hashCode(fragmentId));
     }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        if (stripedThreadPoolExecutor != null)
+            stripedThreadPoolExecutor.shutdownNow();
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 2c39021..b102bd7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -99,7 +99,7 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
 
         lock.lock();
         try {
-            if (waiting != -1)
+            if (waiting != -1 || !outBuff.isEmpty())
                 ex.compareAndSet(null, new IgniteException("Query was 
cancelled"));
 
             closed = true; // an exception has to be set first to get right 
check order
@@ -229,7 +229,7 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
                     outBuff = tmp;
                 }
 
-                if (waiting == -1)
+                if (waiting == -1 && outBuff.isEmpty())
                     close();
                 else if (inBuff.isEmpty() && waiting == 0) {
                     int req = waiting = inBufSize;
@@ -259,7 +259,10 @@ public class RootNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row>,
         if (e == null)
             return;
 
-        throw new IgniteInternalException(e);
+        if (e instanceof IgniteException)
+            throw (IgniteException)e;
+        else
+            throw new IgniteException("An error occurred while query 
executing.", e);
 // TODO: rework with SQL error code
 //        if (e instanceof IgniteSQLException)
 //            throw (IgniteSQLException)e;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index cdf24e9..ad0831f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
+import org.apache.ignite.internal.processors.query.calcite.exec.LifecycleAware;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.network.NetworkMessage;
 
 /**
  *
  */
-public interface MessageService {
+public interface MessageService extends LifecycleAware {
     /**
      * Sends a message to given node.
      *
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index e044cd7..b123204 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -54,7 +54,7 @@ public class MessageServiceImpl implements MessageService {
     private final QueryTaskExecutor taskExecutor;
 
     /** */
-    private Map<Short, MessageListener> lsnrs;
+    private volatile Map<Short, MessageListener> lsnrs;
 
     /** */
     public MessageServiceImpl(
@@ -67,7 +67,10 @@ public class MessageServiceImpl implements MessageService {
         this.taskExecutor = taskExecutor;
 
         locNodeId = topSrvc.localMember().id();
+    }
 
+    /** {@inheritDoc} */
+    @Override public void start() {
         messagingSrvc.addMessageHandler(SqlQueryMessageGroup.class, 
this::onMessage);
     }
 
@@ -148,4 +151,10 @@ public class MessageServiceImpl implements MessageService {
 
         lsnr.onMessage(nodeId, msg);
     }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        if (lsnrs != null)
+            lsnrs.clear();
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
index f07658e..4d0c4c7 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
@@ -26,6 +26,16 @@ public class DummyPlanCache implements QueryPlanCache {
 
     /** {@inheritDoc} */
     @Override public void clear() {
+        // No-op.
+    }
 
+    /** {@inheritDoc} */
+    @Override public void start() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        clear();
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
index 07f94f8..1d450ba 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
@@ -19,10 +19,12 @@ package 
org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.List;
 
+import org.apache.ignite.internal.processors.query.calcite.exec.LifecycleAware;
+
 /**
  *
  */
-public interface QueryPlanCache {
+public interface QueryPlanCache extends LifecycleAware {
     /**
      * @param ctx Context.
      * @param key Cache key.
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
new file mode 100644
index 0000000..5cd7240
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Flow;
+
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.event.TableEvent;
+import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+/**
+ * Stop Calcite module test.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StopCalciteModuleTest {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(StopCalciteModuleTest.class);
+
+    private static final int ROWS = 5;
+
+    private static final String NODE_NAME = "mock-node-name";
+
+    @Mock
+    ClusterService clusterSrvc;
+
+    @Mock
+    TableManager tableManager;
+
+    @Mock
+    MessagingService msgSrvc;
+
+    @Mock
+    TopologyService topologySrvc;
+
+    @Mock
+    ClusterLocalConfiguration localCfg;
+
+    @Mock
+    InternalTable tbl;
+
+    SchemaRegistry schemaReg;
+
+    @BeforeEach
+    public void before(TestInfo testInfo) {
+        when(clusterSrvc.messagingService()).thenReturn(msgSrvc);
+        when(clusterSrvc.topologyService()).thenReturn(topologySrvc);
+        when(clusterSrvc.localConfiguration()).thenReturn(localCfg);
+
+        ClusterNode node = new ClusterNode("mock-node-id", NODE_NAME, null);
+        when(topologySrvc.localMember()).thenReturn(node);
+        
when(topologySrvc.allMembers()).thenReturn(Collections.singleton(node));
+
+        SchemaDescriptor schemaDesc = new SchemaDescriptor(
+            0,
+            new Column[]{new Column("ID", NativeTypes.INT32, false)},
+            new Column[]{new Column("VAL", NativeTypes.INT32, false)}
+        );
+
+        schemaReg = new SchemaRegistryImpl(0, (v) -> schemaDesc);
+
+        when(tbl.tableName()).thenReturn("PUBLIC.TEST");
+
+        // Mock create table (notify on register listener).
+        doAnswer(invocation -> {
+            EventListener<TableEventParameters> clo = 
(EventListener<TableEventParameters>)invocation.getArguments()[1];
+
+            clo.notify(new TableEventParameters(new 
IgniteUuid(UUID.randomUUID(), 0), "TEST", new TableImpl(tbl, schemaReg, 
tableManager)), null);
+
+            return null;
+        }).when(tableManager).listen(eq(TableEvent.CREATE), any());
+
+        RowAssembler asm = new RowAssembler(schemaReg.schema(), 0, 0, 0, 0);
+
+        asm.appendInt(0);
+        asm.appendInt(0);
+
+        BinaryRow binaryRow = asm.build();
+
+        // Mock table scan
+        doAnswer(invocation -> {
+            int part = (int)invocation.getArguments()[0];
+
+            return (Flow.Publisher<BinaryRow>)s -> {
+                s.onSubscribe(new Flow.Subscription() {
+                    @Override public void request(long n) {
+                        // No-op.
+                    }
+
+                    @Override public void cancel() {
+                        // No-op.
+                    }
+                });
+
+                if (part == 0) {
+                    for (int i = 0; i < ROWS; ++i)
+                        s.onNext(binaryRow);
+                }
+
+                s.onComplete();
+            };
+        }).when(tbl).scan(anyInt(), any());
+
+        LOG.info(">>>> Starting test {}", 
testInfo.getTestMethod().orElseThrow().getName());
+    }
+
+    /** */
+    @Test
+    public void testStopQueryOnNodeStop() throws Exception {
+        SqlQueryProcessor qryProc = new SqlQueryProcessor(clusterSrvc, 
tableManager);
+
+        qryProc.start();
+
+        List<SqlCursor<List<?>>> cursors = qryProc.query(
+            "PUBLIC",
+            "SELECT * FROM TEST"
+        );
+
+        SqlCursor<List<?>> cur = cursors.get(0);
+        cur.next();
+
+        assertTrue(isThereNodeThreads(NODE_NAME));
+
+        qryProc.stop();
+
+        // Check cursor closed.
+        assertTrue(assertThrows(IgniteException.class, 
cur::hasNext).getMessage().contains("Query was cancelled"));
+        assertTrue(assertThrows(IgniteException.class, 
cur::next).getMessage().contains("Query was cancelled"));
+
+        // Check execute query on stopped node.
+        assertTrue(assertThrows(IgniteException.class, () -> qryProc.query(
+            "PUBLIC",
+            "SELECT 1"
+        )).getCause() instanceof NodeStoppingException);
+
+        // Check: there are no alive Ignite threads.
+        assertFalse(isThereNodeThreads(NODE_NAME));
+    }
+
+    /**
+     * @return {@code true} if there are any threads whose name contains the given node name;
+     * otherwise returns {@code false}.
+     */
+    private boolean isThereNodeThreads(String nodeName) {
+        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+        ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+
+        return Arrays.stream(infos)
+            .anyMatch((ti) -> ti.getThreadName().contains(nodeName));
+    }
+}
+
+
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 1ab8beb..986e944 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -32,7 +33,6 @@ import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDesc
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -50,21 +50,14 @@ public class AbstractExecutionTest extends 
IgniteAbstractTest {
     /** */
     @BeforeEach
     public void beforeTest() {
-        taskExecutor = new QueryTaskExecutorImpl(
-            new StripedThreadPoolExecutor(
-                4,
-                "calciteQry",
-                this::handle,
-                true,
-                60_000L
-            )
-        );
+        taskExecutor = new QueryTaskExecutorImpl("no_node");
+        taskExecutor.start();
     }
 
     /** */
     @AfterEach
     public void afterTest() {
-        taskExecutor.tearDown();
+        taskExecutor.stop();
 
         if (lastE != null)
             throw new AssertionError(lastE);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
new file mode 100644
index 0000000..11bf464
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * This class adds some necessary plumbing on top of the {@link Thread} class.
+ * Specifically, it adds:
+ * <ul>
+ *      <li>Consistent naming of threads;</li>
+ *      <li>Name of the Ignite instance this thread belongs to.</li>
+ * </ul>
+ * <b>Note</b>: this class is intended for internal use only.
+ */
+public class IgniteThread extends Thread {
+    /** Counter of Ignite threads created so far; its next value is used as the number in a new thread's name. */
+    private static final AtomicLong cntr = new AtomicLong();
+
+    /** The name of the Ignite instance this thread belongs to. */
+    protected final String igniteInstanceName;
+
+    /**
+     * Creates grid thread with given name for a given Ignite instance.
+     *
+     * @param nodeName Name of the Ignite instance this thread is created for.
+     * @param threadName Name of thread.
+     */
+    public IgniteThread(String nodeName, String threadName) {
+        this(nodeName, threadName, null);
+    }
+
+    /**
+     * Creates grid thread with given name for a given Ignite instance.
+     *
+     * @param nodeName Name of the Ignite instance this thread is created for.
+     * @param threadName Name of thread.
+     * @param r Runnable to execute.
+     */
+    public IgniteThread(String nodeName, String threadName, Runnable r) {
+        super(r, createName(cntr.incrementAndGet(), threadName, nodeName));
+
+        this.igniteInstanceName = nodeName;
+    }
+
+    /**
+     * Gets name of the Ignite instance this thread belongs to.
+     *
+     * @return Name of the Ignite instance this thread belongs to.
+     */
+    public String getIgniteInstanceName() {
+        return igniteInstanceName;
+    }
+
+    /**
+     * @return IgniteThread or {@code null} if current thread is not an 
instance of IgniteThread.
+     */
+    public static IgniteThread current() {
+        Thread thread = Thread.currentThread();
+
+        return thread.getClass() == IgniteThread.class || thread instanceof 
IgniteThread ?
+            ((IgniteThread)thread) : null;
+    }
+
+    /**
+     * Create prefix for thread name.
+     */
+    public static String threadPrefix(String nodeName, String threadName) {
+        return "%" + nodeName + "%" + threadName + "-";
+    }
+
+    /**
+     * Creates new thread name.
+     *
+     * @param num Thread number.
+     * @param threadName Thread name.
+     * @param nodeName Ignite instance name.
+     * @return New thread name.
+     */
+    protected static String createName(long num, String threadName, String 
nodeName) {
+        return threadPrefix(nodeName, threadName) + num;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteThread.class, this, "name", getName());
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index 9a0691b..726f3df 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -90,7 +90,7 @@ public class NamedThreadFactory implements ThreadFactory {
      * Create prefix for thread name.
      */
     public static String threadPrefix(String nodeName, String poolName) {
-        return "%" + nodeName + "%" + poolName + "-";
+        return IgniteThread.threadPrefix(nodeName, poolName);
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java 
b/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
index 02349ca..a126d57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
@@ -30,7 +30,7 @@ public class NodeStoppingException extends 
IgniteInternalCheckedException {
      * Creates an empty node stopping exception.
      */
     public NodeStoppingException() {
-        super("Node is stopping.");
+        super("Operation has been cancelled (node is stopping).");
     }
 
     /**
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index fc85e7f..648f427 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -278,7 +278,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public synchronized void deployWatches() throws NodeStoppingException {
         if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+            throw new NodeStoppingException();
 
         try {
             var watch = watchAggregator.watch(
@@ -318,7 +318,7 @@ public class MetaStorageManager implements IgniteComponent {
         @NotNull WatchListener lsnr
     ) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return waitForReDeploy(watchAggregator.add(key, lsnr));
@@ -340,7 +340,7 @@ public class MetaStorageManager implements IgniteComponent {
         @NotNull WatchListener lsnr
     ) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
@@ -363,7 +363,7 @@ public class MetaStorageManager implements IgniteComponent {
         @NotNull WatchListener lsnr
     ) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return waitForReDeploy(watchAggregator.add(keys, lsnr));
@@ -387,7 +387,7 @@ public class MetaStorageManager implements IgniteComponent {
         @NotNull WatchListener lsnr
     ) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return waitForReDeploy(watchAggregator.add(from, to, lsnr));
@@ -405,7 +405,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public synchronized CompletableFuture<Void> unregisterWatch(long id) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             watchAggregator.cancel(id);
@@ -424,7 +424,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
@@ -439,7 +439,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long 
revUpperBound) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.get(key, 
revUpperBound));
@@ -454,7 +454,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> 
getAll(Set<ByteArray> keys) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
@@ -469,7 +469,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> 
getAll(Set<ByteArray> keys, long revUpperBound) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, 
revUpperBound));
@@ -484,7 +484,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] 
val) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
@@ -499,7 +499,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, 
byte[] val) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, 
val));
@@ -514,7 +514,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, 
byte[]> vals) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
@@ -529,7 +529,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> 
getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> 
svc.getAndPutAll(vals));
@@ -544,7 +544,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
@@ -559,7 +559,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray 
key) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
@@ -574,7 +574,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> 
keys) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
@@ -589,7 +589,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> 
getAndRemoveAll(@NotNull Set<ByteArray> keys) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> 
svc.getAndRemoveAll(keys));
@@ -610,7 +610,7 @@ public class MetaStorageManager implements IgniteComponent {
         @NotNull Operation failure
     ) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, 
success, failure));
@@ -629,7 +629,7 @@ public class MetaStorageManager implements IgniteComponent {
             @NotNull Collection<Operation> failure
     ) {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, 
success, failure));
@@ -644,7 +644,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable 
ByteArray keyTo, long revUpperBound) throws NodeStoppingException {
         if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+            throw new NodeStoppingException();
 
         try {
             return new CursorWrapper<>(
@@ -671,7 +671,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray 
keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException {
         if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+            throw new NodeStoppingException();
 
         try {
             return new CursorWrapper<>(
@@ -688,7 +688,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable 
ByteArray keyTo) throws NodeStoppingException {
         if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+            throw new NodeStoppingException();
 
         try {
             return new CursorWrapper<>(
@@ -716,7 +716,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray 
keyPrefix) throws NodeStoppingException {
         if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+            throw new NodeStoppingException();
 
         try {
             var rangeCriterion = 
KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
@@ -761,7 +761,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long 
revUpperBound) throws NodeStoppingException {
         if (!busyLock.enterBusy())
-            throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+            throw new NodeStoppingException();
 
         try {
             var rangeCriterion = 
KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
@@ -779,7 +779,7 @@ public class MetaStorageManager implements IgniteComponent {
      */
     public @NotNull CompletableFuture<Void> compact() {
         if (!busyLock.enterBusy())
-            return CompletableFuture.failedFuture(new 
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+            return CompletableFuture.failedFuture(new NodeStoppingException());
 
         try {
             return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
@@ -920,7 +920,7 @@ public class MetaStorageManager implements IgniteComponent {
             /** {@inheritDoc} */
         @Override public void close() throws Exception {
             if (!busyLock.enterBusy())
-                throw new NodeStoppingException("Operation has been cancelled 
(node is stopping).");
+                throw new NodeStoppingException();
 
             try {
                 innerCursorFut.thenApply(cursor -> {

Reply via email to