This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c947f30 IGNITE-15601 Implement stop for calcite module in 3.0 (#395)
c947f30 is described below
commit c947f301c358deb47468820d0435bd177d3a5bad
Author: Taras Ledkov <[email protected]>
AuthorDate: Wed Oct 20 13:39:50 2021 +0300
IGNITE-15601 Implement stop for calcite module in 3.0 (#395)
---
modules/calcite/pom.xml | 12 ++
.../processors/query/calcite/QueryProcessor.java | 3 +
.../query/calcite/SqlQueryProcessor.java | 75 +++++---
.../calcite/exec/ClosableIteratorsHolder.java | 50 ++---
.../query/calcite/exec/ExchangeService.java | 2 +-
.../query/calcite/exec/ExchangeServiceImpl.java | 21 ++-
.../query/calcite/exec/ExecutionService.java | 2 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 27 ++-
.../LifecycleAware.java} | 26 +--
.../query/calcite/exec/MailboxRegistry.java | 2 +-
.../query/calcite/exec/MailboxRegistryImpl.java | 14 ++
.../query/calcite/exec/QueryTaskExecutor.java | 2 +-
.../query/calcite/exec/QueryTaskExecutorImpl.java | 38 +++-
.../query/calcite/exec/rel/RootNode.java | 9 +-
.../query/calcite/message/MessageService.java | 3 +-
.../query/calcite/message/MessageServiceImpl.java | 11 +-
.../query/calcite/prepare/DummyPlanCache.java | 10 +
.../query/calcite/prepare/QueryPlanCache.java | 4 +-
.../query/calcite/StopCalciteModuleTest.java | 208 +++++++++++++++++++++
.../calcite/exec/rel/AbstractExecutionTest.java | 15 +-
.../ignite/internal/thread/IgniteThread.java | 105 +++++++++++
.../ignite/internal/thread/NamedThreadFactory.java | 2 +-
.../apache/ignite/lang/NodeStoppingException.java | 2 +-
.../internal/metastorage/MetaStorageManager.java | 54 +++---
24 files changed, 567 insertions(+), 130 deletions(-)
diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index af9b7a7..1251065 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -119,6 +119,18 @@
<!-- Test dependencies -->
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
index 1e72e97..abd3e30 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryProcessor.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.List;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.lang.IgniteException;
/**
* QueryProcessor interface.
@@ -31,6 +32,8 @@ public interface QueryProcessor extends IgniteComponent {
* @param qry Sql query.
* @param params Query parameters.
* @return List of sql cursors.
+ *
+ * @throws IgniteException in case of an error.
* */
List<SqlCursor<List<?>>> query(String schemaName, String qry, Object...
params);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
index f2699cd..bacd5bb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/SqlQueryProcessor.java
@@ -16,8 +16,12 @@
*/
package org.apache.ignite.internal.processors.query.calcite;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.manager.EventListener;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionService;
@@ -31,17 +35,15 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolderIm
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class SqlQueryProcessor implements QueryProcessor {
- /** Default Ignite thread keep alive time. */
- public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
-
private volatile ExecutionService executionSrvc;
private volatile MessageService msgSrvc;
@@ -52,6 +54,12 @@ public class SqlQueryProcessor implements QueryProcessor {
private final TableManager tableManager;
+ /** Busy lock for stop synchronisation. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Event listeners to close. */
+ private final List<Pair<TableEvent, EventListener>> evtLsnrs = new
ArrayList<>();
+
public SqlQueryProcessor(
ClusterService clusterSrvc,
TableManager tableManager
@@ -62,17 +70,7 @@ public class SqlQueryProcessor implements QueryProcessor {
/** {@inheritDoc} */
@Override public void start() {
- String nodeName = clusterSrvc.localConfiguration().getName();
-
- taskExecutor = new QueryTaskExecutorImpl(
- new StripedThreadPoolExecutor(
- 4,
- NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
- null,
- true,
- DFLT_THREAD_KEEP_ALIVE_TIME
- )
- );
+ taskExecutor = new
QueryTaskExecutorImpl(clusterSrvc.localConfiguration().getName());
msgSrvc = new MessageServiceImpl(
clusterSrvc.topologyService(),
@@ -91,20 +89,51 @@ public class SqlQueryProcessor implements QueryProcessor {
ArrayRowHandler.INSTANCE
);
- tableManager.listen(TableEvent.CREATE, new
TableCreatedListener(schemaHolder));
- tableManager.listen(TableEvent.ALTER, new
TableUpdatedListener(schemaHolder));
- tableManager.listen(TableEvent.DROP, new
TableDroppedListener(schemaHolder));
+ registerTableListener(TableEvent.CREATE, new
TableCreatedListener(schemaHolder));
+ registerTableListener(TableEvent.ALTER, new
TableUpdatedListener(schemaHolder));
+ registerTableListener(TableEvent.DROP, new
TableDroppedListener(schemaHolder));
+
+ taskExecutor.start();
+ msgSrvc.start();
+ executionSrvc.start();
}
+ /** */
+ private void registerTableListener(TableEvent evt,
AbstractTableEventListener lsnr) {
+ evtLsnrs.add(Pair.of(evt, lsnr));
+
+ tableManager.listen(evt, lsnr);
+ }
/** {@inheritDoc} */
- @Override public void stop() throws NodeStoppingException {
- // TODO: IGNITE-15161 Implement component's stop.
+ @SuppressWarnings("unchecked")
+ @Override public void stop() throws Exception {
+ busyLock.block();
+
+ List<AutoCloseable> toClose = new
ArrayList<AutoCloseable>(Arrays.asList(
+ executionSrvc::stop,
+ msgSrvc::stop,
+ taskExecutor::stop
+ ));
+
+ toClose.addAll(evtLsnrs.stream()
+ .map((p) -> (AutoCloseable)() ->
tableManager.removeListener(p.left, p.right))
+ .collect(Collectors.toList()));
+
+ IgniteUtils.closeAll(toClose);
}
/** {@inheritDoc} */
@Override public List<SqlCursor<List<?>>> query(String schemaName, String
qry, Object... params) {
- return executionSrvc.executeQuery(schemaName, qry, params);
+ if (!busyLock.enterBusy())
+ throw new IgniteException(new NodeStoppingException());
+
+ try {
+ return executionSrvc.executeQuery(schemaName, qry, params);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
private abstract static class AbstractTableEventListener implements
EventListener<TableEventParameters> {
@@ -118,7 +147,7 @@ public class SqlQueryProcessor implements QueryProcessor {
/** {@inheritDoc} */
@Override public void remove(@NotNull Throwable exception) {
- throw new IllegalStateException();
+ // No-op.
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
index 56bb0e4..fb0d9f2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
@@ -26,12 +26,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.lang.IgniteLogger;
/**
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class ClosableIteratorsHolder {
+public class ClosableIteratorsHolder implements LifecycleAware {
+ /** */
+ private final String nodeName;
+
/** */
private final ReferenceQueue refQueue;
@@ -45,16 +49,24 @@ public class ClosableIteratorsHolder {
private volatile boolean stopped;
/** */
- private Thread cleanWorker;
+ private volatile IgniteThread cleanWorker;
/** */
- public ClosableIteratorsHolder(IgniteLogger log) {
+ public ClosableIteratorsHolder(String nodeName, IgniteLogger log) {
+ this.nodeName = nodeName;
this.log = log;
refQueue = new ReferenceQueue<>();
refMap = new ConcurrentHashMap<>();
}
+ /** {@inheritDoc} */
+ @Override public void start() {
+ cleanWorker = new IgniteThread(nodeName,
"calciteIteratorsCleanWorker", () -> cleanUp(true));
+ cleanWorker.setDaemon(true);
+ cleanWorker.start();
+ }
+
/**
* @param src Closeable iterator.
* @return Weak closable iterator wrapper.
@@ -66,24 +78,6 @@ public class ClosableIteratorsHolder {
}
/** */
- public void init() {
- cleanWorker = new Thread(null, () -> cleanUp(true),
"calciteIteratorsCleanWorker");
- cleanWorker.setDaemon(true);
- cleanWorker.start();
- }
-
- /** */
- public void tearDown() {
- stopped = true;
- refMap.clear();
-
- Thread t = cleanWorker;
-
- if (t != null)
- t.interrupt();
- }
-
- /** */
private void cleanUp(boolean blocking) {
for (Reference<?> ref = nextRef(blocking); !stopped && ref != null;
ref = nextRef(blocking))
Commons.close(refMap.remove(ref), log);
@@ -107,6 +101,20 @@ public class ClosableIteratorsHolder {
return new CloseableReference(referent, resource);
}
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ stopped = true;
+
+ refMap.values().forEach(o -> Commons.close(o, log));
+
+ refMap.clear();
+
+ IgniteThread t = cleanWorker;
+
+ if (t != null)
+ t.interrupt();
+ }
+
/** */
private final class DelegatingIterator<T> implements Iterator<T>,
AutoCloseable {
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 112c6c4..ed99a1d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -25,7 +25,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
/**
*
*/
-public interface ExchangeService {
+public interface ExchangeService extends LifecycleAware {
/**
* Sends a batch of data to remote node.
* @param nodeId Target node ID.
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index d5c0afc..bed5e6b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -57,6 +57,7 @@ public class ExchangeServiceImpl implements ExchangeService {
/** */
private final MessageService msgSrvc;
+ /** */
public ExchangeServiceImpl(
QueryTaskExecutor taskExecutor,
MailboxRegistry mailboxRegistry,
@@ -65,8 +66,14 @@ public class ExchangeServiceImpl implements ExchangeService {
this.taskExecutor = taskExecutor;
this.mailboxRegistry = mailboxRegistry;
this.msgSrvc = msgSrvc;
+ }
- init();
+ /** {@inheritDoc} */
+ @Override public void start() {
+ msgSrvc.register((n, m) -> onMessage(n, (InboxCloseMessage) m),
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
+ msgSrvc.register((n, m) -> onMessage(n, (OutboxCloseMessage) m),
SqlQueryMessageGroup.OUTBOX_CLOSE_MESSAGE);
+ msgSrvc.register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)
m), SqlQueryMessageGroup.QUERY_BATCH_ACK);
+ msgSrvc.register((n, m) -> onMessage(n, (QueryBatchMessage) m),
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
}
/** {@inheritDoc} */
@@ -135,13 +142,6 @@ public class ExchangeServiceImpl implements
ExchangeService {
);
}
- private void init() {
- msgSrvc.register((n, m) -> onMessage(n, (InboxCloseMessage) m),
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
- msgSrvc.register((n, m) -> onMessage(n, (OutboxCloseMessage) m),
SqlQueryMessageGroup.OUTBOX_CLOSE_MESSAGE);
- msgSrvc.register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)
m), SqlQueryMessageGroup.QUERY_BATCH_ACK);
- msgSrvc.register((n, m) -> onMessage(n, (QueryBatchMessage) m),
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
- }
-
/** {@inheritDoc} */
@Override public boolean alive(String nodeId) {
return msgSrvc.alive(nodeId);
@@ -259,4 +259,9 @@ public class ExchangeServiceImpl implements ExchangeService
{
null,
ImmutableMap.of());
}
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ // No-op.
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index f550437..a1a7ed0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -25,7 +25,7 @@ import
org.apache.ignite.internal.processors.query.calcite.SqlCursor;
/**
*
*/
-public interface ExecutionService {
+public interface ExecutionService extends LifecycleAware {
/**
* Executes a query.
*
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index c99eff6..f2f2f6d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -117,6 +117,9 @@ public class ExecutionServiceImpl<Row> implements
ExecutionService {
private static final SqlQueryMessagesFactory FACTORY = new
SqlQueryMessagesFactory();
/** */
+ private final TopologyService topSrvc;
+
+ /** */
private final MessageService msgSrvc;
/** */
@@ -163,6 +166,7 @@ public class ExecutionServiceImpl<Row> implements
ExecutionService {
QueryTaskExecutor taskExecutor,
RowHandler<Row> handler
) {
+ this.topSrvc = topSrvc;
this.handler = handler;
this.msgSrvc = msgSrvc;
this.schemaHolder = schemaHolder;
@@ -172,24 +176,25 @@ public class ExecutionServiceImpl<Row> implements
ExecutionService {
qryPlanCache = planCache;
running = new ConcurrentHashMap<>();
ddlConverter = new DdlSqlToCommandConverter();
- iteratorsHolder = new ClosableIteratorsHolder(LOG);
+ iteratorsHolder = new
ClosableIteratorsHolder(topSrvc.localMember().name(), LOG);
mailboxRegistry = new MailboxRegistryImpl(topSrvc);
exchangeSrvc = new ExchangeServiceImpl(taskExecutor, mailboxRegistry,
msgSrvc);
mappingSrvc = new MappingServiceImpl(topSrvc);
// TODO: fix this
affSrvc = cacheId -> Objects::hashCode;
+ }
- topSrvc.addEventHandler(new NodeLeaveHandler(this::onNodeLeft));
+ /** {@inheritDoc} */
+ @Override public void start() {
+ iteratorsHolder.start();
+ mailboxRegistry.start();
+ exchangeSrvc.start();
- init();
- }
+ topSrvc.addEventHandler(new NodeLeaveHandler(this::onNodeLeft));
- private void init() {
msgSrvc.register((n, m) -> onMessage(n, (QueryStartRequest) m),
SqlQueryMessageGroup.QUERY_START_REQUEST);
msgSrvc.register((n, m) -> onMessage(n, (QueryStartResponse) m),
SqlQueryMessageGroup.QUERY_START_RESPONSE);
msgSrvc.register((n, m) -> onMessage(n, (ErrorMessage) m),
SqlQueryMessageGroup.ERROR_MESSAGE);
-
- iteratorsHolder.init();
}
/** {@inheritDoc} */
@@ -664,6 +669,12 @@ public class ExecutionServiceImpl<Row> implements
ExecutionService {
running.forEach((uuid, queryInfo) -> queryInfo.onNodeLeft(node.id()));
}
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws Exception {
+ IgniteUtils.closeAll(qryPlanCache::stop, iteratorsHolder::stop,
mailboxRegistry::stop, exchangeSrvc::stop);
+ }
+
/** */
private enum QueryState {
/** */
@@ -785,7 +796,7 @@ public class ExecutionServiceImpl<Row> implements
ExecutionService {
}
if (state0 == QueryState.CLOSED) {
- // 2) unregister runing query
+ // 2) unregister running query
running.remove(ctx.queryId());
IgniteInternalException wrpEx = null;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LifecycleAware.java
similarity index 67%
copy from
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
copy to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LifecycleAware.java
index f07658e..4913b70 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LifecycleAware.java
@@ -14,18 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.calcite.prepare;
-import java.util.List;
+package org.apache.ignite.internal.processors.query.calcite.exec;
-public class DummyPlanCache implements QueryPlanCache {
- /** {@inheritDoc} */
- @Override public List<QueryPlan> queryPlan(PlanningContext ctx, CacheKey
key, QueryPlanFactory factory) {
- return factory.create(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public void clear() {
+/**
+ * SQL query engine service life cycle mark interface.
+ */
+public interface LifecycleAware {
+ /**
+ * Initialize the service.
+ */
+ void start();
- }
+ /**
+ * Stop the service.
+ *
+ * @throws Exception on eny error during the stopping.
+ */
+ void stop() throws Exception;
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index cdc6b54..4a32cf6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.Nullable;
/**
*
*/
-public interface MailboxRegistry {
+public interface MailboxRegistry extends LifecycleAware {
/**
* Tries to register and inbox node and returns it if success or returns
previously registered inbox otherwise.
*
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index e9c46a9..02a8b42 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -42,15 +42,23 @@ public class MailboxRegistryImpl implements MailboxRegistry
{
private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true;
/** */
+ private final TopologyService topSrvc;
+
+ /** */
private final Map<MailboxKey, Outbox<?>> locals;
/** */
private final Map<MailboxKey, Inbox<?>> remotes;
public MailboxRegistryImpl(TopologyService topSrvc) {
+ this.topSrvc = topSrvc;
+
locals = new ConcurrentHashMap<>();
remotes = new ConcurrentHashMap<>();
+ }
+ /** {@inheritDoc} */
+ @Override public void start() {
topSrvc.addEventHandler(new NodeLeaveHandler(this::onNodeLeft));
}
@@ -126,6 +134,12 @@ public class MailboxRegistryImpl implements
MailboxRegistry {
return S.toString(MailboxRegistryImpl.class, this);
}
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ locals.clear();
+ remotes.clear();
+ }
+
/** */
private static class MailboxKey {
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
index b4a5fb2..0dda0ed 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
@@ -23,7 +23,7 @@ import java.util.concurrent.CompletableFuture;
/**
*
*/
-public interface QueryTaskExecutor {
+public interface QueryTaskExecutor extends LifecycleAware {
/**
* Executes a query task in a thread, responsible for particular query
fragment.
*
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
index 81849d4..27f337a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutorImpl.java
@@ -19,26 +19,45 @@ package
org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
/** */
public class QueryTaskExecutorImpl implements QueryTaskExecutor,
Thread.UncaughtExceptionHandler {
+ /** Default Ignite thread keep alive time. */
+ public static final long DFLT_THREAD_KEEP_ALIVE_TIME = 60_000L;
+
/** */
private static final IgniteLogger LOG =
IgniteLogger.forClass(QueryTaskExecutorImpl.class);
/** */
- private final StripedThreadPoolExecutor stripedThreadPoolExecutor;
+ private final String nodeName;
+
+ /** */
+ private volatile StripedThreadPoolExecutor stripedThreadPoolExecutor;
/** */
private Thread.UncaughtExceptionHandler eHnd;
/**
- * @param stripedThreadPoolExecutor Executor.
+ * @param nodeName Node name.
*/
- public QueryTaskExecutorImpl(StripedThreadPoolExecutor
stripedThreadPoolExecutor) {
- this.stripedThreadPoolExecutor = stripedThreadPoolExecutor;
+ public QueryTaskExecutorImpl(String nodeName) {
+ this.nodeName = nodeName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ this.stripedThreadPoolExecutor = new StripedThreadPoolExecutor(
+ 4,
+ NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
+ null,
+ true,
+ DFLT_THREAD_KEEP_ALIVE_TIME
+ );
}
/**
@@ -75,11 +94,6 @@ public class QueryTaskExecutorImpl implements
QueryTaskExecutor, Thread.Uncaught
return stripedThreadPoolExecutor.submit(qryTask, hash(qryId,
fragmentId));
}
- /** Releases resources. */
- public void tearDown() {
- stripedThreadPoolExecutor.shutdownNow();
- }
-
/** {@inheritDoc} */
@Override public void uncaughtException(Thread t, Throwable e) {
if (eHnd != null)
@@ -91,4 +105,10 @@ public class QueryTaskExecutorImpl implements
QueryTaskExecutor, Thread.Uncaught
// inlined Objects.hash(...)
return IgniteUtils.safeAbs(31 * (31 + (qryId != null ?
qryId.hashCode() : 0)) + Long.hashCode(fragmentId));
}
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ if (stripedThreadPoolExecutor != null)
+ stripedThreadPoolExecutor.shutdownNow();
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 2c39021..b102bd7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -99,7 +99,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
lock.lock();
try {
- if (waiting != -1)
+ if (waiting != -1 || !outBuff.isEmpty())
ex.compareAndSet(null, new IgniteException("Query was
cancelled"));
closed = true; // an exception has to be set first to get right
check order
@@ -229,7 +229,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
outBuff = tmp;
}
- if (waiting == -1)
+ if (waiting == -1 && outBuff.isEmpty())
close();
else if (inBuff.isEmpty() && waiting == 0) {
int req = waiting = inBufSize;
@@ -259,7 +259,10 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
if (e == null)
return;
- throw new IgniteInternalException(e);
+ if (e instanceof IgniteException)
+ throw (IgniteException)e;
+ else
+ throw new IgniteException("An error occurred while query
executing.", e);
// TODO: rework with SQL error code
// if (e instanceof IgniteSQLException)
// throw (IgniteSQLException)e;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
index cdf24e9..ad0831f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageService.java
@@ -17,13 +17,14 @@
package org.apache.ignite.internal.processors.query.calcite.message;
+import org.apache.ignite.internal.processors.query.calcite.exec.LifecycleAware;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.network.NetworkMessage;
/**
*
*/
-public interface MessageService {
+public interface MessageService extends LifecycleAware {
/**
* Sends a message to given node.
*
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
index e044cd7..b123204 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageServiceImpl.java
@@ -54,7 +54,7 @@ public class MessageServiceImpl implements MessageService {
private final QueryTaskExecutor taskExecutor;
/** */
- private Map<Short, MessageListener> lsnrs;
+ private volatile Map<Short, MessageListener> lsnrs;
/** */
public MessageServiceImpl(
@@ -67,7 +67,10 @@ public class MessageServiceImpl implements MessageService {
this.taskExecutor = taskExecutor;
locNodeId = topSrvc.localMember().id();
+ }
+ /** {@inheritDoc} */
+ @Override public void start() {
messagingSrvc.addMessageHandler(SqlQueryMessageGroup.class,
this::onMessage);
}
@@ -148,4 +151,10 @@ public class MessageServiceImpl implements MessageService {
lsnr.onMessage(nodeId, msg);
}
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ if (lsnrs != null)
+ lsnrs.clear();
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
index f07658e..4d0c4c7 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DummyPlanCache.java
@@ -26,6 +26,16 @@ public class DummyPlanCache implements QueryPlanCache {
/** {@inheritDoc} */
@Override public void clear() {
+ // No-op.
+ }
+ /** {@inheritDoc} */
+ @Override public void start() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ clear();
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
index 07f94f8..1d450ba 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCache.java
@@ -19,10 +19,12 @@ package
org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
+import org.apache.ignite.internal.processors.query.calcite.exec.LifecycleAware;
+
/**
*
*/
-public interface QueryPlanCache {
+public interface QueryPlanCache extends LifecycleAware {
/**
* @param ctx Context.
* @param key Cache key.
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
new file mode 100644
index 0000000..5cd7240
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/StopCalciteModuleTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Flow;
+
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.event.TableEvent;
+import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+/**
+ * Stop Calcite module test.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StopCalciteModuleTest {
+ /** The logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(StopCalciteModuleTest.class);
+
+ private static final int ROWS = 5;
+
+ private static final String NODE_NAME = "mock-node-name";
+
+ @Mock
+ ClusterService clusterSrvc;
+
+ @Mock
+ TableManager tableManager;
+
+ @Mock
+ MessagingService msgSrvc;
+
+ @Mock
+ TopologyService topologySrvc;
+
+ @Mock
+ ClusterLocalConfiguration localCfg;
+
+ @Mock
+ InternalTable tbl;
+
+ SchemaRegistry schemaReg;
+
+ @BeforeEach
+ public void before(TestInfo testInfo) {
+ when(clusterSrvc.messagingService()).thenReturn(msgSrvc);
+ when(clusterSrvc.topologyService()).thenReturn(topologySrvc);
+ when(clusterSrvc.localConfiguration()).thenReturn(localCfg);
+
+ ClusterNode node = new ClusterNode("mock-node-id", NODE_NAME, null);
+ when(topologySrvc.localMember()).thenReturn(node);
+
when(topologySrvc.allMembers()).thenReturn(Collections.singleton(node));
+
+ SchemaDescriptor schemaDesc = new SchemaDescriptor(
+ 0,
+ new Column[]{new Column("ID", NativeTypes.INT32, false)},
+ new Column[]{new Column("VAL", NativeTypes.INT32, false)}
+ );
+
+ schemaReg = new SchemaRegistryImpl(0, (v) -> schemaDesc);
+
+ when(tbl.tableName()).thenReturn("PUBLIC.TEST");
+
+ // Mock create table (notify on register listener).
+ doAnswer(invocation -> {
+ EventListener<TableEventParameters> clo =
(EventListener<TableEventParameters>)invocation.getArguments()[1];
+
+ clo.notify(new TableEventParameters(new
IgniteUuid(UUID.randomUUID(), 0), "TEST", new TableImpl(tbl, schemaReg,
tableManager)), null);
+
+ return null;
+ }).when(tableManager).listen(eq(TableEvent.CREATE), any());
+
+ RowAssembler asm = new RowAssembler(schemaReg.schema(), 0, 0, 0, 0);
+
+ asm.appendInt(0);
+ asm.appendInt(0);
+
+ BinaryRow binaryRow = asm.build();
+
+ // Mock table scan
+ doAnswer(invocation -> {
+ int part = (int)invocation.getArguments()[0];
+
+ return (Flow.Publisher<BinaryRow>)s -> {
+ s.onSubscribe(new Flow.Subscription() {
+ @Override public void request(long n) {
+ // No-op.
+ }
+
+ @Override public void cancel() {
+ // No-op.
+ }
+ });
+
+ if (part == 0) {
+ for (int i = 0; i < ROWS; ++i)
+ s.onNext(binaryRow);
+ }
+
+ s.onComplete();
+ };
+ }).when(tbl).scan(anyInt(), any());
+
+ LOG.info(">>>> Starting test {}",
testInfo.getTestMethod().orElseThrow().getName());
+ }
+
+ /** */
+ @Test
+ public void testStopQueryOnNodeStop() throws Exception {
+ SqlQueryProcessor qryProc = new SqlQueryProcessor(clusterSrvc,
tableManager);
+
+ qryProc.start();
+
+ List<SqlCursor<List<?>>> cursors = qryProc.query(
+ "PUBLIC",
+ "SELECT * FROM TEST"
+ );
+
+ SqlCursor<List<?>> cur = cursors.get(0);
+ cur.next();
+
+ assertTrue(isThereNodeThreads(NODE_NAME));
+
+ qryProc.stop();
+
+ // Check cursor closed.
+ assertTrue(assertThrows(IgniteException.class,
cur::hasNext).getMessage().contains("Query was cancelled"));
+ assertTrue(assertThrows(IgniteException.class,
cur::next).getMessage().contains("Query was cancelled"));
+
+ // Check execute query on stopped node.
+ assertTrue(assertThrows(IgniteException.class, () -> qryProc.query(
+ "PUBLIC",
+ "SELECT 1"
+ )).getCause() instanceof NodeStoppingException);
+
+ // Check: there are no alive Ignite threads.
+ assertFalse(isThereNodeThreads(NODE_NAME));
+ }
+
+ /**
+ * @return {@code true} is there are any threads with node name prefix;
+ * Otherwise returns {@code false}.
+ */
+ private boolean isThereNodeThreads(String nodeName) {
+ ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+
+ return Arrays.stream(infos)
+ .anyMatch((ti) -> ti.getThreadName().contains(nodeName));
+ }
+}
+
+
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 1ab8beb..986e944 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
+
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
@@ -32,7 +33,6 @@ import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDesc
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -50,21 +50,14 @@ public class AbstractExecutionTest extends
IgniteAbstractTest {
/** */
@BeforeEach
public void beforeTest() {
- taskExecutor = new QueryTaskExecutorImpl(
- new StripedThreadPoolExecutor(
- 4,
- "calciteQry",
- this::handle,
- true,
- 60_000L
- )
- );
+ taskExecutor = new QueryTaskExecutorImpl("no_node");
+ taskExecutor.start();
}
/** */
@AfterEach
public void afterTest() {
- taskExecutor.tearDown();
+ taskExecutor.stop();
if (lastE != null)
throw new AssertionError(lastE);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
new file mode 100644
index 0000000..11bf464
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/IgniteThread.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * This class adds some necessary plumbing on top of the {@link Thread} class.
+ * Specifically, it adds:
+ * <ul>
+ * <li>Consistent naming of threads</li>;
+ * <li>Name of the grid this thread belongs to</li>.
+ * </ul>
+ * <b>Note</b>: this class is intended for internal use only.
+ */
+public class IgniteThread extends Thread {
+ /** Number of all grid threads in the system. */
+ private static final AtomicLong cntr = new AtomicLong();
+
+ /** The name of the Ignite instance this thread belongs to. */
+ protected final String igniteInstanceName;
+
+ /**
+ * Creates grid thread with given name for a given Ignite instance.
+ *
+ * @param nodeName Name of the Ignite instance this thread is created for.
+ * @param threadName Name of thread.
+ */
+ public IgniteThread(String nodeName, String threadName) {
+ this(nodeName, threadName, null);
+ }
+
+ /**
+ * Creates grid thread with given name for a given Ignite instance.
+ *
+ * @param nodeName Name of the Ignite instance this thread is created for.
+ * @param threadName Name of thread.
+ * @param r Runnable to execute.
+ */
+ public IgniteThread(String nodeName, String threadName, Runnable r) {
+ super(r, createName(cntr.incrementAndGet(), threadName, nodeName));
+
+ this.igniteInstanceName = nodeName;
+ }
+
+ /**
+ * Gets name of the Ignite instance this thread belongs to.
+ *
+ * @return Name of the Ignite instance this thread belongs to.
+ */
+ public String getIgniteInstanceName() {
+ return igniteInstanceName;
+ }
+
+ /**
+ * @return IgniteThread or {@code null} if current thread is not an
instance of IgniteThread.
+ */
+ public static IgniteThread current() {
+ Thread thread = Thread.currentThread();
+
+ return thread.getClass() == IgniteThread.class || thread instanceof
IgniteThread ?
+ ((IgniteThread)thread) : null;
+ }
+
+ /**
+ * Create prefix for thread name.
+ */
+ public static String threadPrefix(String nodeName, String threadName) {
+ return "%" + nodeName + "%" + threadName + "-";
+ }
+
+ /**
+ * Creates new thread name.
+ *
+ * @param num Thread number.
+ * @param threadName Thread name.
+ * @param nodeName Ignite instance name.
+ * @return New thread name.
+ */
+ protected static String createName(long num, String threadName, String
nodeName) {
+ return threadPrefix(nodeName, threadName) + num;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteThread.class, this, "name", getName());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index 9a0691b..726f3df 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -90,7 +90,7 @@ public class NamedThreadFactory implements ThreadFactory {
* Create prefix for thread name.
*/
public static String threadPrefix(String nodeName, String poolName) {
- return "%" + nodeName + "%" + poolName + "-";
+ return IgniteThread.threadPrefix(nodeName, poolName);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
b/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
index 02349ca..a126d57 100644
---
a/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
+++
b/modules/core/src/main/java/org/apache/ignite/lang/NodeStoppingException.java
@@ -30,7 +30,7 @@ public class NodeStoppingException extends
IgniteInternalCheckedException {
* Creates an empty node stopping exception.
*/
public NodeStoppingException() {
- super("Node is stopping.");
+ super("Operation has been cancelled (node is stopping).");
}
/**
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index fc85e7f..648f427 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -278,7 +278,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public synchronized void deployWatches() throws NodeStoppingException {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
var watch = watchAggregator.watch(
@@ -318,7 +318,7 @@ public class MetaStorageManager implements IgniteComponent {
@NotNull WatchListener lsnr
) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return waitForReDeploy(watchAggregator.add(key, lsnr));
@@ -340,7 +340,7 @@ public class MetaStorageManager implements IgniteComponent {
@NotNull WatchListener lsnr
) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
@@ -363,7 +363,7 @@ public class MetaStorageManager implements IgniteComponent {
@NotNull WatchListener lsnr
) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return waitForReDeploy(watchAggregator.add(keys, lsnr));
@@ -387,7 +387,7 @@ public class MetaStorageManager implements IgniteComponent {
@NotNull WatchListener lsnr
) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return waitForReDeploy(watchAggregator.add(from, to, lsnr));
@@ -405,7 +405,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public synchronized CompletableFuture<Void> unregisterWatch(long id) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
watchAggregator.cancel(id);
@@ -424,7 +424,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
@@ -439,7 +439,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long
revUpperBound) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.get(key,
revUpperBound));
@@ -454,7 +454,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>>
getAll(Set<ByteArray> keys) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
@@ -469,7 +469,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>>
getAll(Set<ByteArray> keys, long revUpperBound) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys,
revUpperBound));
@@ -484,7 +484,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[]
val) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
@@ -499,7 +499,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key,
byte[] val) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key,
val));
@@ -514,7 +514,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray,
byte[]> vals) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
@@ -529,7 +529,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>>
getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc ->
svc.getAndPutAll(vals));
@@ -544,7 +544,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
@@ -559,7 +559,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray
key) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
@@ -574,7 +574,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray>
keys) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
@@ -589,7 +589,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>>
getAndRemoveAll(@NotNull Set<ByteArray> keys) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc ->
svc.getAndRemoveAll(keys));
@@ -610,7 +610,7 @@ public class MetaStorageManager implements IgniteComponent {
@NotNull Operation failure
) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond,
success, failure));
@@ -629,7 +629,7 @@ public class MetaStorageManager implements IgniteComponent {
@NotNull Collection<Operation> failure
) {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond,
success, failure));
@@ -644,7 +644,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable
ByteArray keyTo, long revUpperBound) throws NodeStoppingException {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
return new CursorWrapper<>(
@@ -671,7 +671,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray
keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
return new CursorWrapper<>(
@@ -688,7 +688,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable
ByteArray keyTo) throws NodeStoppingException {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
return new CursorWrapper<>(
@@ -716,7 +716,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray
keyPrefix) throws NodeStoppingException {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
var rangeCriterion =
KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
@@ -761,7 +761,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long
revUpperBound) throws NodeStoppingException {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
var rangeCriterion =
KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
@@ -779,7 +779,7 @@ public class MetaStorageManager implements IgniteComponent {
*/
public @NotNull CompletableFuture<Void> compact() {
if (!busyLock.enterBusy())
- return CompletableFuture.failedFuture(new
NodeStoppingException("Operation has been cancelled (node is stopping)."));
+ return CompletableFuture.failedFuture(new NodeStoppingException());
try {
return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
@@ -920,7 +920,7 @@ public class MetaStorageManager implements IgniteComponent {
/** {@inheritDoc} */
@Override public void close() throws Exception {
if (!busyLock.enterBusy())
- throw new NodeStoppingException("Operation has been cancelled
(node is stopping).");
+ throw new NodeStoppingException();
try {
innerCursorFut.thenApply(cursor -> {